Skip to content

Commit 4273fc4

Browse files
kalbhor and joeirimpan authored
refactor: ditch consumer groups for direct consumers (#35)
* refactor: replace consumer groups with direct consumers
* feat: make target channel buffer configurable
* fix: Fix stop at end flow
* fix: Fix filters config name while parsing

---------

Co-authored-by: Joe Paul <[email protected]>
1 parent dba537d commit 4273fc4

File tree

7 files changed

+92
-216
lines changed

7 files changed

+92
-216
lines changed

config.sample.toml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@ source_topic2 = "target_topic2"
1515
[source_pool]
1616
# Kafka client config common to all upstream sources ([[sources]]).
1717
initial_offset = "start"
18-
# Static membership to pin the member for the consumer group for respawn / reconnect and fence other members from connecting using the same id.
19-
instance_id = "client_instance_id"
20-
# Consumer group id.
21-
group_id = "consumer_group"
2218

2319
# Frequency at which source servers are polled for health/lag.
2420
healthcheck_interval = "3s"
@@ -109,6 +105,7 @@ ca_cert_path = ""
109105
max_retries = -1
110106
flush_batch_size = 1000
111107
batch_size = 1000
108+
buffer_size = 100000 # channel buffer length
112109
max_message_bytes = 10000000
113110

114111
# Kafka exponential retry-backoff config for reconnection attempts.

init.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,12 @@ func initSourcePoolConfig(ko *koanf.Koanf) relay.SourcePoolCfg {
9292
EnableBackoff: ko.Bool("source_pool.backoff_enable"),
9393
BackoffMin: ko.MustDuration("source_pool.backoff_min"),
9494
BackoffMax: ko.MustDuration("source_pool.backoff_max"),
95-
GroupID: ko.MustString("source_pool.group_id"),
96-
InstanceID: ko.MustString("source_pool.instance_id"),
9795
}
9896
}
9997

10098
func initRelayConfig(ko *koanf.Koanf) relay.RelayCfg {
10199
return relay.RelayCfg{
102-
StopAtEnd: ko.Bool("stop_at_end"),
100+
StopAtEnd: ko.Bool("stop-at-end"),
103101
}
104102
}
105103

@@ -175,10 +173,10 @@ func initTopicsMap(ko *koanf.Koanf) relay.Topics {
175173
}
176174

177175
// initKafkaConfig reads the source(s)/target Kafka configuration.
178-
func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerCfg) {
176+
func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerCfg, relay.ProducerCfg) {
179177
// Read source Kafka config.
180178
src := struct {
181-
Sources []relay.ConsumerGroupCfg `koanf:"sources"`
179+
Sources []relay.ConsumerCfg `koanf:"sources"`
182180
}{}
183181

184182
if err := ko.Unmarshal("", &src); err != nil {
@@ -237,11 +235,11 @@ func initFilters(ko *koanf.Koanf, lo *slog.Logger) (map[string]filter.Provider,
237235
}
238236

239237
var cfg filter.Config
240-
if err := ko.Unmarshal("filter."+id, &cfg); err != nil {
238+
if err := ko.Unmarshal("filters."+id, &cfg); err != nil {
241239
log.Fatalf("error unmarshalling filter config: %s: %v", id, err)
242240
}
243241
if cfg.Config == "" {
244-
lo.Info(fmt.Sprintf("WARNING: No config 'filter.%s' for '%s' in config", id, id))
242+
lo.Info(fmt.Sprintf("WARNING: No config 'filters.%s' for '%s' in config", id, id))
245243
}
246244

247245
// Initialize the plugin.

internal/relay/config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ type KafkaCfg struct {
4242
EnableLog bool `koanf:"enable_log"`
4343
}
4444

45-
// ConsumerGroupCfg is the consumer group specific config.
46-
type ConsumerGroupCfg struct {
45+
// ConsumerCfg is the direct consumer config.
46+
type ConsumerCfg struct {
4747
KafkaCfg `koanf:",squash"`
4848
}
4949

@@ -57,6 +57,7 @@ type ProducerCfg struct {
5757
FlushFrequency time.Duration `koanf:"flush_frequency"`
5858
MaxMessageBytes int `koanf:"max_message_bytes"`
5959
BatchSize int `koanf:"batch_size"`
60+
BufferSize int `koanf:"buffer_size"`
6061
FlushBatchSize int `koanf:"flush_batch_size"`
6162
Compression string `koanf:"compression"` // gzip|snappy|lz4|zstd|none
6263
}

internal/relay/relay.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
type RelayCfg struct {
15-
StopAtEnd bool `koanf:"stop_at_end"`
15+
StopAtEnd bool
1616
}
1717

1818
// Relay represents the input, output kafka and the remapping necessary to forward messages from one topic to another.
@@ -30,7 +30,7 @@ type Relay struct {
3030
// If stop-at-end is enabled, the "end" offsets of the source
3131
// read at the time of boot are cached here to compare against
3232
// live offsets and stop consumption.
33-
targetOffsets map[string]map[int32]kgo.Offset
33+
targetOffsets TopicOffsets
3434

3535
// Live topic offsets from source.
3636
srcOffsets map[string]map[int32]int64
@@ -42,7 +42,7 @@ type Relay struct {
4242
func NewRelay(cfg RelayCfg, src *SourcePool, target *Target, topics Topics, filters map[string]filter.Provider, log *slog.Logger) (*Relay, error) {
4343
// If stop-at-end is set, fetch and cache the offsets to determine
4444
// when end is reached.
45-
var offsets map[string]map[int32]kgo.Offset
45+
var offsets TopicOffsets
4646
if cfg.StopAtEnd {
4747
if o, err := target.GetHighWatermark(); err != nil {
4848
return nil, err
@@ -115,7 +115,7 @@ func (re *Relay) Start(globalCtx context.Context) error {
115115
go func() {
116116
defer wg.Done()
117117
// Wait till main ctx is cancelled.
118-
<-globalCtx.Done()
118+
<-ctx.Done()
119119

120120
// Stop consumer group.
121121
re.source.Close()
@@ -133,6 +133,7 @@ func (re *Relay) Start(globalCtx context.Context) error {
133133
// Close producer.
134134
re.target.Close()
135135

136+
cancel()
136137
wg.Wait()
137138

138139
return nil
@@ -223,7 +224,7 @@ loop:
223224
rec := iter.Next()
224225
// Always record the latest offsets before the messages are processed for new connections and
225226
// retries to consume from where it was left off.
226-
// NOTE: What if the next step fails? The messages won't be read again?
227+
// TODO: What if the next step fails? The messages won't be read again?
227228
re.source.RecordOffsets(rec)
228229

229230
if err := re.processMessage(ctx, rec); err != nil {

0 commit comments

Comments (0)