(2.12) Atomic batch: support large batches #7067
base: main
Conversation
The PR is ready for review, but put in draft since I need to fix one bug: if you'd constantly use batching and the batches were spread over multiple append entries, the applied index would not move up, which would prevent making snapshots. Instead of fully blocking
Have not looked at this specific PR, but if we stage and wait to send the batch to replicas, this will be a large seesaw in throughput, as the app waits a longer period of time for the ack of the commit since we have to replicate all messages after the commit. WDYT?
I'm thinking there's a slight misunderstanding here around what happens where. But I could also be misinterpreting what you're asking. Let me explain the current flow. This is how it works already on
Below is added in this PR, and happens after the above:
So, we need to do consistency checks prior to replicating, AND the follower needs to do a consistency check that the batch wasn't abandoned mid-proposal.
What I think you meant is that instead of replicating after the commit as seen on the leader, we replicate prior to seeing the commit. And then we either invalidate the batch if it turns out invalid, or we commit it. That should then be some amount faster since we don't need to replicate and then apply. I do want to consider and try out that approach. But I'd first want to walk down this implementation path. This approach is going to be fully safe and give all the guarantees the batch needs. What I'd prefer we do:
Yes, so this means a large latency spike when an app sends COMMIT and waits on that ack, since it means we have to replicate all of the batch only on COMMIT. We could replicate as we go, and then commit would check everything (if deterministic) and each one is already ready (essentially) on a commit.
There are some additional issues I can foresee when doing that. For example, counters can't have this optimistic replication because they change the headers and message body on commit, so those can't be replicated first. I do want to note though, there's no large latency spike on commit! If the batch fits in a single append entry, the latency will be exactly the same as with
Agree, if they fit into one AE we should not see any effect. And under $SYS from a server, that message can be any size (but prefer not to go over 8M). But once hoisted into a user account it could be subject to max payload restrictions.
Force-pushed 6531e0f to d8e92fd
server/jetstream_batching_test.go (Outdated)
"fmt" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"github.com/klauspost/compress/s2" | ||
|
remove new line here.
Done
server/jetstream_cluster.go (Outdated)
	continue
} else if batchActiveId != _EMPTY_ {
	// If a batch is abandoned without a commit, reject it.
	mset.batchMu.Lock()
We keep repeating ourselves here, maybe these should be functions?
With the move into a separate batchApply struct (d85ff64), could remove the batchActiveId variable that needed to be kept up-to-date. Like mentioned here in #7067 (comment), could only introduce a rejectBatchState() that's used in two places; the lock needs to be held for longer in all other cases.
server/jetstream_cluster.go (Outdated)
// clearBatchStateLocked clears in-memory apply-batch-related state.
// mset.batchMu lock should be held.
func (mset *stream) clearBatchStateLocked() {
Maybe intro non-Locked versions that acquire the proper lock, call these functions, and release it.
Have introduced a rejectBatchState() which is now used in the two places where that's the only thing that's done. The others all need the lock to be held for a longer period.
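For illustration, a minimal sketch of the locked/unlocked split being discussed; the stand-in stream type, the field names, and what exactly gets cleared are assumptions, not the PR's actual code:

```go
package sketch

import "sync"

const _EMPTY_ = "" // mirrors the server's convention for the empty string

// Stand-in stream carrying only batch-related fields (names assumed).
type stream struct {
	batchMu   sync.Mutex
	batchId   string
	batchSeq  uint64
	batchMsgs [][]byte
}

// clearBatchStateLocked clears in-memory apply-batch-related state.
// mset.batchMu lock should be held.
func (mset *stream) clearBatchStateLocked() {
	mset.batchId = _EMPTY_
	mset.batchSeq = 0
	mset.batchMsgs = nil
}

// rejectBatchState acquires batchMu and clears the staged batch. It is meant
// for the places where rejecting is the only thing done; call sites that need
// to hold the lock for longer call clearBatchStateLocked directly.
func (mset *stream) rejectBatchState() {
	mset.batchMu.Lock()
	defer mset.batchMu.Unlock()
	mset.clearBatchStateLocked()
}
```

This follows the common Go pattern of a thin wrapper around a *Locked helper, so callers that already hold batchMu can reuse the same clearing logic.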
server/stream.go (Outdated)
	batches *batching

	// State to check for batch completeness before applying it.
	batchMu sync.Mutex
Should these all be in their own struct that we alloc only when we see the first batch?
Moved into a separate batchApply struct (d85ff64).
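As a rough sketch of the lazy-allocation idea, assuming a batchApply struct hanging off the stream; the field names and layout here are illustrative, not the PR's actual code:

```go
package sketch

import "sync"

// Minimal stand-in for the server's *stream; the real struct has many more fields.
type stream struct {
	mu         sync.RWMutex
	batchApply *batchApply
}

// batchApply holds the in-memory state used to check a replicated batch for
// completeness before applying it (field names are illustrative).
type batchApply struct {
	mu   sync.Mutex
	id   string   // active batch ID, empty when no batch is in flight
	seq  uint64   // last per-batch sequence seen
	msgs [][]byte // staged entries awaiting the commit
}

// getBatchApply lazily allocates the batch-apply state the first time a batch
// message is seen, so streams that never use batching pay no extra allocation.
func (mset *stream) getBatchApply() *batchApply {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	if mset.batchApply == nil {
		mset.batchApply = &batchApply{}
	}
	return mset.batchApply
}
```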
Force-pushed d8e92fd to ef5c2f4
Force-pushed ef5c2f4 to d85ff64
Force-pushed 5f29e0e to 69b4735
This PR adds support for batches that are larger than a single append entry. Batches are first accumulated before replicating, and once all expected checks pass the batch is proposed (this was already the case). Now when applying the batch, we stage the messages in memory until we see the commit message. If the commit message is not seen, or gaps are detected, the batch gets rejected to prevent partially applying batches.
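As a rough illustration of the apply-side staging described above; all names and the exact checks here are assumptions for the sketch, not the PR's code (per-batch sequences start at 0 for simplicity):

```go
package sketch

import "fmt"

// stagedBatch accumulates replicated batch entries until the commit is seen.
type stagedBatch struct {
	id      string
	nextSeq uint64
	entries [][]byte
}

// addEntry stages one batch message. A mismatched batch ID or a gap in the
// per-batch sequence means the batch was abandoned or is incomplete, so the
// whole batch must be rejected rather than partially applied.
func (b *stagedBatch) addEntry(id string, seq uint64, msg []byte) error {
	if id != b.id {
		return fmt.Errorf("unexpected batch id %q, staging %q", id, b.id)
	}
	if seq != b.nextSeq {
		return fmt.Errorf("gap in batch %q: got seq %d, expected %d", b.id, seq, b.nextSeq)
	}
	b.entries = append(b.entries, msg)
	b.nextSeq++
	return nil
}

// commit hands back the staged entries so they can be applied in one go; on
// any earlier error the caller drops the staged state instead.
func (b *stagedBatch) commit() [][]byte {
	entries := b.entries
	b.entries = nil
	return entries
}
```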
New batchMsgOp and batchCommitMsgOp were introduced to "wrap" the streamMsgOp and compressedStreamMsgOp so they also contain the batchId and batchSeq, without needing to decompress/decode the message to get the raw headers when doing consistency checks prior to commit.

Applying the streamMsgOp/compressedStreamMsgOp is extracted into a new applyStreamMsgOp function. This supports doing consistency checks prior to commit when batching, and then doing the applies in one go.

Resolves #6978
Signed-off-by: Maurice van Veen [email protected]
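A hedged sketch of the idea behind the wrapping ops described above: the batch ID and per-batch sequence ride outside the (possibly compressed) inner message, so consistency checks don't need to decode it. The wire layout below is made up for illustration and is not the PR's actual encoding:

```go
package sketch

import "encoding/binary"

// encodeBatchMsgOp prefixes an already-encoded (possibly compressed) stream
// message with the batch ID and per-batch sequence, so followers can run
// consistency checks without decompressing or decoding the inner op.
// Assumes the batch ID fits in 255 bytes.
func encodeBatchMsgOp(batchId string, batchSeq uint64, inner []byte) []byte {
	buf := make([]byte, 0, 1+len(batchId)+binary.MaxVarintLen64+len(inner))
	buf = append(buf, byte(len(batchId)))
	buf = append(buf, batchId...)
	buf = binary.AppendUvarint(buf, batchSeq)
	return append(buf, inner...)
}

// decodeBatchMsgOp peels off the batch ID and sequence and returns the inner
// encoded message untouched, to be decoded only when the batch is applied.
func decodeBatchMsgOp(buf []byte) (batchId string, batchSeq uint64, inner []byte) {
	idLen := int(buf[0])
	batchId = string(buf[1 : 1+idLen])
	seq, n := binary.Uvarint(buf[1+idLen:])
	return batchId, seq, buf[1+idLen+n:]
}
```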