@@ -95,35 +95,69 @@ func (kc *kafkaConsumer) consume(ctx context.Context, records []partition.Record
 		minOffset    = int64(math.MaxInt64)
 		maxOffset    = int64(0)
 		consumeStart = time.Now()
+		limitWorkers = 100 // TODO(fcjack): make this configurable
+		wg           sync.WaitGroup
+		mu           sync.Mutex
+		lastOffset   = int64(0)
 	)
 
+	// Find min/max offsets
 	for _, record := range records {
 		minOffset = min(minOffset, record.Offset)
 		maxOffset = max(maxOffset, record.Offset)
 	}
 
-	level.Debug(kc.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset)
-	for _, record := range records {
-		stream, err := kc.decoder.DecodeWithoutLabels(record.Content)
-		if err != nil {
-			level.Error(kc.logger).Log("msg", "failed to decode record", "error", err)
-			continue
-		}
-		recordCtx := user.InjectOrgID(record.Ctx, record.TenantID)
-		req := &logproto.PushRequest{
-			Streams: []logproto.Stream{stream},
-		}
-		if err := retryWithBackoff(ctx, func(attempts int) error {
-			if _, err := kc.pusher.Push(recordCtx, req); err != nil {
-				level.Warn(kc.logger).Log("msg", "failed to push records", "err", err, "offset", record.Offset, "attempts", attempts)
-				return err
+	// Create a buffered channel for work distribution
+	numWorkers := min(limitWorkers, len(records)) // Calculate number of workers based on limit and number of records
+	workChan := make(chan partition.Record, len(records)) // Channel to distribute work
+	level.Debug(kc.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset, "workers", numWorkers)
+
+	// Start worker pool
+	for i := 0; i < numWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for record := range workChan {
+				stream, _, err := kc.decoder.Decode(record.Content)
+				if err != nil {
+					level.Error(kc.logger).Log("msg", "failed to decode record", "error", err)
+					continue
+				}
+
+				recordCtx := user.InjectOrgID(record.Ctx, record.TenantID)
+				req := &logproto.PushRequest{
+					Streams: []logproto.Stream{stream},
+				}
+
+				if err := retryWithBackoff(ctx, func(attempts int) error {
+					if _, err := kc.pusher.Push(recordCtx, req); err != nil {
+						level.Warn(kc.logger).Log("msg", "failed to push records", "err", err, "offset", record.Offset, "attempts", attempts)
+						return err
+					}
+					return nil
+				}); err != nil {
+					level.Error(kc.logger).Log("msg", "exhausted all retry attempts, failed to push records", "err", err, "offset", record.Offset)
+					continue
+				}
+
+				mu.Lock()
+				if record.Offset > lastOffset {
+					lastOffset = record.Offset
+					kc.committer.EnqueueOffset(lastOffset)
+				}
+				mu.Unlock()
 			}
-			return nil
-		}); err != nil {
-			level.Error(kc.logger).Log("msg", "exhausted all retry attempts, failed to push records", "err", err, "offset", record.Offset)
-		}
-		kc.committer.EnqueueOffset(record.Offset)
+		}()
 	}
+
+	// Distribute work to workers
+	for _, record := range records {
+		workChan <- record
+	}
+	close(workChan)
+
+	wg.Wait()
+
 	kc.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds())
 	kc.metrics.currentOffset.Set(float64(maxOffset))
 }
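For context on the concurrency change above, here is a minimal, self-contained sketch of the bounded worker-pool pattern the new code follows: a channel buffered to the batch size, at most `limitWorkers` goroutines draining it, and a mutex guarding the highest successfully processed offset. `Record`, `consumeBatch`, and `process` are hypothetical stand-ins for illustration, not Loki's actual types or APIs.

```go
package main

import (
	"fmt"
	"sync"
)

// Record is an illustrative stand-in for partition.Record; it is not Loki's type.
type Record struct {
	Offset  int64
	Content []byte
}

// consumeBatch fans a batch of records out to at most limitWorkers goroutines
// and returns the highest offset that was processed successfully.
func consumeBatch(records []Record, limitWorkers int, process func(Record) error) int64 {
	var (
		wg         sync.WaitGroup
		mu         sync.Mutex
		lastOffset = int64(0)
	)

	numWorkers := min(limitWorkers, len(records)) // min/max builtins need Go 1.21+
	workChan := make(chan Record, len(records))   // buffered so the send loop never blocks

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for record := range workChan {
				if err := process(record); err != nil {
					continue // skip records that could not be processed
				}
				mu.Lock()
				if record.Offset > lastOffset {
					lastOffset = record.Offset // track only the highest successful offset
				}
				mu.Unlock()
			}
		}()
	}

	// Distribute the batch, then signal workers that no more work is coming.
	for _, record := range records {
		workChan <- record
	}
	close(workChan)

	wg.Wait()
	return lastOffset
}

func main() {
	records := []Record{{Offset: 10}, {Offset: 11}, {Offset: 12}}
	last := consumeBatch(records, 100, func(r Record) error {
		return nil // pretend every push succeeded
	})
	fmt.Println("highest processed offset:", last)
}
```

In this pattern records complete in arbitrary order, so only the maximum observed offset is tracked rather than each record's offset individually, which mirrors how the diff enqueues `lastOffset` to the committer under the mutex.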