Skip to content

Commit e6b2320

Browse files
committed
update result array reference
1 parent 8c3832a commit e6b2320

File tree

1 file changed

+5
-4
lines changed

1 file changed

+5
-4
lines changed

pkg/ingester/kafka_consumer.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (kc *kafkaConsumer) consume(ctx context.Context, records []partition.Record
124124
// success keeps track of the records that were processed. It is expected to
125125
// be sorted in ascending order of offset since the records themselves are
126126
// ordered.
127-
success := make([]int64, len(records))
127+
success := make([]*int64, len(records))
128128

129129
for i := 0; i < numWorkers; i++ {
130130
wg.Add(1)
@@ -161,7 +161,8 @@ func (kc *kafkaConsumer) consume(ctx context.Context, records []partition.Record
161161
continue
162162
}
163163

164-
success[recordWithIndex.index] = recordWithIndex.record.Offset
164+
offset := recordWithIndex.record.Offset
165+
success[recordWithIndex.index] = &offset
165166
}
166167
}()
167168
}
@@ -176,10 +177,10 @@ func (kc *kafkaConsumer) consume(ctx context.Context, records []partition.Record
176177
// Find the highest offset before a gap, and commit that.
177178
var highestOffset int64
178179
for _, offset := range success {
179-
if offset == 0 {
180+
if offset == nil || *offset == 0 {
180181
break
181182
}
182-
highestOffset = offset
183+
highestOffset = *offset
183184
}
184185
if highestOffset > 0 {
185186
kc.committer.EnqueueOffset(highestOffset)

0 commit comments

Comments
 (0)