
chore: parallel kafka consumer #17262

Open · wants to merge 14 commits into base: main

5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
@@ -830,6 +830,11 @@ kafka_config:
# CLI flag: -kafka.max-consumer-lag-at-startup
[max_consumer_lag_at_startup: <duration> | default = 15s]

# The maximum number of workers to use for consuming from Kafka. This limits
# how many records from a fetched batch are decoded and pushed concurrently.
# CLI flag: -kafka.max-consumer-workers
[max_consumer_workers: <int> | default = 1]

# Enable collection of the following kafka latency histograms: read-wait,
# read-timing, write-wait, write-timing
# CLI flag: -kafka.enable-kafka-histograms
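
For reference, the new key sits under kafka_config next to the existing lag setting; an illustrative block (values are examples, not recommendations):

```yaml
kafka_config:
  max_consumer_lag_at_startup: 15s
  # Decode and push up to four records from each fetched batch concurrently.
  max_consumer_workers: 4
```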
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -387,7 +387,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
cfg.KafkaIngestion.KafkaConfig,
i.ingestPartitionID,
cfg.LifecyclerConfig.ID,
NewKafkaConsumerFactory(i, registerer),
NewKafkaConsumerFactory(i, registerer, cfg.KafkaIngestion.KafkaConfig.MaxConsumerWorkers),
logger,
registerer,
)
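
Nothing else in New changes: the worker limit rides along inside the factory closure. A minimal sketch of that pattern, with illustrative names (not Loki's):

```go
package main

import "fmt"

// The worker limit is captured once, when the factory is built in
// ingester.New, and baked into every consumer the factory later produces.
type consumer struct{ workers int }

func newConsumerFactory(workers int) func() *consumer {
	return func() *consumer { return &consumer{workers: workers} }
}

func main() {
	factory := newConsumerFactory(4) // e.g. MaxConsumerWorkers from config
	fmt.Println(factory().workers)   // every consumer sees the same limit: 4
}
```

Capturing the limit at construction keeps the partition.ConsumerFactory signature unchanged for callers.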
117 changes: 84 additions & 33 deletions pkg/ingester/kafka_consumer.go
@@ -45,30 +45,31 @@ func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics {
}
}

func NewKafkaConsumerFactory(pusher logproto.PusherServer, reg prometheus.Registerer) partition.ConsumerFactory {
func NewKafkaConsumerFactory(pusher logproto.PusherServer, reg prometheus.Registerer, maxConsumerWorkers int) partition.ConsumerFactory {
metrics := newConsumerMetrics(reg)
return func(committer partition.Committer, logger log.Logger) (partition.Consumer, error) {
decoder, err := kafka.NewDecoder()
if err != nil {
return nil, err
}
return &kafkaConsumer{
pusher: pusher,
logger: logger,
decoder: decoder,
metrics: metrics,
committer: committer,
pusher: pusher,
logger: logger,
decoder: decoder,
metrics: metrics,
committer: committer,
maxConsumerWorkers: maxConsumerWorkers,
}, nil
}
}

type kafkaConsumer struct {
pusher logproto.PusherServer
logger log.Logger
decoder *kafka.Decoder
committer partition.Committer

metrics *consumerMetrics
pusher logproto.PusherServer
logger log.Logger
decoder *kafka.Decoder
committer partition.Committer
maxConsumerWorkers int
metrics *consumerMetrics
}

func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() {
@@ -101,40 +102,90 @@ func (kc *kafkaConsumer) consume(ctx context.Context, records []partition.Record
minOffset = int64(math.MaxInt64)
maxOffset = int64(0)
consumeStart = time.Now()
limitWorkers = kc.maxConsumerWorkers
wg sync.WaitGroup
)

// Find min/max offsets
for _, record := range records {
minOffset = min(minOffset, record.Offset)
maxOffset = max(maxOffset, record.Offset)
}

level.Debug(kc.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset)
for _, record := range records {
stream, err := kc.decoder.DecodeWithoutLabels(record.Content)
if err != nil {
level.Error(kc.logger).Log("msg", "failed to decode record", "error", err)
continue
}
recordCtx := user.InjectOrgID(record.Ctx, record.TenantID)
req := &logproto.PushRequest{
Streams: []logproto.Stream{stream},
}
if err := retryWithBackoff(ctx, func(attempts int) error {
pushTime := time.Now()
_, err := kc.pusher.Push(recordCtx, req)

kc.metrics.pushLatency.Observe(time.Since(pushTime).Seconds())
type recordWithIndex struct {
record partition.Record
index int
}

if err != nil {
level.Warn(kc.logger).Log("msg", "failed to push records", "err", err, "offset", record.Offset, "attempts", attempts)
return err
numWorkers := min(limitWorkers, len(records))
workChan := make(chan recordWithIndex, numWorkers)
// success keeps track of the records that were processed. It is expected to
// be sorted in ascending order of offset since the records themselves are
// ordered.
success := make([]*int64, len(records))

for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for recordWithIndex := range workChan {
stream, err := kc.decoder.DecodeWithoutLabels(recordWithIndex.record.Content)
if err != nil {
level.Error(kc.logger).Log("msg", "failed to decode record", "error", err)
continue
}

recordCtx := user.InjectOrgID(recordWithIndex.record.Ctx, recordWithIndex.record.TenantID)
req := &logproto.PushRequest{
Streams: []logproto.Stream{stream},
}

level.Debug(kc.logger).Log("msg", "pushing record", "offset", recordWithIndex.record.Offset, "length", len(recordWithIndex.record.Content))

if err := retryWithBackoff(ctx, func(attempts int) error {
pushTime := time.Now()
_, err := kc.pusher.Push(recordCtx, req)

kc.metrics.pushLatency.Observe(time.Since(pushTime).Seconds())

if err != nil {
level.Warn(kc.logger).Log("msg", "failed to push records", "err", err, "offset", recordWithIndex.record.Offset, "attempts", attempts)
return err
}

return nil
}); err != nil {
level.Error(kc.logger).Log("msg", "exhausted all retry attempts, failed to push records", "err", err, "offset", recordWithIndex.record.Offset)
continue
}

offset := recordWithIndex.record.Offset
success[recordWithIndex.index] = &offset
}
return nil
}); err != nil {
level.Error(kc.logger).Log("msg", "exhausted all retry attempts, failed to push records", "err", err, "offset", record.Offset)
}()
}

for i, record := range records {
workChan <- recordWithIndex{record: record, index: i}
}
close(workChan)

wg.Wait()

// Find the highest offset before a gap, and commit that.
highestOffset := int64(-1)
for _, offset := range success {
Review comment (Member): nit: you might want to make success []*int to handle the case where offset == 0, which iirc is still valid for the first record in a partition.

if offset == nil {
break
}
kc.committer.EnqueueOffset(record.Offset)
highestOffset = *offset
}
if highestOffset >= 0 {
kc.committer.EnqueueOffset(highestOffset)
}

kc.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds())
kc.metrics.currentOffset.Set(float64(maxOffset))
}
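
The consume path above fans records out to a bounded worker pool, records each successful push in a slot indexed by the record's position, and commits only the highest offset before the first gap, so a record that exhausted its retries is never skipped past. The same pattern in a self-contained sketch (hypothetical types, no Loki dependencies):

```go
package main

import (
	"fmt"
	"sync"
)

type record struct {
	offset int64
}

// processBatch pushes records with up to maxWorkers goroutines and returns
// the highest offset that is safe to commit: every record at or below it was
// processed successfully. ok is false if nothing can be committed.
func processBatch(records []record, maxWorkers int, push func(record) error) (int64, bool) {
	numWorkers := min(maxWorkers, len(records))
	workChan := make(chan int, numWorkers) // indexes into records
	success := make([]*int64, len(records))

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for idx := range workChan {
				if err := push(records[idx]); err != nil {
					continue // leave a gap; nothing past it gets committed
				}
				off := records[idx].offset
				success[idx] = &off
			}
		}()
	}
	for i := range records {
		workChan <- i
	}
	close(workChan)
	wg.Wait()

	// Scan forward until the first gap; commit the offset just before it.
	highest, ok := int64(-1), false
	for _, off := range success {
		if off == nil {
			break
		}
		highest, ok = *off, true
	}
	return highest, ok
}

func main() {
	recs := []record{{offset: 0}, {offset: 1}, {offset: 2}}
	off, ok := processBatch(recs, 2, func(record) error { return nil })
	fmt.Println(off, ok) // prints: 2 true
}
```

Tracking successes as pointers rather than a zero sentinel is what lets offset 0, valid for the first record of a partition, count as processed, which is the point of the review note above.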
15 changes: 9 additions & 6 deletions pkg/ingester/kafka_consumer_test.go
@@ -80,12 +80,14 @@ func (noopCommitter) Commit(_ context.Context, _ int64) error { return nil }

func TestConsumer(t *testing.T) {
var (
toPush []partition.Record
offset = int64(0)
pusher = &fakePusher{t: t}
toPush []partition.Record
offset = int64(0)
pusher = &fakePusher{t: t}
numWorkers = 1
)

consumer, err := NewKafkaConsumerFactory(pusher, prometheus.NewRegistry())(&noopCommitter{}, log.NewLogfmtLogger(os.Stdout))
// Use a single worker so records are pushed in a deterministic order
consumer, err := NewKafkaConsumerFactory(pusher, prometheus.NewRegistry(), numWorkers)(&noopCommitter{}, log.NewLogfmtLogger(os.Stdout))
require.NoError(t, err)

records, err := kafka.Encode(0, tenantID, streamBar, 10000)
@@ -100,7 +102,7 @@ func TestConsumer(t *testing.T) {
})
offset++
}
records, err = kafka.Encode(0, "foo", streamFoo, 10000)
records, err = kafka.Encode(0, tenantID, streamFoo, 10000)
require.NoError(t, err)
for _, record := range records {
toPush = append(toPush, partition.Record{
@@ -116,7 +118,8 @@ func TestConsumer(t *testing.T) {
recordChan := make(chan []partition.Record)
wait := consumer.Start(ctx, recordChan)

recordChan <- toPush
// Send all records in a single batch
recordChan <- toPush

cancel()
wait()
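
The test pins numWorkers to 1, so pushes arrive at the fakePusher in record order. A variant exercising numWorkers > 1 would need an order-insensitive assertion; a hypothetical stand-in (fakePusher's definition is not part of this diff, and the Push signature follows logproto.PusherServer):

```go
// countingPusher is a hypothetical, order-insensitive replacement for
// fakePusher: with more than one worker the push order is nondeterministic,
// so it counts pushed streams instead of comparing sequences.
type countingPusher struct {
	mu    sync.Mutex
	count int
}

func (p *countingPusher) Push(_ context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.count += len(req.Streams)
	return &logproto.PushResponse{}, nil
}
```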
8 changes: 8 additions & 0 deletions pkg/kafka/config.go
@@ -4,6 +4,7 @@ import (
"errors"
"flag"
"fmt"
"runtime"
"time"

"github.com/grafana/dskit/flagext"
@@ -56,6 +57,7 @@ type Config struct {
ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"`

MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`
MaxConsumerWorkers int `yaml:"max_consumer_workers"`

EnableKafkaHistograms bool `yaml:"enable_kafka_histograms"`
}
@@ -89,6 +91,12 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+consumerLagUsage)

f.BoolVar(&cfg.EnableKafkaHistograms, prefix+".enable-kafka-histograms", false, "Enable collection of the following kafka latency histograms: read-wait, read-timing, write-wait, write-timing")
f.IntVar(&cfg.MaxConsumerWorkers, prefix+".max-consumer-workers", 1, "The maximum number of workers to use for consuming from Kafka. This limits how many records from a fetched batch are decoded and pushed concurrently.")

// If the number of workers is set to 0, use the number of available CPUs
if cfg.MaxConsumerWorkers == 0 {
cfg.MaxConsumerWorkers = runtime.GOMAXPROCS(0)
}
}

func (cfg *Config) Validate() error {
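
One subtlety in the hunk above: flag.IntVar has just written the default (1) into cfg.MaxConsumerWorkers, and values parsed from flags or YAML are only applied after RegisterFlagsWithPrefix returns, so the zero check cannot observe a user-supplied 0 at this point. Resolving the fallback where the pool is sized avoids the ordering problem; a sketch with a hypothetical helper, not code from this PR:

```go
package kafka

import "runtime"

// resolveWorkers maps the configured value to an effective worker count,
// treating 0 (or a negative value) as "use all available CPUs", which
// matches the intent of the fallback registered above.
func resolveWorkers(configured int) int {
	if configured <= 0 {
		return runtime.GOMAXPROCS(0)
	}
	return configured
}
```

consume could then start min(resolveWorkers(kc.maxConsumerWorkers), len(records)) workers, which also keeps a non-positive setting from creating a zero-sized pool that blocks forever on workChan sends.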
11 changes: 8 additions & 3 deletions pkg/kafka/encoding.go
@@ -177,11 +177,16 @@ func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error) {

// DecodeWithoutLabels converts a Kafka record's byte data back into a logproto.Stream without parsing labels.
func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error) {
d.stream.Entries = d.stream.Entries[:0]
if err := d.stream.Unmarshal(data); err != nil {
if len(data) == 0 {
return logproto.Stream{}, errors.New("empty data received")
}

stream := logproto.Stream{}
if err := stream.Unmarshal(data); err != nil {
return logproto.Stream{}, fmt.Errorf("failed to unmarshal stream: %w", err)
}
return *d.stream, nil

return stream, nil
}

// sovPush calculates the size of varint-encoded uint64.
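
This decoder change is what makes the worker pool safe: the old DecodeWithoutLabels reused the shared d.stream buffer, which would race once several workers decode through the same kc.decoder. Allocating a fresh logproto.Stream per call removes the shared state. A sketch of the concurrent use this enables (numWorkers, workChan, and error handling are assumed from the consumer):

```go
// Sketch: one shared Decoder, many workers. Safe because each call to
// DecodeWithoutLabels now returns its own logproto.Stream instead of
// mutating d.stream.
decoder, err := kafka.NewDecoder()
if err != nil {
	return err
}
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for rec := range workChan {
			stream, err := decoder.DecodeWithoutLabels(rec.Content)
			if err != nil {
				continue // skip undecodable records, as consume does
			}
			_ = stream // hand off to the pusher
		}
	}()
}
wg.Wait()
```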