Skip to content

Commit 46dec81

Browse files
(2.12) Read-after-write & monotonic reads (#6970)
See [ADR: JetStream Read-after-Write](nats-io/nats-architecture-and-design#358) for context, problem statement, and design.

Resolves #6557.

Signed-off-by: Maurice van Veen <[email protected]>
2 parents c010add + dace607 commit 46dec81

13 files changed

+886
-48
lines changed

server/auth_callout.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
403403
return false, errStr
404404
}
405405
req := []byte(b)
406-
var hdr map[string]string
406+
var hdr []byte
407407

408408
// Check if we have been asked to encrypt.
409409
if xkp != nil {
@@ -413,7 +413,7 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
413413
s.Warnf(errStr)
414414
return false, errStr
415415
}
416-
hdr = map[string]string{AuthRequestXKeyHeader: xkey}
416+
hdr = genHeader(hdr, AuthRequestXKeyHeader, xkey)
417417
}
418418

419419
// Send out our request.

server/consumer.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ type ConsumerConfig struct {
114114
Replicas int `json:"num_replicas"`
115115
// Force memory storage.
116116
MemoryStorage bool `json:"mem_storage,omitempty"`
117+
// Hold back delivery until the stream's last sequence has reached at least this value.
118+
MinLastSeq uint64 `json:"min_last_seq,omitempty"`
117119

118120
// Don't add to general clients.
119121
Direct bool `json:"direct,omitempty"`
@@ -4647,6 +4649,20 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
46474649
}
46484650
}
46494651

4652+
// If a minimum last sequence was specified, we need to check if the
4653+
// underlying stream has sufficient data. As an optimization, we only
4654+
// do this if what we want to deliver is below this floor.
4655+
if o.cfg.MinLastSeq > 0 && pmsg.seq < o.cfg.MinLastSeq {
4656+
var state StreamState
4657+
o.mset.store.FastState(&state)
4658+
if state.LastSeq < o.cfg.MinLastSeq {
4659+
// We only block deliveries at the start until we reach min last seq,
4660+
// so simply put back our pointer to account for the o.getNextMsg advancing it.
4661+
o.sseq--
4662+
goto waitForMsgs
4663+
}
4664+
}
4665+
46504666
// Update our cached num pending here first.
46514667
if dc == 1 {
46524668
o.npc--

server/errors.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1778,5 +1778,15 @@
17781778
"help": "",
17791779
"url": "",
17801780
"deprecates": ""
1781+
},
1782+
{
1783+
"constant": "JSStreamMinLastSeqErr",
1784+
"code": 412,
1785+
"error_code": 10180,
1786+
"description": "min last sequence",
1787+
"comment": "",
1788+
"help": "",
1789+
"url": "",
1790+
"deprecates": ""
17811791
}
17821792
]

server/events.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ type pubMsg struct {
420420
sub string
421421
rply string
422422
si *ServerInfo
423-
hdr map[string]string
423+
hdr []byte
424424
msg any
425425
oct compressionType
426426
echo bool
@@ -429,7 +429,7 @@ type pubMsg struct {
429429

430430
var pubMsgPool sync.Pool
431431

432-
func newPubMsg(c *client, sub, rply string, si *ServerInfo, hdr map[string]string,
432+
func newPubMsg(c *client, sub, rply string, si *ServerInfo, hdr []byte,
433433
msg any, oct compressionType, echo, last bool) *pubMsg {
434434

435435
var m *pubMsg
@@ -604,17 +604,28 @@ RESET:
604604
// Add in NL
605605
b = append(b, _CRLF_...)
606606

607+
// Optional raw header addition.
608+
if pm.hdr != nil {
609+
b = append(pm.hdr, b...)
610+
nhdr := len(pm.hdr)
611+
nsize := len(b) - LEN_CR_LF
612+
// MQTT producers don't have CRLF, so add it back.
613+
if c.isMqtt() {
614+
nsize += LEN_CR_LF
615+
}
616+
// Update pubArgs
617+
// If others will use this later we need to save and restore original.
618+
c.pa.hdr = nhdr
619+
c.pa.size = nsize
620+
c.pa.hdb = []byte(strconv.Itoa(nhdr))
621+
c.pa.szb = []byte(strconv.Itoa(nsize))
622+
}
623+
607624
// Check if we should set content-encoding
608625
if contentHeader != _EMPTY_ {
609626
b = c.setHeader(contentEncodingHeader, contentHeader, b)
610627
}
611628

612-
// Optional header processing.
613-
if pm.hdr != nil {
614-
for k, v := range pm.hdr {
615-
b = c.setHeader(k, v, b)
616-
}
617-
}
618629
// Tracing
619630
if trace {
620631
c.traceInOp(fmt.Sprintf("PUB %s %s %d", c.pa.subject, c.pa.reply, c.pa.size), nil)
@@ -691,7 +702,7 @@ func (s *Server) sendInternalAccountMsg(a *Account, subject string, msg any) err
691702
}
692703

693704
// Used to send an internal message with an optional reply to an arbitrary account.
694-
func (s *Server) sendInternalAccountMsgWithReply(a *Account, subject, reply string, hdr map[string]string, msg any, echo bool) error {
705+
func (s *Server) sendInternalAccountMsgWithReply(a *Account, subject, reply string, hdr []byte, msg any, echo bool) error {
695706
s.mu.RLock()
696707
if s.sys == nil || s.sys.sendq == nil {
697708
s.mu.RUnlock()

server/jetstream_api.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ const (
129129
JSDirectMsgGet = "$JS.API.DIRECT.GET.*"
130130
JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
131131

132+
// JSDirectLeaderMsgGet is like JSDirectMsgGet, but is only answered by the current stream leader.
133+
JSDirectLeaderMsgGet = "$JS.API.DIRECT_LEADER.GET.*"
134+
JSDirectLeaderMsgGetT = "$JS.API.DIRECT_LEADER.GET.%s"
135+
132136
// This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
133137
// The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
134138
JSDirectGetLastBySubject = "$JS.API.DIRECT.GET.*.>"
@@ -675,7 +679,10 @@ type JSApiMsgGetRequest struct {
675679
LastFor string `json:"last_by_subj,omitempty"`
676680
NextFor string `json:"next_by_subj,omitempty"`
677681

678-
// Batch support. Used to request more then one msg at a time.
682+
// Only return messages if the stream's last sequence is at least this value; otherwise the request is rejected.
683+
MinLastSeq uint64 `json:"min_last_seq,omitempty"`
684+
685+
// Batch support. Used to request more than one msg at a time.
679686
// Can be used with simple starting seq, but also NextFor with wildcards.
680687
Batch int `json:"batch,omitempty"`
681688
// This will make sure we limit how much data we blast out. If not set we will
@@ -1057,9 +1064,11 @@ type delayedAPIResponse struct {
10571064
subject string
10581065
reply string
10591066
request string
1067+
hdr []byte
10601068
response string
10611069
rg *raftGroup
10621070
deadline time.Time
1071+
noJs bool
10631072
next *delayedAPIResponse
10641073
}
10651074

@@ -1162,7 +1171,12 @@ func (s *Server) delayedAPIResponder() {
11621171
next()
11631172
case <-tm.C:
11641173
if r != nil {
1165-
s.sendAPIErrResponse(r.ci, r.acc, r.subject, r.reply, r.request, r.response)
1174+
// If it's not a JS API error, send it as a raw response without additional API/audit tracking.
1175+
if r.noJs {
1176+
s.sendInternalAccountMsgWithReply(r.acc, r.subject, _EMPTY_, r.hdr, r.response, false)
1177+
} else {
1178+
s.sendAPIErrResponse(r.ci, r.acc, r.subject, r.reply, r.request, r.response)
1179+
}
11661180
pop()
11671181
}
11681182
next()
@@ -1172,7 +1186,13 @@ func (s *Server) delayedAPIResponder() {
11721186

11731187
func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup, duration time.Duration) {
11741188
s.delayedAPIResponses.push(&delayedAPIResponse{
1175-
ci, acc, subject, reply, request, response, rg, time.Now().Add(duration), nil,
1189+
ci, acc, subject, reply, request, nil, response, rg, time.Now().Add(duration), false, nil,
1190+
})
1191+
}
1192+
1193+
func (s *Server) sendDelayedErrResponse(acc *Account, subject string, hdr []byte, response string, duration time.Duration) {
1194+
s.delayedAPIResponses.push(&delayedAPIResponse{
1195+
nil, acc, subject, _EMPTY_, _EMPTY_, hdr, response, nil, time.Now().Add(duration), true, nil,
11761196
})
11771197
}
11781198

@@ -3467,6 +3487,16 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
34673487
return
34683488
}
34693489

3490+
// Reject request if we can't guarantee the precondition of min last sequence.
3491+
if req.MinLastSeq > 0 && mset.lastSeq() < req.MinLastSeq {
3492+
// Even though only the leader is subscribed and will respond, we must delay the error.
3493+
// An old leader could think it's still leader, and it must not
3494+
// error sooner than the real leader can answer.
3495+
resp.Error = NewJSStreamMinLastSeqError()
3496+
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
3497+
return
3498+
}
3499+
34703500
var svp StoreMsg
34713501
var sm *StoreMsg
34723502

0 commit comments

Comments
 (0)