From a1ee0a9099e2d8fd9df9e61a34a5dbd758df3569 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 15 Jul 2025 10:11:24 +0200 Subject: [PATCH] (2.12) Atomic batch: support large batches Signed-off-by: Maurice van Veen --- locksordering.txt | 6 +- server/jetstream_batching.go | 39 +++ server/jetstream_batching_test.go | 446 +++++++++++++++++++++++++ server/jetstream_cluster.go | 521 +++++++++++++++++++++--------- server/raft.go | 50 ++- server/raft_test.go | 68 +++- server/stream.go | 62 +++- 7 files changed, 1016 insertions(+), 176 deletions(-) diff --git a/locksordering.txt b/locksordering.txt index 530a8e4ae6..d15ff82be6 100644 --- a/locksordering.txt +++ b/locksordering.txt @@ -35,10 +35,12 @@ The "clsMu" lock protects the consumer list on a stream, used for signalling con stream -> clsMu -The "clMu" and "ddMu" locks protect clustered and dedupe state respectively. The stream lock (`mset.mu`) is optional, -but if holding "clMu" or "ddMu", locking the stream lock afterward would violate locking order. +The "clMu", "ddMu" and "batchMu" locks protect clustered, dedupe and batch state respectively. +The stream lock (`mset.mu`) is optional, but if holding "clMu", "ddMu" or "batchMu", +locking the stream lock afterward would violate locking order. stream -> clMu + stream -> batchMu -> clMu stream -> ddMu The "mset.batches.mu" lock protects the batching state without needing to hold the stream lock. diff --git a/server/jetstream_batching.go b/server/jetstream_batching.go index 2c40ff27ae..f4b3b4f5f6 100644 --- a/server/jetstream_batching.go +++ b/server/jetstream_batching.go @@ -193,6 +193,45 @@ func (diff *batchStagedDiff) commit(mset *stream) { } } +type batchApply struct { + mu sync.Mutex + id string // ID of the current batch. + count uint64 // Number of entries in the batch, for consistency checks. + entries []*CommittedEntry // Previous entries that are part of this batch. + entryStart int // The index into an entry indicating the first message of the batch. + maxApplied uint64 // Applied value before the entry containing the first message of the batch. +} + +// clearBatchStateLocked clears in-memory apply-batch-related state. +// batch.mu lock should be held. +func (batch *batchApply) clearBatchStateLocked() { + batch.id = _EMPTY_ + batch.count = 0 + batch.entries = nil + batch.entryStart = 0 + batch.maxApplied = 0 +} + +// rejectBatchStateLocked rejects the batch and clears in-memory apply-batch-related state. +// Corrects mset.clfs to take the failed batch into account. +// batch.mu lock should be held. +func (batch *batchApply) rejectBatchStateLocked(mset *stream) { + mset.clMu.Lock() + mset.clfs += batch.count + mset.clMu.Unlock() + // We're rejecting the batch, so all entries need to be returned to the pool. + for _, bce := range batch.entries { + bce.ReturnToPool() + } + batch.clearBatchStateLocked() +} + +func (batch *batchApply) rejectBatchState(mset *stream) { + batch.mu.Lock() + defer batch.mu.Unlock() + batch.rejectBatchStateLocked(mset) +} + // checkMsgHeadersPreClusteredProposal checks the message for expected/consistency headers. // mset.mu lock must NOT be held or used. // mset.clMu lock must be held. 
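Note: the apply loop consumes this state by choosing between Applied and Processed on the raft node. A condensed sketch of the monitorStream change further below (illustrative only, not a drop-in replacement):

	// Inside monitorStream's apply loop (condensed).
	if maxApplied, err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
		if maxApplied > 0 {
			// A batch is still open: mark the entry processed, but cap the
			// applied index so a snapshot cannot compact the batch away yet.
			ne, nb = n.Processed(ce.Index, min(maxApplied, ce.Index))
			// Ownership of ce moves to the in-progress batch; don't return it to the pool.
		} else {
			ne, nb = n.Applied(ce.Index)
			ce.ReturnToPool()
		}
	}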
diff --git a/server/jetstream_batching_test.go b/server/jetstream_batching_test.go index 65614c444e..6d5b9c8c19 100644 --- a/server/jetstream_batching_test.go +++ b/server/jetstream_batching_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/klauspost/compress/s2" "github.com/nats-io/nats.go" ) @@ -490,8 +491,10 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) { require_NoError(t, err) mset.mu.RLock() batches := mset.batches + batch := mset.batchApply mset.mu.RUnlock() require_True(t, batches == nil) + require_True(t, batch == nil) // Enabling doesn't need to populate the batching state. cfg.AllowAtomicPublish = true @@ -499,8 +502,10 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) { require_NoError(t, err) mset.mu.RLock() batches = mset.batches + batch = mset.batchApply mset.mu.RUnlock() require_True(t, batches == nil) + require_True(t, batch == nil) // Publish a partial batch that needs to be cleaned up. m := nats.NewMsg("foo") @@ -516,8 +521,10 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) { mset.mu.RLock() batches = mset.batches + batch = mset.batchApply mset.mu.RUnlock() require_NotNil(t, batches) + require_NotNil(t, batch) batches.mu.Lock() groups := len(batches.group) b := batches.group["uuid"] @@ -526,6 +533,7 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) { require_NotNil(t, b) store := b.store require_Equal(t, store.State().Msgs, 1) + clfs := mset.getCLFS() // Should fully clean up the in-progress batch. switch mode { @@ -565,6 +573,15 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) { } return nil }) + // Should clean up the batch apply state. + if mode == Disable || mode == Delete { + mset.mu.RLock() + batch = mset.batchApply + mset.mu.RUnlock() + nclfs := mset.getCLFS() + require_True(t, batch == nil) + require_Equal(t, clfs, nclfs) + } } t.Run("Disable", func(t *testing.T) { test(t, Disable) }) @@ -1272,3 +1289,432 @@ func TestJetStreamAtomicBatchPublishSingleServerRecovery(t *testing.T) { require_Equal(t, state.FirstSeq, 1) require_Equal(t, state.LastSeq, 2) } + +func TestJetStreamAtomicBatchPublishEncode(t *testing.T) { + test := func(t *testing.T, commit bool, compress bool) { + ts := time.Now().UnixNano() + hdr := genHeader(nil, "Nats-Batch-Id", "uuid") + hdr = genHeader(hdr, "Nats-Batch-Sequence", "1") + msg := []byte("A") + if compress { + msg = bytes.Repeat(msg, compressThreshold) + } + esm := encodeStreamMsgAllowCompressAndBatch("foo", "reply", hdr, msg, 1, ts, false, "uuid", 1, commit) + + buf, op := esm[1:], entryOp(esm[0]) + if commit { + require_Equal(t, op, batchCommitMsgOp) + } else { + require_Equal(t, op, batchMsgOp) + } + + batchId, batchSeq, op, mbuf, err := decodeBatchMsg(buf) + require_NoError(t, err) + require_Equal(t, batchId, "uuid") + require_Equal(t, batchSeq, 1) + if compress { + require_Equal(t, op, compressedStreamMsgOp) + mbuf, err = s2.Decode(nil, mbuf) + require_NoError(t, err) + } else { + require_Equal(t, op, streamMsgOp) + } + + subject, reply, dhdr, dmsg, lseq, dts, sourced, err := decodeStreamMsg(mbuf) + require_NoError(t, err) + require_Equal(t, subject, "foo") + require_Equal(t, reply, "reply") + require_True(t, bytes.Equal(dhdr, hdr)) + require_True(t, bytes.Equal(dmsg, msg)) + require_Equal(t, lseq, 1) + require_Equal(t, dts, ts) + require_False(t, sourced) + } + + t.Run("normal", func(t *testing.T) { test(t, false, false) }) + t.Run("normal-compress", func(t *testing.T) { test(t, false, true) }) + t.Run("commit", func(t *testing.T) { test(t, true, false) 
}) + t.Run("commit-compress", func(t *testing.T) { test(t, true, true) }) +} + +// Test a batch within a single proposal, optionally combined with messages unrelated +// to the batch but within the same proposal. +func TestJetStreamAtomicBatchPublishProposeOne(t *testing.T) { + test := func(t *testing.T, combined bool) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: FileStorage, + Replicas: 3, + AllowAtomicPublish: true, + }) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST") + mset, err := sl.globalAccount().lookupStream("TEST") + require_NoError(t, err) + + pubAck, err := js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + var entries []*Entry + + mset.clMu.Lock() + if combined { + esm := encodeStreamMsg("foo", _EMPTY_, nil, nil, mset.clseq, 0, false) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + } + + msg := []byte("hello") + hdr := genHeader(nil, "Nats-Batch-Id", "uuid") + hdr = setHeader("Nats-Batch-Sequence", "1", hdr) + esm := encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, msg, mset.clseq, 0, false, "uuid", 1, false) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + + hdr = setHeader("Nats-Batch-Sequence", "2", hdr) + hdr = setHeader("Nats-Batch-Commit", "1", hdr) + esm = encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, msg, mset.clseq, 0, false, "uuid", 2, true) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + + if combined { + esm = encodeStreamMsg("foo", _EMPTY_, nil, nil, mset.clseq, 0, false) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + } + mset.clMu.Unlock() + n := mset.raftNode().(*raft) + n.sendAppendEntry(entries) + + pubAck, err = js.Publish("foo", nil) + require_NoError(t, err) + if combined { + require_Equal(t, pubAck.Sequence, 6) + } else { + require_Equal(t, pubAck.Sequence, 4) + } + } + + t.Run("single", func(t *testing.T) { test(t, false) }) + t.Run("combined", func(t *testing.T) { test(t, true) }) +} + +// Test a batch spanning multiple proposals, optionally combined with messages unrelated +// to the batch but within the same first/last proposal. 
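+// When the commit is never proposed (the partial case), the whole batch must be
+// rejected and only the unrelated messages applied.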
+func TestJetStreamAtomicBatchPublishProposeMultiple(t *testing.T) { + test := func(t *testing.T, partial bool, combined bool) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: FileStorage, + Replicas: 3, + AllowAtomicPublish: true, + }) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST") + mset, err := sl.globalAccount().lookupStream("TEST") + require_NoError(t, err) + + pubAck, err := js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + var entries []*Entry + mset.clMu.Lock() + hdr := genHeader(nil, "Nats-Batch-Id", "uuid") + hdr = genHeader(hdr, "Nats-Batch-Sequence", "1") + msg := []byte("hello") + if combined { + esm := encodeStreamMsg("foo", _EMPTY_, nil, nil, mset.clseq, 0, false) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + } + esm := encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, msg, mset.clseq, 0, false, "uuid", 1, false) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + mset.clMu.Unlock() + n := mset.raftNode().(*raft) + n.sendAppendEntry(entries) + + mset.clMu.Lock() + hdr = setHeader("Nats-Batch-Sequence", "2", hdr) + esm = encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, msg, mset.clseq, 0, false, "uuid", 2, false) + mset.clseq++ + mset.clMu.Unlock() + n.sendAppendEntry([]*Entry{newEntry(EntryNormal, esm)}) + + entries = nil + mset.clMu.Lock() + if !partial { + hdr = setHeader("Nats-Batch-Sequence", "3", hdr) + hdr = genHeader(hdr, "Nats-Batch-Commit", "1") + esm = encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, msg, mset.clseq, 0, false, "uuid", 3, true) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + } + if combined { + esm = encodeStreamMsg("foo", _EMPTY_, nil, nil, mset.clseq, 0, false) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + } + mset.clMu.Unlock() + n.sendAppendEntry(entries) + + pubAck, err = js.Publish("foo", nil) + require_NoError(t, err) + expectedSeq := uint64(2) + if !partial { + expectedSeq += 3 + } + if combined { + expectedSeq += 2 + } + require_Equal(t, pubAck.Sequence, expectedSeq) + } + + t.Run("partial", func(t *testing.T) { test(t, true, false) }) + t.Run("partial-combined", func(t *testing.T) { test(t, true, true) }) + t.Run("full", func(t *testing.T) { test(t, false, false) }) + t.Run("full-combined", func(t *testing.T) { test(t, false, true) }) +} + +// Test a batch that was only partially proposed. +// This should not happen, but guard against it anyhow. 
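+// Each case i drops the message with batch sequence i+1 from a three-message batch;
+// only i == 3 (nothing dropped) forms a complete batch that gets applied.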
+func TestJetStreamAtomicBatchPublishProposeOnePartialBatch(t *testing.T) { + maxEntries := 3 + for i := range maxEntries + 1 { + t.Run(fmt.Sprintf("I-%d", i), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: FileStorage, + Replicas: 3, + AllowAtomicPublish: true, + }) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST") + mset, err := sl.globalAccount().lookupStream("TEST") + require_NoError(t, err) + + pubAck, err := js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + var entries []*Entry + mset.clMu.Lock() + msg := []byte("hello") + hdr := genHeader(nil, "Nats-Batch-Id", "uuid") + for j := range 3 { + // Skip if indices equal. + if i == j { + continue + } + bseq := uint64(j + 1) + hdr = setHeader("Nats-Batch-Sequence", strconv.FormatUint(bseq, 10), hdr) + commit := bseq == uint64(maxEntries) + if commit { + hdr = setHeader("Nats-Batch-Commit", "1", hdr) + } + esm := encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, msg, mset.clseq, 0, false, "uuid", bseq, commit) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + } + mset.clMu.Unlock() + n := mset.raftNode().(*raft) + n.sendAppendEntry(entries) + + pubAck, err = js.Publish("foo", nil) + require_NoError(t, err) + expectedSeq := uint64(2) + if i >= maxEntries { + expectedSeq += uint64(maxEntries) + } + require_Equal(t, pubAck.Sequence, expectedSeq) + }) + } +} + +// Test multiple sequential batches, the first batch is partially proposed. +// This should not happen, but guard against it anyhow. 
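+// The first batch (ID_1) never commits and must be rejected; the second (ID_2)
+// commits and is the only one applied.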
+func TestJetStreamAtomicBatchPublishProposeMultiplePartialBatches(t *testing.T) { + for i := range 2 { + batchSize := i + 1 + t.Run(fmt.Sprintf("B-%d", batchSize), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: FileStorage, + Replicas: 3, + AllowAtomicPublish: true, + }) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST") + mset, err := sl.globalAccount().lookupStream("TEST") + require_NoError(t, err) + + pubAck, err := js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + var entries []*Entry + mset.clMu.Lock() + msg := []byte("hello") + hdr := genHeader(nil, "Nats-Batch-Id", "ID_1") + hdr = setHeader("Nats-Batch-Sequence", "1", hdr) + esm := encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, msg, mset.clseq, 0, false, "ID_1", 1, false) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + + for j := range batchSize { + bseq := uint64(j + 1) + hdr = genHeader(nil, "Nats-Batch-Id", "ID_2") + hdr = setHeader("Nats-Batch-Sequence", strconv.FormatUint(bseq, 10), hdr) + commit := bseq == uint64(batchSize) + if commit { + hdr = setHeader("Nats-Batch-Commit", "1", hdr) + } + esm = encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, msg, mset.clseq, 0, false, "ID_2", bseq, commit) + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clseq++ + } + mset.clMu.Unlock() + n := mset.raftNode().(*raft) + n.sendAppendEntry(entries) + + pubAck, err = js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, uint64(2+batchSize)) + }) + } +} + +// Test a continuous flow of batches spanning multiple append entries can still move applied up. +// Also, test a server can become leader if the previous leader left it with a partial batch. +func TestJetStreamAtomicBatchPublishContinuousBatchesStillMoveAppliedUp(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: FileStorage, + Replicas: 3, + AllowAtomicPublish: true, + }) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST") + mset, err := sl.globalAccount().lookupStream("TEST") + require_NoError(t, err) + + pubAck, err := js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + n := mset.raftNode().(*raft) + index, commit, applied := n.Progress() + + // The first batch spans two append entries, but commits. + mset.clMu.Lock() + hdr := genHeader(nil, "Nats-Batch-Id", "ID_1") + hdr = setHeader("Nats-Batch-Sequence", "1", hdr) + esm := encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, nil, mset.clseq, 0, false, "ID_1", 1, false) + mset.clseq++ + mset.clMu.Unlock() + n.sendAppendEntry([]*Entry{newEntry(EntryNormal, esm)}) + + var entries []*Entry + mset.clMu.Lock() + hdr = genHeader(nil, "Nats-Batch-Id", "ID_1") + hdr = setHeader("Nats-Batch-Sequence", "2", hdr) + hdr = setHeader("Nats-Batch-Commit", "1", hdr) + esm = encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, nil, mset.clseq, 0, false, "ID_1", 2, true) + mset.clseq++ + + // The second batch doesn't commit. 
+ entries = append(entries, newEntry(EntryNormal, esm)) + hdr = genHeader(nil, "Nats-Batch-Id", "ID_2") + hdr = setHeader("Nats-Batch-Sequence", "1", hdr) + esm = encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, nil, mset.clseq, 0, false, "ID_2", 1, false) + mset.clseq++ + entries = append(entries, newEntry(EntryNormal, esm)) + mset.clMu.Unlock() + n.sendAppendEntry(entries) + + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + n.RLock() + nindex, ncommit, processed, napplied := n.pindex, n.commit, n.processed, n.applied + n.RUnlock() + if nindex == index { + return errors.New("index not updated") + } else if ncommit == commit { + return errors.New("commit not updated") + } else if napplied == applied { + return errors.New("applied not updated") + } else if napplied == ncommit { + return errors.New("applied should not be able to equal commit yet") + } else if processed != ncommit { + return errors.New("must have processed all commits") + } + return checkState(t, c, globalAccountName, "TEST") + }) + + // Followers are now stranded with a partial batch, one needs to become leader + // and have the batch be rejected since it was partially proposed. + sl.Shutdown() + c.waitOnStreamLeader(globalAccountName, "TEST") + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) + + // Confirm the last batch gets rejected, and we are still able to publish with quorum. + pubAck, err = js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 4) + + c.restartServer(sl) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) + + // Publish again, now with all servers online. + pubAck, err = js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 5) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) +} diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d765f546f5..33cb365902 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -114,6 +114,9 @@ const ( compressedStreamMsgOp // For sending deleted gaps on catchups for replicas. deleteRangeOp + // Batch stream ops. + batchMsgOp + batchCommitMsgOp ) // raftGroups are controlled by the metagroup controller. @@ -2486,13 +2489,24 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps sendSnapshot = false } continue + } else if len(ce.Entries) == 0 { + // Entry could be empty on a restore when mset is nil. + ne, nb = n.Applied(ce.Index) + ce.ReturnToPool() + continue } // Apply our entries. - if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { + if maxApplied, err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { // Update our applied. - ne, nb = n.Applied(ce.Index) - ce.ReturnToPool() + if maxApplied > 0 { + // Indicate we've processed (but not applied) everything up to this point. + ne, nb = n.Processed(ce.Index, min(maxApplied, ce.Index)) + // Don't return entry to the pool, this is handled by the in-progress batch. + } else { + ne, nb = n.Applied(ce.Index) + ce.ReturnToPool() + } } else { // Make sure to clean up. ce.ReturnToPool() @@ -2952,162 +2966,143 @@ func isControlHdr(hdr []byte) bool { } // Apply our stream entries. 
-func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error { - for _, e := range ce.Entries { - if e.Type == EntryNormal { - buf, op := e.Data, entryOp(e.Data[0]) - switch op { - case streamMsgOp, compressedStreamMsgOp: - if mset == nil { - continue - } - s := js.srv +// Return maximum allowed applied value, if currently inside a batch, zero otherwise. +func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) (uint64, error) { + mset.mu.RLock() + batch := mset.batchApply + mset.mu.RUnlock() - mbuf := buf[1:] - if op == compressedStreamMsgOp { - var err error - mbuf, err = s2.Decode(nil, mbuf) - if err != nil { - panic(err.Error()) - } - } + for i, e := range ce.Entries { + // Check if a batch is abandoned. + if e.Type != EntryNormal && batch != nil && batch.id != _EMPTY_ { + batch.rejectBatchState(mset) + } - subject, reply, hdr, msg, lseq, ts, sourced, err := decodeStreamMsg(mbuf) + if e.Type == EntryNormal { + buf, op := e.Data, entryOp(e.Data[0]) + if op == batchMsgOp { + batchId, batchSeq, _, _, err := decodeBatchMsg(buf[1:]) if err != nil { - if node := mset.raftNode(); node != nil { - s.Errorf("JetStream cluster could not decode stream msg for '%s > %s' [%s]", - mset.account(), mset.name(), node.Group()) - } panic(err.Error()) } - - // Check for flowcontrol here. - if len(msg) == 0 && len(hdr) > 0 && reply != _EMPTY_ && isControlHdr(hdr) { - if !isRecovering { - mset.sendFlowControlReply(reply) - } - continue - } - - // Grab last sequence and CLFS. - last, clfs := mset.lastSeqAndCLFS() - - // We can skip if we know this is less than what we already have. - if lseq-clfs < last { - s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d", - mset.account(), mset.name(), lseq+1-clfs, last) + // Initialize if unset. + if batch == nil { + batch = &batchApply{} mset.mu.Lock() - // Check for any preAcks in case we are interest based. - mset.clearAllPreAcks(lseq + 1 - clfs) + mset.batchApply = batch mset.mu.Unlock() - continue } - // Skip by hand here since first msg special case. - // Reason is sequence is unsigned and for lseq being 0 - // the lseq under stream would have to be -1. - if lseq == 0 && last != 0 { + batch.mu.Lock() + // Previous batch (if any) was abandoned. + if batch.id != _EMPTY_ && batchId != batch.id { + batch.rejectBatchStateLocked(mset) + } + batch.count++ + // If the sequence is not monotonically increasing/we identify gaps, the batch can't be accepted. + if batchSeq != batch.count { + batch.rejectBatchStateLocked(mset) + batch.mu.Unlock() continue } - - // Messages to be skipped have no subject or timestamp or msg or hdr. - if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 { - // Skip and update our lseq. - last := mset.store.SkipMsg() + // If this is the first message in the batch, need to mark the start index. + // We'll continue to check batch-completeness and try to find the commit. + // At that point we'll commit the whole batch. + if batchSeq == 1 { + batch.entryStart = i + batch.maxApplied = ce.Index - 1 + } + batch.id = batchId + batch.mu.Unlock() + continue + } else if op == batchCommitMsgOp { + batchId, batchSeq, _, _, err := decodeBatchMsg(buf[1:]) + if err != nil { + panic(err.Error()) + } + // Initialize if unset. 
+			if batch == nil {
+				batch = &batchApply{}
 				mset.mu.Lock()
-				mset.setLastSeq(last)
-				mset.clearAllPreAcks(last)
+				mset.batchApply = batch
 				mset.mu.Unlock()
-				continue
 			}
 
-			var mt *msgTrace
-			// If not recovering, see if we find a message trace object for this
-			// sequence. Only the leader that has proposed this entry will have
-			// stored the trace info.
-			if !isRecovering {
-				mt = mset.getAndDeleteMsgTrace(lseq)
+			batch.mu.Lock()
+			// Previous batch (if any) was abandoned.
+			if batch.id != _EMPTY_ && batchId != batch.id {
+				batch.rejectBatchStateLocked(mset)
 			}
-			// Process the actual message here.
-			err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced)
 
-			// If we have inflight make sure to clear after processing.
-			// TODO(dlc) - technically check on inflight != nil could cause datarace.
-			// But do not want to acquire lock since tracking this will be rare.
-			if mset.inflight != nil {
-				mset.clMu.Lock()
-				if i, found := mset.inflight[subject]; found {
-					// Decrement from pending operations. Once it reaches zero, it can be deleted.
-					if i.ops > 0 {
-						var sz uint64
-						if mset.store.Type() == FileStorage {
-							sz = fileStoreMsgSizeRaw(len(subject), len(hdr), len(msg))
-						} else {
-							sz = memStoreMsgSizeRaw(len(subject), len(hdr), len(msg))
-						}
-						if i.bytes >= sz {
-							i.bytes -= sz
-						} else {
-							i.bytes = 0
-						}
-						i.ops--
-					}
-					if i.ops == 0 {
-						delete(mset.inflight, subject)
-					}
-				}
-				mset.clMu.Unlock()
+			var entries []*Entry
+			batch.count++
+			// Detected a gap; reject the batch.
+			if batchSeq != batch.count {
+				batch.rejectBatchStateLocked(mset)
+				batch.mu.Unlock()
+				continue
 			}
-			// Update running total for counter.
-			if mset.clusteredCounterTotal != nil {
-				mset.clMu.Lock()
-				if counter, found := mset.clusteredCounterTotal[subject]; found {
-					// Decrement from pending operations. Once it reaches zero, it can be deleted.
-					if counter.ops > 0 {
-						counter.ops--
+			// Process any entries that are part of this batch but prior to the current one.
+			for j, bce := range batch.entries {
+				if j == 0 {
+					// The first stored entry only contributes messages from where the batch started.
+					entries = bce.Entries[batch.entryStart:]
+				} else {
+					// Otherwise, all entries are used.
+					entries = bce.Entries
+				}
+				for _, entry := range entries {
+					_, _, op, buf, err = decodeBatchMsg(entry.Data[1:])
+					if err != nil {
+						batch.mu.Unlock()
+						panic(err.Error())
 					}
-					if counter.ops == 0 {
-						delete(mset.clusteredCounterTotal, subject)
+					if err = js.applyStreamMsgOp(mset, op, buf, isRecovering); err != nil {
+						batch.mu.Unlock()
+						// Make sure to return remaining entries to the pool on an error.
+						for _, nce := range batch.entries[j:] {
+							nce.ReturnToPool()
+						}
+						return 0, err
+					}
 				}
-				mset.clMu.Unlock()
+				// Return the entry to the pool now.
+				bce.ReturnToPool()
 			}
-
-			// Clear expected per subject state after processing.
-			if mset.expectedPerSubjectSequence != nil {
-				mset.clMu.Lock()
-				if subj, found := mset.expectedPerSubjectSequence[lseq]; found {
-					delete(mset.expectedPerSubjectSequence, lseq)
-					delete(mset.expectedPerSubjectInProcess, subj)
-				}
-				mset.clMu.Unlock()
+			if len(batch.entries) == 0 {
+				// The batch started within this same entry; only take its range.
+				entries = ce.Entries[batch.entryStart : i+1]
+			} else {
+				// Get all entries up to and including the current one.
+ entries = ce.Entries[:i+1] } - - if err != nil { - if err == errLastSeqMismatch { - - var state StreamState - mset.store.FastState(&state) - - // If we have no msgs and the other side is delivering us a sequence past where we - // should be reset. This is possible if the other side has a stale snapshot and no longer - // has those messages. So compact and retry to reset. - if state.Msgs == 0 { - mset.store.Compact(lseq + 1) - // Retry - err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced) - } - // FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected - // and what we got. + // Process remaining entries in the current entry. + for _, entry := range entries { + _, _, op, buf, err = decodeBatchMsg(entry.Data[1:]) + if err != nil { + batch.mu.Unlock() + panic(err.Error()) } - - // Only return in place if we are going to reset our stream or we are out of space, or we are closed. - if isClusterResetErr(err) || isOutOfSpaceErr(err) || err == errStreamClosed { - return err + if err = js.applyStreamMsgOp(mset, op, buf, isRecovering); err != nil { + batch.mu.Unlock() + return 0, err } - s.Debugf("Apply stream entries for '%s > %s' got error processing message: %v", - mset.account(), mset.name(), err) + } + // Clear state, batch was successful. + batch.clearBatchStateLocked() + batch.mu.Unlock() + continue + } else if batch != nil && batch.id != _EMPTY_ { + // If a batch is abandoned without a commit, reject it. + batch.rejectBatchState(mset) + } + + switch op { + case streamMsgOp, compressedStreamMsgOp: + mbuf := buf[1:] + if err := js.applyStreamMsgOp(mset, op, mbuf, isRecovering); err != nil { + return 0, err } case deleteMsgOp: @@ -3131,7 +3126,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco // Cluster reset error. if err == ErrStoreEOF { - return err + return 0, err } if err != nil && !isRecovering { @@ -3203,10 +3198,6 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco panic(fmt.Sprintf("JetStream Cluster Unknown group entry op type: %v", op)) } } else if e.Type == EntrySnapshot { - if mset == nil { - continue - } - // Everything operates on new replicated state. Will convert legacy snapshots to this for processing. var ss *StreamReplicatedState @@ -3228,13 +3219,13 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco ss, err = DecodeStreamState(e.Data) if err != nil { onBadState(err) - return err + return 0, err } } else { var snap streamSnapshot if err := json.Unmarshal(e.Data, &snap); err != nil { onBadState(err) - return err + return 0, err } // Convert over to StreamReplicatedState ss = &StreamReplicatedState{ @@ -3251,7 +3242,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco if isRecovering || !mset.IsLeader() { if err := mset.processSnapshot(ss, ce.Index); err != nil { - return err + return 0, err } } } else if e.Type == EntryRemovePeer { @@ -3276,6 +3267,171 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } } } + + // If we're still actively processing a batch, must store the entry in-memory + // to come back to it later once we find the commit. 
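+	// Returning maxApplied tells the caller to mark this entry as processed (n.Processed)
+	// rather than applied, so it is retained and cannot be compacted away before the commit arrives.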
+ if batch != nil && batch.id != _EMPTY_ { + batch.mu.Lock() + if batch.entries == nil { + batch.entries = []*CommittedEntry{ce} + } else { + batch.entries = append(batch.entries, ce) + } + maxApplied := batch.maxApplied + batch.mu.Unlock() + return maxApplied, nil + } + return 0, nil +} + +func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isRecovering bool) error { + s := js.srv + + if op == compressedStreamMsgOp { + var err error + mbuf, err = s2.Decode(nil, mbuf) + if err != nil { + panic(err.Error()) + } + } + + subject, reply, hdr, msg, lseq, ts, sourced, err := decodeStreamMsg(mbuf) + if err != nil { + if node := mset.raftNode(); node != nil { + s.Errorf("JetStream cluster could not decode stream msg for '%s > %s' [%s]", + mset.account(), mset.name(), node.Group()) + } + panic(err.Error()) + } + + // Check for flowcontrol here. + if len(msg) == 0 && len(hdr) > 0 && reply != _EMPTY_ && isControlHdr(hdr) { + if !isRecovering { + mset.sendFlowControlReply(reply) + } + return nil + } + + // Grab last sequence and CLFS. + last, clfs := mset.lastSeqAndCLFS() + + // We can skip if we know this is less than what we already have. + if lseq-clfs < last { + s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d", + mset.account(), mset.name(), lseq+1-clfs, last) + mset.mu.Lock() + // Check for any preAcks in case we are interest based. + mset.clearAllPreAcks(lseq + 1 - clfs) + mset.mu.Unlock() + return nil + } + + // Skip by hand here since first msg special case. + // Reason is sequence is unsigned and for lseq being 0 + // the lseq under stream would have to be -1. + if lseq == 0 && last != 0 { + return nil + } + + // Messages to be skipped have no subject or timestamp or msg or hdr. + if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 { + // Skip and update our lseq. + last := mset.store.SkipMsg() + mset.mu.Lock() + mset.setLastSeq(last) + mset.clearAllPreAcks(last) + mset.mu.Unlock() + return nil + } + + var mt *msgTrace + // If not recovering, see if we find a message trace object for this + // sequence. Only the leader that has proposed this entry will have + // stored the trace info. + if !isRecovering { + mt = mset.getAndDeleteMsgTrace(lseq) + } + // Process the actual message here. + err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced) + + // If we have inflight make sure to clear after processing. + // TODO(dlc) - technically check on inflight != nil could cause datarace. + // But do not want to acquire lock since tracking this will be rare. + if mset.inflight != nil { + mset.clMu.Lock() + if i, found := mset.inflight[subject]; found { + // Decrement from pending operations. Once it reaches zero, it can be deleted. + if i.ops > 0 { + var sz uint64 + if mset.store.Type() == FileStorage { + sz = fileStoreMsgSizeRaw(len(subject), len(hdr), len(msg)) + } else { + sz = memStoreMsgSizeRaw(len(subject), len(hdr), len(msg)) + } + if i.bytes >= sz { + i.bytes -= sz + } else { + i.bytes = 0 + } + i.ops-- + } + if i.ops == 0 { + delete(mset.inflight, subject) + } + } + mset.clMu.Unlock() + } + + // Update running total for counter. + if mset.clusteredCounterTotal != nil { + mset.clMu.Lock() + if counter, found := mset.clusteredCounterTotal[subject]; found { + // Decrement from pending operations. Once it reaches zero, it can be deleted. 
+ if counter.ops > 0 { + counter.ops-- + } + if counter.ops == 0 { + delete(mset.clusteredCounterTotal, subject) + } + } + mset.clMu.Unlock() + } + + // Clear expected per subject state after processing. + if mset.expectedPerSubjectSequence != nil { + mset.clMu.Lock() + if subj, found := mset.expectedPerSubjectSequence[lseq]; found { + delete(mset.expectedPerSubjectSequence, lseq) + delete(mset.expectedPerSubjectInProcess, subj) + } + mset.clMu.Unlock() + } + + if err != nil { + if err == errLastSeqMismatch { + + var state StreamState + mset.store.FastState(&state) + + // If we have no msgs and the other side is delivering us a sequence past where we + // should be reset. This is possible if the other side has a stale snapshot and no longer + // has those messages. So compact and retry to reset. + if state.Msgs == 0 { + mset.store.Compact(lseq + 1) + // Retry + err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced) + } + // FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected + // and what we got. + } + + // Only return in place if we are going to reset our stream or we are out of space, or we are closed. + if isClusterResetErr(err) || isOutOfSpaceErr(err) || err == errStreamClosed { + return err + } + s.Debugf("Apply stream entries for '%s > %s' got error processing message: %v", + mset.account(), mset.name(), err) + } return nil } @@ -3358,7 +3514,7 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { } // Clear clseq. If we become leader again, it will be fixed up - // automatically on the next processClusteredInboundMsg call. + // automatically on the next mset.setLeader call. mset.clMu.Lock() if mset.clseq > 0 { mset.clseq = 0 @@ -7910,6 +8066,29 @@ func decodeStreamMsg(buf []byte) (subject, reply string, hdr, msg []byte, lseq u return subject, reply, hdr, msg, lseq, ts, sourced, nil } +func decodeBatchMsg(buf []byte) (batchId string, batchSeq uint64, op entryOp, mbuf []byte, err error) { + var le = binary.LittleEndian + if len(buf) < 2 { + return _EMPTY_, 0, 0, nil, errBadStreamMsg + } + bl := int(le.Uint16(buf)) + buf = buf[2:] + if len(buf) < bl { + return _EMPTY_, 0, 0, nil, errBadStreamMsg + } + batchId = string(buf[:bl]) + buf = buf[bl:] + var n int + batchSeq, n = binary.Uvarint(buf) + if n <= 0 { + return _EMPTY_, 0, 0, nil, errBadStreamMsg + } + buf = buf[n:] + op = entryOp(buf[0]) + mbuf = buf[1:] + return batchId, batchSeq, op, mbuf, nil +} + // Flags for encodeStreamMsg/decodeStreamMsg. const ( msgFlagFromSourceOrMirror uint64 = 1 << iota @@ -7919,12 +8098,16 @@ func encodeStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int return encodeStreamMsgAllowCompress(subject, reply, hdr, msg, lseq, ts, sourced) } +func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, sourced bool) []byte { + return encodeStreamMsgAllowCompressAndBatch(subject, reply, hdr, msg, lseq, ts, sourced, _EMPTY_, 0, false) +} + // Threshold for compression. // TODO(dlc) - Eventually make configurable. const compressThreshold = 8192 // 8k // If allowed and contents over the threshold we will compress. -func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, sourced bool) []byte { +func encodeStreamMsgAllowCompressAndBatch(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, sourced bool, batchId string, batchSeq uint64, batchCommit bool) []byte { // Clip the subject, reply, header and msgs down. 
Operate on // uint64 lengths to avoid overflowing. slen := min(uint64(len(subject)), math.MaxUint16) @@ -7937,15 +8120,34 @@ func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq u elen := int(1 + 8 + 8 + total) elen += (2 + 2 + 2 + 4 + 8) // Encoded lengths, 4bytes, flags are up to 8 bytes + blen := min(uint64(len(batchId)), math.MaxUint16) + if batchId != _EMPTY_ { + elen += int(2 + blen + 8) // length of batchId, batchId itself, batchSeq (up to 8 bytes) + } + var flags uint64 if sourced { flags |= msgFlagFromSourceOrMirror } + var le = binary.LittleEndian + var opIndex int buf := make([]byte, 1, elen) - buf[0] = byte(streamMsgOp) + if batchId != _EMPTY_ { + if batchCommit { + buf[0] = byte(batchCommitMsgOp) + } else { + buf[0] = byte(batchMsgOp) + } + buf = le.AppendUint16(buf, uint16(blen)) + buf = append(buf, batchId[:blen]...) + buf = binary.AppendUvarint(buf, batchSeq) + opIndex = len(buf) + buf = append(buf, byte(streamMsgOp)) + } else { + buf[opIndex] = byte(streamMsgOp) + } - var le = binary.LittleEndian buf = le.AppendUint64(buf, lseq) buf = le.AppendUint64(buf, uint64(ts)) buf = le.AppendUint16(buf, uint16(slen)) @@ -7961,12 +8163,15 @@ func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq u // Check if we should compress. if shouldCompress { nbuf := make([]byte, s2.MaxEncodedLen(elen)) - nbuf[0] = byte(compressedStreamMsgOp) - ebuf := s2.Encode(nbuf[1:], buf[1:]) + if opIndex > 0 { + copy(nbuf[:opIndex], buf[:opIndex]) + } + nbuf[opIndex] = byte(compressedStreamMsgOp) + ebuf := s2.Encode(nbuf[opIndex+1:], buf[opIndex+1:]) // Only pay the cost of decode on the other side if we compressed. // S2 will allow us to try without major penalty for non-compressable data. if len(ebuf) < len(buf) { - buf = nbuf[:len(ebuf)+1] + buf = nbuf[:len(ebuf)+opIndex+1] } } @@ -8145,9 +8350,25 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // Check if we need to set initial value here mset.clMu.Lock() if mset.clseq == 0 || mset.clseq < lseq+mset.clfs { + // Need to unlock and re-acquire the locks in the proper order. + mset.clMu.Unlock() + // Locking order is stream -> batchMu -> clMu + mset.mu.RLock() + batch := mset.batchApply + var batchCount uint64 + if batch != nil { + batch.mu.Lock() + batchCount = batch.count + } + mset.clMu.Lock() // Re-capture - lseq = mset.lastSeq() - mset.clseq = lseq + mset.clfs + lseq = mset.lseq + mset.clseq = lseq + mset.clfs + batchCount + // Keep hold of the mset.clMu, but unlock the others. + if batch != nil { + batch.mu.Unlock() + } + mset.mu.RUnlock() } var ( diff --git a/server/raft.go b/server/raft.go index 9870b284d4..2daff99a84 100644 --- a/server/raft.go +++ b/server/raft.go @@ -44,6 +44,7 @@ type RaftNode interface { SendSnapshot(snap []byte) error NeedSnapshot() bool Applied(index uint64) (entries uint64, bytes uint64) + Processed(index uint64, applied uint64) (entries uint64, bytes uint64) State() RaftState Size() (entries, bytes uint64) Progress() (index, commit, applied uint64) @@ -164,12 +165,13 @@ type raft struct { llqrt time.Time // Last quorum lost time lsut time.Time // Last scale-up time - term uint64 // The current vote term - pterm uint64 // Previous term from the last snapshot - pindex uint64 // Previous index from the last snapshot - commit uint64 // Index of the most recent commit - applied uint64 // Index of the most recently applied commit - papplied uint64 // First sequence of our log, matches when we last installed a snapshot. 
+	term      uint64 // The current vote term
+	pterm     uint64 // Previous term from the last snapshot
+	pindex    uint64 // Previous index from the last snapshot
+	commit    uint64 // Index of the most recent commit
+	processed uint64 // Index of the most recently processed commit
+	applied   uint64 // Index of the most recently applied commit
+	papplied  uint64 // First sequence of our log, matches when we last installed a snapshot.
 
 	aflr uint64 // Index when to signal initial messages have been applied after becoming leader. 0 means signaling is disabled.
 
@@ -1093,6 +1095,16 @@ func (n *raft) ResumeApply() {
 // apply queue. It will return the number of entries and an estimation of the
 // byte size that could be removed with a snapshot/compact.
 func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
+	return n.Processed(index, index)
+}
+
+// Processed is a callback that must be called by the upper layer when it
+// has processed the committed entries it received from the apply queue,
+// but may not yet have applied all of them.
+// Used to indicate a commit was processed, even if it wasn't applied yet and
+// can't be compacted away by a snapshot just yet. This allows us to try to
+// become leader once we've processed all commits, even if not all are applied.
+func (n *raft) Processed(index uint64, applied uint64) (entries uint64, bytes uint64) {
 	n.Lock()
 	defer n.Unlock()
 
@@ -1101,13 +1113,22 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
 		return 0, 0
 	}
 
+	// Ignore if already processed.
+	if index > n.processed {
+		n.processed = index
+	}
+
 	// Ignore if already applied.
-	if index > n.applied {
-		n.applied = index
+	if applied > index {
+		applied = index
+	}
+	if applied > n.applied {
+		n.applied = applied
 	}
 
-	// If it was set, and we reached the minimum applied index, reset and send signal to upper layer.
-	if n.aflr > 0 && index >= n.aflr {
+	// If it was set, and we reached the minimum processed index, reset and send signal to upper layer.
+	// We're not waiting for processed AND applied, because applying could take longer.
+	if n.aflr > 0 && n.processed >= n.aflr {
 		n.aflr = 0
 		// Quick sanity-check to confirm we're still leader.
 		// In which case we must signal, since switchToLeader would not have done so already.
@@ -3341,8 +3362,11 @@ func (n *raft) truncateWAL(term, index uint64) {
 	if n.commit > n.pindex {
 		n.commit = n.pindex
 	}
-	if n.applied > n.commit {
-		n.applied = n.commit
+	if n.processed > n.commit {
+		n.processed = n.commit
+	}
+	if n.applied > n.processed {
+		n.applied = n.processed
 	}
 	if n.papplied > n.applied {
 		n.papplied = n.applied
@@ -4465,7 +4489,7 @@ func (n *raft) switchToCandidate() {
 
 	// If we are catching up or are in observer mode we can not switch.
 	// Avoid petitioning to become leader if we're behind on applies.
-	if n.observer || n.paused || n.applied < n.commit {
+	if n.observer || n.paused || n.processed < n.commit {
 		n.resetElect(minElectionTimeout / 4)
 		return
 	}
diff --git a/server/raft_test.go b/server/raft_test.go
index 8fdf93b794..f67958df4c 100644
--- a/server/raft_test.go
+++ b/server/raft_test.go
@@ -2761,10 +2761,11 @@ func TestNRGSnapshotRecovery(t *testing.T) {
 	require_NoError(t, n.InstallSnapshot(nil))
 
 	// Restoring the snapshot should not up applied, because the apply queue is async.
- n.pindex, n.commit, n.applied = 0, 0, 0 + n.pindex, n.commit, n.processed, n.applied = 0, 0, 0, 0 n.setupLastSnapshot() require_Equal(t, n.pindex, 1) require_Equal(t, n.commit, 1) + require_Equal(t, n.processed, 0) require_Equal(t, n.applied, 0) } @@ -3030,6 +3031,7 @@ func TestNRGSizeAndApplied(t *testing.T) { entries uint64 bytes uint64 ) + // Initially our WAL is empty. entries, bytes = n.Size() require_Equal(t, entries, 0) @@ -3089,6 +3091,7 @@ func TestNRGIgnoreEntryAfterCanceledCatchup(t *testing.T) { entries := []*Entry{newEntry(EntryNormal, esm)} nats0 := "S1Nunr6R" // "nats-0" + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) @@ -3286,6 +3289,69 @@ func TestNRGTruncateOnStartup(t *testing.T) { require_Equal(t, state.NumDeleted, 0) } +func TestNRGProcessed(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 2, entries: entries}) + aeMsg4 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 4, pterm: 1, pindex: 3, entries: entries}) + aeMsg5 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 5, pterm: 1, pindex: 4, entries: entries}) + + // Store three entries. + for i, aeMsg := range []*appendEntry{aeMsg1, aeMsg2, aeMsg3} { + n.processAppendEntry(aeMsg, n.aesub) + require_Equal(t, n.pindex, uint64(i+1)) + require_Equal(t, n.commit, uint64(i+1)) + require_Equal(t, n.processed, 0) + require_Equal(t, n.applied, 0) + } + + // n.Processed can move both n.processed and n.applied up, with different values. + n.Processed(1, 0) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 3) + require_Equal(t, n.processed, 1) + require_Equal(t, n.applied, 0) + + n.Processed(2, 1) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 3) + require_Equal(t, n.processed, 2) + require_Equal(t, n.applied, 1) + + // n.Applied moves both n.processed and n.applied up to the same value. + n.Applied(3) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 3) + require_Equal(t, n.processed, 3) + require_Equal(t, n.applied, 3) + + // Store the remaining messages. + for i, aeMsg := range []*appendEntry{aeMsg4, aeMsg5} { + n.processAppendEntry(aeMsg, n.aesub) + require_Equal(t, n.pindex, uint64(i+4)) + require_Equal(t, n.commit, uint64(i+4)) + require_Equal(t, n.processed, 3) + require_Equal(t, n.applied, 3) + } + + // An invalid processed call with a higher applied should be clamped back down to the processed index. + n.Processed(4, 5) + require_Equal(t, n.pindex, 5) + require_Equal(t, n.commit, 5) + require_Equal(t, n.processed, 4) + require_Equal(t, n.applied, 4) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. 
 // The test may fail if:
diff --git a/server/stream.go b/server/stream.go
index 6c54f608d7..ec8dcecfaf 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -415,7 +415,8 @@ type stream struct {
 
 	monitorWg sync.WaitGroup // Wait group for the monitor routine.
 
-	batches *batching // Inflight batches prior to committing them.
+	batches    *batching   // Inflight batches prior to committing them.
+	batchApply *batchApply // State to check for batch completeness before applying it.
 }
 
 // inflightSubjectRunningTotal stores a running total of inflight messages for a specific subject.
@@ -2283,6 +2284,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool,
 	// If atomic publish is disabled, delete any in-progress batches.
 	if !cfg.AllowAtomicPublish {
 		mset.deleteInflightBatches(false)
+		mset.deleteBatchApplyState()
 	}
 
 	// Now update config and store's version of our config.
@@ -4233,16 +4235,19 @@ func (mset *stream) unsubscribeToStream(stopping, shuttingDown bool) error {
 	// Clear batching state.
 	mset.deleteInflightBatches(shuttingDown)
 
-	// In case we had direct get subscriptions.
+	if stopping {
+		mset.deleteBatchApplyState()
+
+		// In case we had direct get subscriptions.
+		mset.unsubscribeToDirect()
+		mset.unsubscribeToMirrorDirect()
+	}
+
 	if mset.directLeaderSub != nil {
 		// Always unsubscribe the leader sub.
 		mset.unsubscribe(mset.directLeaderSub)
 		mset.directLeaderSub = nil
 	}
-	if stopping {
-		mset.unsubscribeToDirect()
-		mset.unsubscribeToMirrorDirect()
-	}
 
 	mset.active = false
 	return nil
@@ -4265,6 +4270,17 @@ func (mset *stream) deleteInflightBatches(shuttingDown bool) {
 	}
 }
 
+// Lock should be held.
+func (mset *stream) deleteBatchApplyState() {
+	if batch := mset.batchApply; batch != nil {
+		// Need to return entries (if any) to the pool.
+		for _, bce := range batch.entries {
+			bce.ReturnToPool()
+		}
+		mset.batchApply = nil
+	}
+}
+
 // Lock does NOT need to be held, we set the client on setup and never change it at this point.
 func (mset *stream) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
 	if mset.closed.Load() {
@@ -6079,6 +6095,31 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
 	// Need to hold this lock even if we're not clustered, because we'll
 	// be accessing state that requires this lock (even if it's empty).
 	mset.clMu.Lock()
+
+	// We only use mset.clseq for clustering and in case we run ahead of actual commits.
+	// Check if we need to set initial value here
+	if isClustered && (mset.clseq == 0 || mset.clseq < lseq+mset.clfs) {
+		// Need to unlock and re-acquire the locks in the proper order.
+		mset.clMu.Unlock()
+		// Locking order is stream -> batchMu -> clMu
+		mset.mu.RLock()
+		batch := mset.batchApply
+		var batchCount uint64
+		if batch != nil {
+			batch.mu.Lock()
+			batchCount = batch.count
+		}
+		mset.clMu.Lock()
+		// Re-capture
+		lseq = mset.lseq
+		mset.clseq = lseq + mset.clfs + batchCount
+		// Keep hold of the mset.clMu, but unlock the others.
+ if batch != nil { + batch.mu.Unlock() + } + mset.mu.RUnlock() + } + rollback := func(seq uint64) { if isClustered { // TODO(mvv): reset in-memory expected header maps @@ -6145,10 +6186,11 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr if isClustered { var _reply string - if seq == batchSeq { + isCommit := seq == batchSeq + if isCommit { _reply = reply } - esm := encodeStreamMsgAllowCompress(bsubj, _reply, bhdr, bmsg, mset.clseq, ts, false) + esm := encodeStreamMsgAllowCompressAndBatch(bsubj, _reply, bhdr, bmsg, mset.clseq, ts, false, batchId, seq, isCommit) entries = append(entries, newEntry(EntryNormal, esm)) mset.clseq++ sz += len(esm) @@ -6175,9 +6217,9 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr mset.processJetStreamMsg(bsubj, _reply, bhdr, bmsg, 0, 0, mt, false) } } else { - // Do proposal. + // Do a single multi proposal. This ensures we get to push all entries to the proposal queue in-order + // and not interleaved with other proposals. diff.commit(mset) - // TODO(mvv): replace with individual `node.Propose`? if err := node.ProposeMulti(entries); err == nil { mset.trackReplicationTraffic(node, sz, r) } else {
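Note: the batch entry layout produced by encodeStreamMsgAllowCompressAndBatch and unpacked by decodeBatchMsg, per the hunks above (reference sketch only):

	// Layout of a batchMsgOp / batchCommitMsgOp entry:
	//   byte        entryOp: batchMsgOp or batchCommitMsgOp
	//   uint16 (LE) length of the batch ID
	//   [...]       batch ID bytes
	//   uvarint     batch sequence
	//   byte        inner entryOp: streamMsgOp or compressedStreamMsgOp
	//   [...]       regular encodeStreamMsg payload (only this part is s2-compressed)
	batchId, batchSeq, op, mbuf, err := decodeBatchMsg(esm[1:])
	if err == nil && op == compressedStreamMsgOp {
		mbuf, err = s2.Decode(nil, mbuf) // inner payload was compressed
	}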