Skip to content

(2.12) Atomic batch: support large batches #7067

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions locksordering.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ The "clsMu" lock protects the consumer list on a stream, used for signalling con

stream -> clsMu

The "clMu" and "ddMu" locks protect clustered and dedupe state respectively. The stream lock (`mset.mu`) is optional,
but if holding "clMu" or "ddMu", locking the stream lock afterward would violate locking order.
The "clMu", "ddMu" and "batchMu" locks protect clustered, dedupe and batch state respectively.
The stream lock (`mset.mu`) is optional, but if holding "clMu", "ddMu" or "batchMu",
locking the stream lock afterward would violate locking order.

stream -> clMu
stream -> batchMu -> clMu
stream -> ddMu

The "mset.batches.mu" lock protects the batching state without needing to hold the stream lock.
Expand Down
39 changes: 39 additions & 0 deletions server/jetstream_batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,45 @@ func (diff *batchStagedDiff) commit(mset *stream) {
}
}

type batchApply struct {
mu sync.Mutex
id string // ID of the current batch.
count uint64 // Number of entries in the batch, for consistency checks.
entries []*CommittedEntry // Previous entries that are part of this batch.
entryStart int // The index into an entry indicating the first message of the batch.
maxApplied uint64 // Applied value before the entry containing the first message of the batch.
}

// clearBatchStateLocked clears in-memory apply-batch-related state.
// batch.mu lock should be held.
func (batch *batchApply) clearBatchStateLocked() {
batch.id = _EMPTY_
batch.count = 0
batch.entries = nil
batch.entryStart = 0
batch.maxApplied = 0
}

// rejectBatchStateLocked rejects the batch and clears in-memory apply-batch-related state.
// Corrects mset.clfs to take the failed batch into account.
// batch.mu lock should be held.
func (batch *batchApply) rejectBatchStateLocked(mset *stream) {
mset.clMu.Lock()
mset.clfs += batch.count
mset.clMu.Unlock()
// We're rejecting the batch, so all entries need to be returned to the pool.
for _, bce := range batch.entries {
bce.ReturnToPool()
}
batch.clearBatchStateLocked()
}

func (batch *batchApply) rejectBatchState(mset *stream) {
batch.mu.Lock()
defer batch.mu.Unlock()
batch.rejectBatchStateLocked(mset)
}

// checkMsgHeadersPreClusteredProposal checks the message for expected/consistency headers.
// mset.mu lock must NOT be held or used.
// mset.clMu lock must be held.
Expand Down
Loading
Loading