diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index 69da6a49ba..9d5c3ba555 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -680,7 +680,11 @@ type JSApiMsgGetRequest struct {
 	NextFor string `json:"next_by_subj,omitempty"`
 
 	// Force the server to only deliver messages if the stream has at minimum this specified last sequence.
+	// Can be used to guarantee read-after-write and monotonic reads.
 	MinLastSeq uint64 `json:"min_last_seq,omitempty"`
+	// Force the server to guarantee a linearizable read response.
+	// Cannot be combined with MinLastSeq, such a request is rejected as a bad request.
+	Linearizable bool `json:"linearizable,omitempty"`
 
 	// Batch support. Used to request more than one msg at a time.
 	// Can be used with simple starting seq, but also NextFor with wildcards.
@@ -3461,7 +3465,8 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
 		(req.Seq == 0 && req.LastFor == _EMPTY_ && req.NextFor == _EMPTY_ && req.StartTime == nil) ||
 		(req.Seq > 0 && req.StartTime != nil) ||
 		(req.StartTime != nil && req.LastFor != _EMPTY_) ||
-		(req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) {
+		(req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) ||
+		(req.Linearizable && req.MinLastSeq > 0) {
 		resp.Error = NewJSBadRequestError()
 		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
 		return
@@ -3484,6 +3489,38 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
 		return
 	}
 
+	// A linearizable read can be answered immediately if R1, but needs to be replicated otherwise.
+	if req.Linearizable {
+		mset.mu.Lock()
+		if mset.isClustered() {
+			if mset.inflightLinearizableMsgGet == nil {
+				mset.inflightLinearizableMsgGet = make(map[string]linearizableMsgGet, 1)
+			}
+			// Park the request keyed by reply subject, it is answered once the proposal has been applied.
+			mset.inflightLinearizableMsgGet[reply] = linearizableMsgGet{ci, acc, subject, copyBytes(msg), req}
+			perr := mset.node.Propose(encodeLinearizableMsgGet(reply))
+			if perr != nil {
+				// Only drop our own request, other inflight reads may still get applied.
+				delete(mset.inflightLinearizableMsgGet, reply)
+			}
+			mset.mu.Unlock()
+			if perr != nil {
+				// Let the client know instead of leaving it to time out.
+				resp.Error = NewJSClusterNotAvailError()
+				s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
+			}
+			return
+		}
+		mset.mu.Unlock()
+	}
+
+	s.jsMsgGetRespond(mset, ci, acc, subject, reply, msg, &req)
+}
+
+// jsMsgGetRespond performs the message lookup for an already validated message get request.
+// It is called directly from jsMsgGetRequest, or once the proposal of a linearizable read has been applied.
+func (s *Server) jsMsgGetRespond(mset *stream, ci *ClientInfo, acc *Account, subject string, reply string, msg []byte, req *JSApiMsgGetRequest) {
+	var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
 	var svp StoreMsg
 	var sm *StoreMsg
 
@@ -3495,6 +3532,7 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
 		seq = req.Seq
 	}
 
+	var err error
 	if seq > 0 && req.NextFor == _EMPTY_ {
 		sm, err = mset.store.LoadMsg(seq, &svp)
 	} else if req.NextFor != _EMPTY_ {
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 5cac62be18..251b5757b8 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -114,6 +114,8 @@ const (
 	compressedStreamMsgOp
 	// For sending deleted gaps on catchups for replicas.
 	deleteRangeOp
+	// Linearizable read for message GET request.
+	linearizableMsgGetOp
 )
 
 // raftGroups are controlled by the metagroup controller.
@@ -3170,6 +3172,28 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 						s.sendAPIResponse(sp.Client, mset.account(), sp.Subject, sp.Reply, _EMPTY_, s.jsonResponse(resp))
 					}
 				}
+			case linearizableMsgGetOp:
+				// Layout after the op byte: little-endian uint16 reply length, then the reply subject.
+				buf = buf[1:]
+				if len(buf) < 2 {
+					continue
+				}
+				rl := int(binary.LittleEndian.Uint16(buf))
+				buf = buf[2:]
+				if len(buf) < rl {
+					continue
+				}
+				reply := string(buf[:rl])
+				// Claim the parked request under the lock, but respond after releasing it so the
+				// stream lock is not held while looking up the message and sending the response.
+				mset.mu.Lock()
+				srv := mset.srv
+				msgGet, ok := mset.inflightLinearizableMsgGet[reply]
+				delete(mset.inflightLinearizableMsgGet, reply)
+				mset.mu.Unlock()
+				if ok {
+					srv.jsMsgGetRespond(mset, msgGet.ci, msgGet.acc, msgGet.subject, reply, msgGet.msg, &msgGet.req)
+				}
 			default:
 				panic(fmt.Sprintf("JetStream Cluster Unknown group entry op type: %v", op))
 			}
@@ -3309,6 +3333,11 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
 	mset.expectedPerSubjectInProcess = nil
 	mset.clMu.Unlock()
 
+	// Drop linearizable reads that were parked waiting on a proposal, they are not answered here.
+	mset.mu.Lock()
+	mset.inflightLinearizableMsgGet = nil
+	mset.mu.Unlock()
+
 	js.mu.Lock()
 	s, account, err := js.srv, sa.Client.serviceAccount(), sa.err
 	client, subject, reply := sa.Client, sa.Subject, sa.Reply
@@ -7949,6 +7978,19 @@ func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq u
 	return buf
 }
 
+// encodeLinearizableMsgGet encodes a linearizable message get proposal: the op byte,
+// followed by the little-endian uint16 length of the reply subject and the reply itself.
+func encodeLinearizableMsgGet(reply string) []byte {
+	// Clip the reply down to what fits in the uint16 length prefix.
+	rlen := min(len(reply), math.MaxUint16)
+
+	buf := make([]byte, 1, 1+2+rlen)
+	buf[0] = byte(linearizableMsgGetOp)
+	buf = binary.LittleEndian.AppendUint16(buf, uint16(rlen))
+	buf = append(buf, reply[:rlen]...)
+	return buf
+}
+
 // Determine if all peers in our set support the binary snapshot.
 func (mset *stream) supportsBinarySnapshot() bool {
 	mset.mu.RLock()
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index b807c1521a..eef2751cf5 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -9254,6 +9254,209 @@ func TestJetStreamClusterMsgGetMonotonicRead(t *testing.T) {
 	require_Equal(t, resp.Message.Sequence, 1)
 }
 
+func TestJetStreamClusterMsgGetLinearizable(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	// Can't use linearizable and min last sequence together, expect a bad request error.
+	data, err := json.Marshal(JSApiMsgGetRequest{Seq: 1, Linearizable: true, MinLastSeq: 1})
+	require_NoError(t, err)
+	msg, err := nc.Request(fmt.Sprintf(JSApiMsgGetT, "TEST"), data, time.Second)
+	require_NoError(t, err)
+
+	var resp JSApiMsgGetResponse
+	require_NoError(t, json.Unmarshal(msg.Data, &resp))
+	require_True(t, resp.ApiResponse.Error != nil)
+	require_Error(t, resp.ApiResponse.Error, NewJSBadRequestError())
+
+	// Linearizable read on the still empty stream returns a no message found error.
+	data, err = json.Marshal(JSApiMsgGetRequest{Seq: 1, Linearizable: true})
+	require_NoError(t, err)
+	msg, err = nc.Request(fmt.Sprintf(JSApiMsgGetT, "TEST"), data, time.Second)
+	require_NoError(t, err)
+
+	resp = JSApiMsgGetResponse{}
+	require_NoError(t, json.Unmarshal(msg.Data, &resp))
+	require_True(t, resp.ApiResponse.Error != nil)
+	require_Error(t, resp.ApiResponse.Error, NewJSNoMessageFoundError())
+
+	pubAck, err := js.Publish("foo", nil)
+	require_NoError(t, err)
+	require_Equal(t, pubAck.Sequence, 1)
+
+	// Linearizable read returns the published message.
+	data, err = json.Marshal(JSApiMsgGetRequest{Seq: 1, Linearizable: true})
+	require_NoError(t, err)
+	msg, err = nc.Request(fmt.Sprintf(JSApiMsgGetT, "TEST"), data, time.Second)
+	require_NoError(t, err)
+
+	resp = JSApiMsgGetResponse{}
+	require_NoError(t, json.Unmarshal(msg.Data, &resp))
+	require_True(t, resp.ApiResponse.Error == nil)
+	require_Equal(t, resp.Message.Sequence, 1)
+}
+
+func TestJetStreamClusterDirectGetLinearizableRejected(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:        "TEST",
+		Subjects:    []string{"foo"},
+		Replicas:    3,
+		AllowDirect: true,
+	})
+	require_NoError(t, err)
+	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+
+	data, err := json.Marshal(JSApiMsgGetRequest{Seq: 1, Linearizable: true})
+	require_NoError(t, err)
+	msg, err := nc.Request(fmt.Sprintf(JSDirectMsgGetT, "TEST"), data, time.Second)
+	require_NoError(t, err)
+	require_Equal(t, msg.Header.Get("Status"), "408")
+	require_Equal(t, msg.Header.Get("Description"), "Linearizable Not Supported")
+}
+
+func TestJetStreamClusterDirectGetLastBySubjectLinearizableRejected(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:        "TEST",
+		Subjects:    []string{"foo"},
+		Replicas:    3,
+		AllowDirect: true,
+	})
+	require_NoError(t, err)
+	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+
+	data, err := json.Marshal(JSApiMsgGetRequest{Linearizable: true})
+	require_NoError(t, err)
+	msg, err := nc.Request(fmt.Sprintf(JSDirectGetLastBySubjectT, "TEST", "foo"), data, time.Second)
+	require_NoError(t, err)
+	require_Equal(t, msg.Header.Get("Status"), "408")
+	require_Equal(t, msg.Header.Get("Description"), "Linearizable Not Supported")
+}
+
+func TestJetStreamClusterMirrorDirectGetLinearizableRejected(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:        "S",
+		Subjects:    []string{"foo"},
+		AllowDirect: true,
+		Replicas:    3,
+	})
+	require_NoError(t, err)
+
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+		return checkState(t, c, globalAccountName, "S")
+	})
+
+	_, err = js.AddStream(&nats.StreamConfig{
+		Name:         "M",
+		AllowDirect:  true,
+		MirrorDirect: true,
+		Mirror:       &nats.StreamSource{Name: "S"},
+		Replicas:     3,
+	})
+	require_NoError(t, err)
+
+	mirrorLeader := c.streamLeader(globalAccountName, "M")
+	checkMirrorConsistent := func() {
+		t.Helper()
+		checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+			if err := checkState(t, c, globalAccountName, "M"); err != nil {
+				return err
+			}
+			mset, err := mirrorLeader.globalAccount().lookupStream("M")
+			if err != nil {
+				return err
+			}
+			if mset.lastSeq() != 1 {
+				return errors.New("waiting for mirror to catch up")
+			}
+			return nil
+		})
+	}
+	checkMirrorConsistent()
+
+	// On source, stop responding to direct gets.
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:        "S",
+		Subjects:    []string{"foo"},
+		AllowDirect: false,
+		Replicas:    3,
+	})
+	require_NoError(t, err)
+
+	// On every server's copy of the mirror, cancel mirroring, truncate store, and set up mirror direct get.
+	for _, s := range c.servers {
+		mset, err := s.globalAccount().lookupStream("M")
+		require_NoError(t, err)
+		mset.mu.Lock()
+		mset.cancelMirrorConsumer()
+		mset.lseq = 0
+		if err = mset.store.Truncate(0); err != nil {
+			mset.mu.Unlock()
+			require_NoError(t, err)
+		}
+		// Only let mirror followers subscribe to mirror direct, the mirror leader unsubscribes.
+		if s != mirrorLeader {
+			if err = mset.subscribeToMirrorDirect(); err != nil {
+				mset.mu.Unlock()
+				require_NoError(t, err)
+			}
+		} else {
+			mset.unsubscribeToMirrorDirect()
+		}
+		mset.mu.Unlock()
+	}
+
+	// Need to wait for subscriptions to be fully propagated.
+	time.Sleep(200 * time.Millisecond)
+
+	data, err := json.Marshal(JSApiMsgGetRequest{Seq: 1, Linearizable: true})
+	require_NoError(t, err)
+	msg, err := nc.Request(fmt.Sprintf(JSDirectMsgGetT, "S"), data, time.Second)
+	require_NoError(t, err)
+	require_Equal(t, msg.Header.Get("Status"), "408")
+	require_Equal(t, msg.Header.Get("Description"), "Linearizable Not Supported")
+
+	data, err = json.Marshal(JSApiMsgGetRequest{Linearizable: true})
+	require_NoError(t, err)
+	msg, err = nc.Request(fmt.Sprintf(JSDirectGetLastBySubjectT, "S", "foo"), data, time.Second)
+	require_NoError(t, err)
+	require_Equal(t, msg.Header.Get("Status"), "408")
+	require_Equal(t, msg.Header.Get("Description"), "Linearizable Not Supported")
+}
+
 func TestJetStreamClusterDirectGetReadAfterWrite(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
diff --git a/server/stream.go b/server/stream.go
index f2e9dc50df..1f3fd928b7 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -410,6 +410,8 @@ type stream struct {
 	expectedPerSubjectSequence  map[uint64]string   // Inflight 'expected per subject' subjects per clseq.
 	expectedPerSubjectInProcess map[string]struct{} // Current 'expected per subject' subjects in process.
 
+	inflightLinearizableMsgGet map[string]linearizableMsgGet // Inflight message get requests that require linearizability, keyed by reply subject.
+
 	// Direct get subscription.
 	directLeaderSub *subscription
 	directSub       *subscription
@@ -422,6 +424,14 @@ type stream struct {
 	batches *batching // Inflight batches prior to committing them.
 }
 
+type linearizableMsgGet struct {
+	ci      *ClientInfo        // Client info handed to the responder for the parked request.
+	acc     *Account           // Account handed to the responder for the parked request.
+	subject string             // API subject of the original request.
+	msg     []byte             // Copy of the original request payload.
+	req     JSApiMsgGetRequest // Request parameters of the parked message get.
+}
+
 // msgCounterRunningTotal stores a running total and a number of inflight
 // but not yet applied clustered proposals/operations for this counter.
 type msgCounterRunningTotal struct {
@@ -4688,6 +4698,12 @@ func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Accou
 		return
 	}
 
+	if req.Linearizable { // Linearizable reads are not supported on direct get, reject with a 408 status.
+		hdr := []byte("NATS/1.0 408 Linearizable Not Supported\r\n\r\n")
+		mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
+		return
+	}
+
 	// Reject request if we can't guarantee the precondition of min last sequence.
 	if req.MinLastSeq > 0 && mset.lastSeq() < req.MinLastSeq {
 		// We are not up-to-date yet, and don't know how long it will take us to be.
@@ -4734,7 +4750,11 @@ func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *cli
 		mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
 		return
 	}
-
+	if req.Linearizable { // Linearizable reads are not supported on direct get, reject with a 408 status.
+		hdr := []byte("NATS/1.0 408 Linearizable Not Supported\r\n\r\n")
+		mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
+		return
+	}
 	if req.MinLastSeq == 0 {
 		hdr := []byte("NATS/1.0 408 Bad Request\r\n\r\n")
 		mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))