Skip to content

Commit 6531e0f

Browse files
(2.12) Atomic batch: return rejected batch entries to pool
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 3c72a62 commit 6531e0f

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

server/jetstream_cluster.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3016,8 +3016,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
30163016
}
30173017
if err = js.applyStreamMsgOp(mset, op, buf, isRecovering); err != nil {
30183018
mset.batchMu.Unlock()
3019-
// Make sure to return previous entries to the pool on error.
3020-
bce.ReturnToPool()
3019+
// Make sure to return remaining entries to the pool on an error.
3020+
for _, nce := range mset.batchEntries[j:] {
3021+
nce.ReturnToPool()
3022+
}
30213023
return 0, err
30223024
}
30233025
}
@@ -3259,6 +3261,10 @@ func (mset *stream) rejectBatchStateLocked() {
32593261
mset.clMu.Lock()
32603262
mset.clfs += mset.batchCount
32613263
mset.clMu.Unlock()
3264+
// We're rejecting the batch, so all entries need to be returned to the pool.
3265+
for _, bce := range mset.batchEntries {
3266+
bce.ReturnToPool()
3267+
}
32623268
mset.clearBatchStateLocked()
32633269
}
32643270

0 commit comments

Comments
 (0)