Skip to content

Commit 69b4735

Browse files
Batch apply state cleanup
Signed-off-by: Maurice van Veen <[email protected]>
1 parent d85ff64 commit 69b4735

File tree

2 files changed

+31
-1
lines changed

2 files changed

+31
-1
lines changed

server/jetstream_batching_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,17 +336,21 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) {
336336
require_NoError(t, err)
337337
mset.mu.RLock()
338338
batches := mset.batches
339+
batch := mset.batchApply
339340
mset.mu.RUnlock()
340341
require_True(t, batches == nil)
342+
require_True(t, batch == nil)
341343

342344
// Enabling doesn't need to populate the batching state.
343345
cfg.AllowAtomicPublish = true
344346
_, err = jsStreamUpdate(t, nc, cfg)
345347
require_NoError(t, err)
346348
mset.mu.RLock()
347349
batches = mset.batches
350+
batch = mset.batchApply
348351
mset.mu.RUnlock()
349352
require_True(t, batches == nil)
353+
require_True(t, batch == nil)
350354

351355
// Publish a partial batch that needs to be cleaned up.
352356
m := nats.NewMsg("foo")
@@ -362,8 +366,10 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) {
362366

363367
mset.mu.RLock()
364368
batches = mset.batches
369+
batch = mset.batchApply
365370
mset.mu.RUnlock()
366371
require_NotNil(t, batches)
372+
require_NotNil(t, batch)
367373
batches.mu.Lock()
368374
groups := len(batches.group)
369375
b := batches.group["uuid"]
@@ -372,6 +378,7 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) {
372378
require_NotNil(t, b)
373379
store := b.store
374380
require_Equal(t, store.State().Msgs, 1)
381+
clfs := mset.getCLFS()
375382

376383
// Should fully clean up the in-progress batch.
377384
switch mode {
@@ -396,6 +403,15 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) {
396403
}
397404
return nil
398405
})
406+
// Should clean up the batch apply state.
407+
if mode == Disable || mode == Delete {
408+
mset.mu.RLock()
409+
batch = mset.batchApply
410+
mset.mu.RUnlock()
411+
nclfs := mset.getCLFS()
412+
require_True(t, batch == nil)
413+
require_Equal(t, clfs, nclfs)
414+
}
399415
}
400416

401417
t.Run("Disable", func(t *testing.T) { test(t, Disable) })

server/stream.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2228,6 +2228,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool,
22282228
// If atomic publish is disabled, delete any in-progress batches.
22292229
if !cfg.AllowAtomicPublish {
22302230
mset.deleteInflightBatches()
2231+
mset.deleteBatchApplyState()
22312232
}
22322233

22332234
// Now update config and store's version of our config.
@@ -4161,8 +4162,10 @@ func (mset *stream) unsubscribeToStream(stopping bool) error {
41614162
// Clear batching state.
41624163
mset.deleteInflightBatches()
41634164

4164-
// In case we had a direct get subscriptions.
41654165
if stopping {
4166+
mset.deleteBatchApplyState()
4167+
4168+
// In case we had a direct get subscriptions.
41664169
mset.unsubscribeToDirect()
41674170
}
41684171

@@ -4182,6 +4185,17 @@ func (mset *stream) deleteInflightBatches() {
41824185
}
41834186
}
41844187

4188+
// Lock should be held.
4189+
func (mset *stream) deleteBatchApplyState() {
4190+
if batch := mset.batchApply; batch != nil {
4191+
// Need to return entries (if any) to the pool.
4192+
for _, bce := range batch.entries {
4193+
bce.ReturnToPool()
4194+
}
4195+
mset.batchApply = nil
4196+
}
4197+
}
4198+
41854199
// Lock does NOT need to be held, we set the client on setup and never change it at this point.
41864200
func (mset *stream) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
41874201
if mset.closed.Load() {

0 commit comments

Comments
 (0)