
Commit 3c72a62

(2.12) NRG: separate Processed and Applied for batch-semantics
Signed-off-by: Maurice van Veen <[email protected]>
1 parent b868f16 commit 3c72a62

6 files changed (+243, -44 lines)


locksordering.txt

Lines changed: 4 additions & 2 deletions
@@ -35,8 +35,10 @@ The "clsMu" lock protects the consumer list on a stream, used for signalling con
 
 stream -> clsMu
 
-The "clMu" and "ddMu" locks protect clustered and dedupe state respectively. The stream lock (`mset.mu`) is optional,
-but if holding "clMu" or "ddMu", locking the stream lock afterward would violate locking order.
+The "clMu", "ddMu" and "batchMu" locks protect clustered, dedupe and batch state respectively.
+The stream lock (`mset.mu`) is optional, but if holding "clMu", "ddMu" or "batchMu",
+locking the stream lock afterward would violate locking order.
 
 stream -> clMu
+stream -> batchMu -> clMu
 stream -> ddMu
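
For reference, the order documented above can be illustrated in isolation. The sketch below uses a hypothetical stripped-down stream type that only mirrors the mutex names from this file; it is not the server's implementation, just a picture of the acquisition order stream -> batchMu -> clMu.

package main

import "sync"

// Hypothetical stripped-down stream type; only the mutex names from
// locksordering.txt are mirrored here.
type stream struct {
	mu      sync.RWMutex // stream lock (mset.mu), optional
	batchMu sync.Mutex   // batch state
	clMu    sync.Mutex   // clustered state
}

// proposeBatch acquires the locks in the documented order:
// stream -> batchMu -> clMu. Taking the stream lock after batchMu or clMu
// would invert that order and risk deadlock.
func (s *stream) proposeBatch() {
	s.mu.RLock()
	defer s.mu.RUnlock()

	s.batchMu.Lock()
	defer s.batchMu.Unlock()

	s.clMu.Lock()
	defer s.clMu.Unlock()
	// ... work on batch and clustered state while holding both ...
}

func main() {
	(&stream{}).proposeBatch()
}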

server/jetstream_batching_test.go

Lines changed: 102 additions & 0 deletions
@@ -18,6 +18,7 @@ package server
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"strconv"
 	"testing"
@@ -634,3 +635,104 @@ func TestJetStreamAtomicBatchPublishProposeMultiplePartialBatches(t *testing.T)
 		})
 	}
 }
+
+// Test a continuous flow of batches spanning multiple append entries can still move applied up.
+// Also, test a server can become leader if the previous leader left it with a partial batch.
+func TestJetStreamAtomicBatchPublishContinuousBatchesStillMoveAppliedUp(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := jsStreamCreate(t, nc, &StreamConfig{
+		Name:               "TEST",
+		Subjects:           []string{"foo"},
+		Storage:            FileStorage,
+		Replicas:           3,
+		AllowAtomicPublish: true,
+	})
+	require_NoError(t, err)
+
+	sl := c.streamLeader(globalAccountName, "TEST")
+	mset, err := sl.globalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+
+	pubAck, err := js.Publish("foo", nil)
+	require_NoError(t, err)
+	require_Equal(t, pubAck.Sequence, 1)
+
+	n := mset.raftNode().(*raft)
+	index, commit, applied := n.Progress()
+
+	// The first batch spans two append entries, but commits.
+	mset.clMu.Lock()
+	hdr := genHeader(nil, "Nats-Batch-Id", "ID_1")
+	hdr = setHeader("Nats-Batch-Sequence", "1", hdr)
+	esm := encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, nil, mset.clseq, 0, false, "ID_1", 1, false)
+	mset.clseq++
+	mset.clMu.Unlock()
+	n.sendAppendEntry([]*Entry{newEntry(EntryNormal, esm)})
+
+	var entries []*Entry
+	mset.clMu.Lock()
+	hdr = genHeader(nil, "Nats-Batch-Id", "ID_1")
+	hdr = setHeader("Nats-Batch-Sequence", "2", hdr)
+	hdr = setHeader("Nats-Batch-Commit", "1", hdr)
+	esm = encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, nil, mset.clseq, 0, false, "ID_1", 2, true)
+	mset.clseq++
+
+	// The second batch doesn't commit.
+	entries = append(entries, newEntry(EntryNormal, esm))
+	hdr = genHeader(nil, "Nats-Batch-Id", "ID_2")
+	hdr = setHeader("Nats-Batch-Sequence", "1", hdr)
+	esm = encodeStreamMsgAllowCompressAndBatch("foo", _EMPTY_, hdr, nil, mset.clseq, 0, false, "ID_2", 1, false)
+	mset.clseq++
+	entries = append(entries, newEntry(EntryNormal, esm))
+	mset.clMu.Unlock()
+	n.sendAppendEntry(entries)
+
+	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+		n.RLock()
+		nindex, ncommit, processed, napplied := n.pindex, n.commit, n.processed, n.applied
+		n.RUnlock()
+		if nindex == index {
+			return errors.New("index not updated")
+		} else if ncommit == commit {
+			return errors.New("commit not updated")
+		} else if napplied == applied {
+			return errors.New("applied not updated")
+		} else if napplied == ncommit {
+			return errors.New("applied should not be able to equal commit yet")
+		} else if processed != ncommit {
+			return errors.New("must have processed all commits")
+		}
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+
+	// Followers are now stranded with a partial batch, one needs to become leader
+	// and have the batch be rejected since it was partially proposed.
+	sl.Shutdown()
+	c.waitOnStreamLeader(globalAccountName, "TEST")
+	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+
+	// Confirm the last batch gets rejected, and we are still able to publish with quorum.
+	pubAck, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	require_Equal(t, pubAck.Sequence, 4)
+
+	c.restartServer(sl)
+	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+
+	// Publish again, now with all servers online.
+	pubAck, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	require_Equal(t, pubAck.Sequence, 5)
+	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+}

server/jetstream_cluster.go

Lines changed: 23 additions & 25 deletions
@@ -2463,9 +2463,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 			}
 
 			// Apply our entries.
-			if applied, err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
+			if maxApplied, err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
 				// Update our applied.
-				if applied {
+				if maxApplied > 0 {
+					// Indicate we've processed (but not applied) everything up to this point.
+					ne, nb = n.Processed(ce.Index, min(maxApplied, ce.Index))
+					// Don't return entry to the pool, this is handled by the in-progress batch.
+				} else {
 					ne, nb = n.Applied(ce.Index)
 					ce.ReturnToPool()
 				}
@@ -2928,8 +2932,8 @@ func isControlHdr(hdr []byte) bool {
 }
 
 // Apply our stream entries.
-// Return whether the ce is applied and error.
-func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) (bool, error) {
+// Return maximum allowed applied value, if currently inside a batch, zero otherwise.
+func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) (uint64, error) {
 	mset.batchMu.Lock()
 	batchActiveId := mset.batchId
 	mset.batchMu.Unlock()
@@ -2968,6 +2972,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 				// At that point we'll commit the whole batch.
 				if batchSeq == 1 {
 					mset.batchEntryStart = i
+					mset.batchMaxApplied = ce.Index - 1
 				}
 				mset.batchId, batchActiveId = batchId, batchId
 				mset.batchMu.Unlock()
@@ -3011,7 +3016,9 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 					}
 					if err = js.applyStreamMsgOp(mset, op, buf, isRecovering); err != nil {
 						mset.batchMu.Unlock()
-						return false, err
+						// Make sure to return previous entries to the pool on error.
+						bce.ReturnToPool()
+						return 0, err
 					}
 				}
 				// Return the entry to the pool now.
@@ -3033,7 +3040,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 				}
 				if err = js.applyStreamMsgOp(mset, op, buf, isRecovering); err != nil {
 					mset.batchMu.Unlock()
-					return false, err
+					return 0, err
 				}
 			}
 			// Clear state, batch was successful.
@@ -3053,7 +3060,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 		case streamMsgOp, compressedStreamMsgOp:
 			mbuf := buf[1:]
 			if err := js.applyStreamMsgOp(mset, op, mbuf, isRecovering); err != nil {
-				return false, err
+				return 0, err
 			}
 
 		case deleteMsgOp:
@@ -3077,7 +3084,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 
 			// Cluster reset error.
 			if err == ErrStoreEOF {
-				return false, err
+				return 0, err
 			}
 
 			if err != nil && !isRecovering {
@@ -3170,13 +3177,13 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 				ss, err = DecodeStreamState(e.Data)
 				if err != nil {
 					onBadState(err)
-					return false, err
+					return 0, err
 				}
 			} else {
 				var snap streamSnapshot
 				if err := json.Unmarshal(e.Data, &snap); err != nil {
 					onBadState(err)
-					return false, err
+					return 0, err
 				}
 				// Convert over to StreamReplicatedState
 				ss = &StreamReplicatedState{
@@ -3193,7 +3200,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 
 			if isRecovering || !mset.IsLeader() {
 				if err := mset.processSnapshot(ss, ce.Index); err != nil {
-					return false, err
+					return 0, err
 				}
 			}
 		} else if e.Type == EntryRemovePeer {
@@ -3228,9 +3235,11 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 		} else {
 			mset.batchEntries = append(mset.batchEntries, ce)
 		}
+		maxApplied := mset.batchMaxApplied
 		mset.batchMu.Unlock()
+		return maxApplied, nil
 	}
-	return batchActiveId == _EMPTY_, nil
+	return 0, nil
 }
 
 // clearBatchStateLocked clears in-memory apply-batch-related state.
@@ -3240,6 +3249,7 @@ func (mset *stream) clearBatchStateLocked() {
 	mset.batchCount = 0
 	mset.batchEntries = nil
 	mset.batchEntryStart = 0
+	mset.batchMaxApplied = 0
 }
 
 // rejectBatchStateLocked rejects the batch and clears in-memory apply-batch-related state.
@@ -3463,7 +3473,7 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
 	}
 
 	// Clear clseq. If we become leader again, it will be fixed up
-	// automatically on the next processClusteredInboundMsg call.
+	// automatically on the next mset.setLeader call.
 	mset.clMu.Lock()
 	if mset.clseq > 0 {
 		mset.clseq = 0
@@ -8373,14 +8383,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 
 	// Proceed with proposing the batch.
 
-	// We only use mset.clseq for clustering and in case we run ahead of actual commits.
-	// Check if we need to set initial value here
 	mset.clMu.Lock()
-	if mset.clseq == 0 || mset.clseq < lseq+mset.clfs {
-		// Re-capture
-		lseq = mset.lastSeq()
-		mset.clseq = lseq + mset.clfs
-	}
 
 	// TODO(mvv): support message tracing
 	ts := time.Now().UnixNano()
@@ -8478,11 +8481,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 	// We only use mset.clseq for clustering and in case we run ahead of actual commits.
 	// Check if we need to set initial value here
 	mset.clMu.Lock()
-	if mset.clseq == 0 || mset.clseq < lseq+mset.clfs {
-		// Re-capture
-		lseq = mset.lastSeq()
-		mset.clseq = lseq + mset.clfs
-	}
 
 	var (
 		dseq uint64
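
The behavioral core of this file's change is the branch in monitorStream on the new maxApplied return value from applyStreamEntries. Below is a minimal sketch of that decision using a hypothetical trimmed-down node interface and plain integers instead of the real *raft type and *CommittedEntry; it only restates the branching logic visible in the hunk above.

package main

import "fmt"

// Hypothetical trimmed-down view of the raft node; only the method shapes
// relevant to this commit are mirrored here.
type node interface {
	Applied(index uint64) (entries, bytes uint64)
	Processed(index, applied uint64) (entries, bytes uint64)
}

type fakeNode struct{}

func (fakeNode) Applied(index uint64) (uint64, uint64)            { return 0, 0 }
func (fakeNode) Processed(index, applied uint64) (uint64, uint64) { return 0, 0 }

// onCommitted mirrors the new branch in monitorStream: a non-zero maxApplied
// means a batch is still in progress, so the entry is only marked processed
// (and kept for the batch); otherwise it is fully applied and can be released.
func onCommitted(n node, index, maxApplied uint64) {
	if maxApplied > 0 {
		n.Processed(index, min(maxApplied, index))
		// Entry is retained by the in-progress batch, not returned to the pool.
	} else {
		n.Applied(index)
		// Entry can be returned to the pool here.
	}
}

func main() {
	onCommitted(fakeNode{}, 10, 8) // mid-batch: processed up to 10, applied capped at 8
	onCommitted(fakeNode{}, 11, 0) // no open batch: fully applied
	fmt.Println("ok")
}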

server/raft.go

Lines changed: 36 additions & 12 deletions
@@ -44,6 +44,7 @@
 	SendSnapshot(snap []byte) error
 	NeedSnapshot() bool
 	Applied(index uint64) (entries uint64, bytes uint64)
+	Processed(index uint64, applied uint64) (entries uint64, bytes uint64)
 	State() RaftState
 	Size() (entries, bytes uint64)
 	Progress() (index, commit, applied uint64)
@@ -164,11 +165,12 @@
 	llqrt time.Time // Last quorum lost time
 	lsut  time.Time // Last scale-up time
 
-	term    uint64 // The current vote term
-	pterm   uint64 // Previous term from the last snapshot
-	pindex  uint64 // Previous index from the last snapshot
-	commit  uint64 // Index of the most recent commit
-	applied uint64 // Index of the most recently applied commit
+	term      uint64 // The current vote term
+	pterm     uint64 // Previous term from the last snapshot
+	pindex    uint64 // Previous index from the last snapshot
+	commit    uint64 // Index of the most recent commit
+	processed uint64 // Index of the most recently processed commit
+	applied   uint64 // Index of the most recently applied commit
 
 	aflr uint64 // Index when to signal initial messages have been applied after becoming leader. 0 means signaling is disabled.
 
@@ -1086,6 +1088,16 @@
 // apply queue. It will return the number of entries and an estimation of the
 // byte size that could be removed with a snapshot/compact.
 func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
+	return n.Processed(index, index)
+}
+
+// Processed is a callback that must be called by the upper layer when it
+// has processed the committed entries that it received from the apply queue,
+// but it (maybe) hasn't applied all the processed entries yet.
+// Used to indicate a commit was processed, even if it wasn't applied yet and
+// can't be compacted away by a snapshot just yet. Which allows us to try to
+// become leader if we've processed all commits, even if they're not all applied.
+func (n *raft) Processed(index uint64, applied uint64) (entries uint64, bytes uint64) {
 	n.Lock()
 	defer n.Unlock()
 
@@ -1094,13 +1106,22 @@
 		return 0, 0
 	}
 
+	// Ignore if already processed.
+	if index > n.processed {
+		n.processed = index
+	}
+
 	// Ignore if already applied.
-	if index > n.applied {
-		n.applied = index
+	if applied > index {
+		applied = index
+	}
+	if applied > n.applied {
+		n.applied = applied
 	}
 
-	// If it was set, and we reached the minimum applied index, reset and send signal to upper layer.
-	if n.aflr > 0 && index >= n.aflr {
+	// If it was set, and we reached the minimum processed index, reset and send signal to upper layer.
+	// We're not waiting for processed AND applied, because applying could take longer.
+	if n.aflr > 0 && n.processed >= n.aflr {
 		n.aflr = 0
 		// Quick sanity-check to confirm we're still leader.
 		// In which case we must signal, since switchToLeader would not have done so already.
@@ -3333,8 +3354,11 @@ func (n *raft) truncateWAL(term, index uint64) {
 		if n.commit > n.pindex {
 			n.commit = n.pindex
 		}
-		if n.applied > n.commit {
-			n.applied = n.commit
+		if n.processed > n.commit {
+			n.processed = n.commit
+		}
+		if n.applied > n.processed {
+			n.applied = n.processed
 		}
 	}()
 
@@ -4447,7 +4471,7 @@ func (n *raft) switchToCandidate() {
 
 	// If we are catching up or are in observer mode we can not switch.
 	// Avoid petitioning to become leader if we're behind on applies.
-	if n.observer || n.paused || n.applied < n.commit {
+	if n.observer || n.paused || n.processed < n.commit {
 		n.resetElect(minElectionTimeout / 4)
 		return
 	}
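
Per the new doc comment, Applied(index) is now simply Processed(index, index), and Processed lets the applied index trail the processed index. The sketch below mirrors only that bookkeeping with a hypothetical tracker type; it is not the raft struct itself and ignores locking, aflr signalling, and the entries/bytes return values.

package main

import "fmt"

// Hypothetical tracker standing in for the raft struct's processed/applied fields.
type tracker struct {
	processed, applied uint64
}

// Processed advances the processed index on every delivered commit, while the
// applied index only advances up to the caller-supplied bound (capped at index),
// matching the logic in n.Processed above.
func (t *tracker) Processed(index, applied uint64) {
	if index > t.processed {
		t.processed = index
	}
	if applied > index {
		applied = index
	}
	if applied > t.applied {
		t.applied = applied
	}
}

// Applied is just Processed with both values equal, as in the diff.
func (t *tracker) Applied(index uint64) { t.Processed(index, index) }

func main() {
	var t tracker
	t.Processed(5, 3) // mid-batch: processed=5, applied=3
	t.Applied(7)      // batch resolved: processed=7, applied=7
	fmt.Println(t.processed, t.applied) // 7 7
}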
