Skip to content

(2.12) Linearizable Message Get request #7146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,10 @@ type JSApiMsgGetRequest struct {
NextFor string `json:"next_by_subj,omitempty"`

// Force the server to only deliver messages if the stream has at minimum this specified last sequence.
// Can be used to guarantee read-after-write and monotonic reads.
MinLastSeq uint64 `json:"min_last_seq,omitempty"`
// Force the server to guarantee a linearizable read response.
Linearizable bool `json:"linearizable,omitempty"`

// Batch support. Used to request more than one msg at a time.
// Can be used with simple starting seq, but also NextFor with wildcards.
Expand Down Expand Up @@ -3461,7 +3464,8 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
(req.Seq == 0 && req.LastFor == _EMPTY_ && req.NextFor == _EMPTY_ && req.StartTime == nil) ||
(req.Seq > 0 && req.StartTime != nil) ||
(req.StartTime != nil && req.LastFor != _EMPTY_) ||
(req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) {
(req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) ||
(req.Linearizable && req.MinLastSeq > 0) {
resp.Error = NewJSBadRequestError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
Expand All @@ -3484,6 +3488,28 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
return
}

// A linearizable read can be answered immediately if R1, but needs to be replicated otherwise.
if req.Linearizable {
mset.mu.Lock()
if mset.isClustered() {
if mset.inflightLinearizableMsgGet == nil {
mset.inflightLinearizableMsgGet = make(map[string]linearizableMsgGet, 1)
}
mset.inflightLinearizableMsgGet[reply] = linearizableMsgGet{ci, acc, subject, copyBytes(msg), req}
if err := mset.node.Propose(encodeLinearizableMsgGet(reply)); err != nil {
mset.inflightLinearizableMsgGet = nil
}
mset.mu.Unlock()
return
}
mset.mu.Unlock()
}

s.jsMsgGetRespond(mset, ci, acc, subject, reply, msg, &req)
}

func (s *Server) jsMsgGetRespond(mset *stream, ci *ClientInfo, acc *Account, subject string, reply string, msg []byte, req *JSApiMsgGetRequest) {
var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
var svp StoreMsg
var sm *StoreMsg

Expand All @@ -3495,6 +3521,7 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
seq = req.Seq
}

var err error
if seq > 0 && req.NextFor == _EMPTY_ {
sm, err = mset.store.LoadMsg(seq, &svp)
} else if req.NextFor != _EMPTY_ {
Expand Down
40 changes: 40 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ const (
compressedStreamMsgOp
// For sending deleted gaps on catchups for replicas.
deleteRangeOp
// Linearizable read for message GET request.
linearizableMsgGetOp
)

// raftGroups are controlled by the metagroup controller.
Expand Down Expand Up @@ -3170,6 +3172,24 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
s.sendAPIResponse(sp.Client, mset.account(), sp.Subject, sp.Reply, _EMPTY_, s.jsonResponse(resp))
}
}
case linearizableMsgGetOp:
var le = binary.LittleEndian
buf = buf[1:]
if len(buf) < 2 {
continue
}
rl := int(le.Uint16(buf))
buf = buf[2:]
if len(buf) < rl {
continue
}
reply := string(buf[:rl])
mset.mu.Lock()
if msgGet, ok := mset.inflightLinearizableMsgGet[reply]; ok {
mset.srv.jsMsgGetRespond(mset, msgGet.ci, msgGet.acc, msgGet.subject, reply, msgGet.msg, &msgGet.req)
}
delete(mset.inflightLinearizableMsgGet, reply)
mset.mu.Unlock()
default:
panic(fmt.Sprintf("JetStream Cluster Unknown group entry op type: %v", op))
}
Expand Down Expand Up @@ -3309,6 +3329,10 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
mset.expectedPerSubjectInProcess = nil
mset.clMu.Unlock()

mset.mu.Lock()
mset.inflightLinearizableMsgGet = nil
mset.mu.Unlock()

js.mu.Lock()
s, account, err := js.srv, sa.Client.serviceAccount(), sa.err
client, subject, reply := sa.Client, sa.Subject, sa.Reply
Expand Down Expand Up @@ -7949,6 +7973,22 @@ func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq u
return buf
}

func encodeLinearizableMsgGet(reply string) []byte {
// Clip the reply down. Operate on uint64 lengths to avoid overflowing.
rlen := min(uint64(len(reply)), math.MaxUint16)
total := rlen

elen := int(1 + 2 + total)

buf := make([]byte, 1, elen)
buf[0] = byte(linearizableMsgGetOp)

var le = binary.LittleEndian
buf = le.AppendUint16(buf, uint16(rlen))
buf = append(buf, reply[:rlen]...)
return buf
}

// Determine if all peers in our set support the binary snapshot.
func (mset *stream) supportsBinarySnapshot() bool {
mset.mu.RLock()
Expand Down
203 changes: 203 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9254,6 +9254,209 @@ func TestJetStreamClusterMsgGetMonotonicRead(t *testing.T) {
require_Equal(t, resp.Message.Sequence, 1)
}

func TestJetStreamClusterMsgGetLinearizable(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

// Can't use linearizable and min last sequence together.
data, err := json.Marshal(JSApiMsgGetRequest{Seq: 1, Linearizable: true, MinLastSeq: 1})
require_NoError(t, err)
msg, err := nc.Request(fmt.Sprintf(JSApiMsgGetT, "TEST"), data, time.Second)
require_NoError(t, err)

var resp JSApiMsgGetResponse
require_NoError(t, json.Unmarshal(msg.Data, &resp))
require_True(t, resp.ApiResponse.Error != nil)
require_Error(t, resp.ApiResponse.Error, NewJSBadRequestError())

// Linearizable read returns no message.
data, err = json.Marshal(JSApiMsgGetRequest{Seq: 1, Linearizable: true})
require_NoError(t, err)
msg, err = nc.Request(fmt.Sprintf(JSApiMsgGetT, "TEST"), data, time.Second)
require_NoError(t, err)

resp = JSApiMsgGetResponse{}
require_NoError(t, json.Unmarshal(msg.Data, &resp))
require_True(t, resp.ApiResponse.Error != nil)
require_Error(t, resp.ApiResponse.Error, NewJSNoMessageFoundError())

pubAck, err := js.Publish("foo", nil)
require_NoError(t, err)
require_Equal(t, pubAck.Sequence, 1)

// Linearizable read returns the published message.
data, err = json.Marshal(JSApiMsgGetRequest{Seq: 1, Linearizable: true})
require_NoError(t, err)
msg, err = nc.Request(fmt.Sprintf(JSApiMsgGetT, "TEST"), data, time.Second)
require_NoError(t, err)

resp = JSApiMsgGetResponse{}
require_NoError(t, json.Unmarshal(msg.Data, &resp))
require_True(t, resp.ApiResponse.Error == nil)
require_Equal(t, resp.Message.Sequence, 1)
}

func TestJetStreamClusterDirectGetLinearizableRejected(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
AllowDirect: true,
})
require_NoError(t, err)
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
return checkState(t, c, globalAccountName, "TEST")
})

data, err := json.Marshal(JSApiMsgGetRequest{Seq: 1, Linearizable: true})
require_NoError(t, err)
msg, err := nc.Request(fmt.Sprintf(JSDirectMsgGetT, "TEST"), data, time.Second)
require_NoError(t, err)
require_Equal(t, msg.Header.Get("Status"), "408")
require_Equal(t, msg.Header.Get("Description"), "Linearizable Not Supported")
}

func TestJetStreamClusterDirectGetLastBySubjectLinearizableRejected(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
AllowDirect: true,
})
require_NoError(t, err)
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
return checkState(t, c, globalAccountName, "TEST")
})

data, err := json.Marshal(JSApiMsgGetRequest{Linearizable: true})
require_NoError(t, err)
msg, err := nc.Request(fmt.Sprintf(JSDirectGetLastBySubjectT, "TEST", "foo"), data, time.Second)
require_NoError(t, err)
require_Equal(t, msg.Header.Get("Status"), "408")
require_Equal(t, msg.Header.Get("Description"), "Linearizable Not Supported")
}

func TestJetStreamClusterMirrorDirectGetLinearizableRejected(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "S",
Subjects: []string{"foo"},
AllowDirect: true,
Replicas: 3,
})
require_NoError(t, err)

_, err = js.Publish("foo", nil)
require_NoError(t, err)
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
return checkState(t, c, globalAccountName, "S")
})

_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
AllowDirect: true,
MirrorDirect: true,
Mirror: &nats.StreamSource{Name: "S"},
Replicas: 3,
})
require_NoError(t, err)

mirrorLeader := c.streamLeader(globalAccountName, "M")
checkMirrorConsistent := func() {
t.Helper()
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
if err := checkState(t, c, globalAccountName, "M"); err != nil {
return err
}
mset, err := mirrorLeader.globalAccount().lookupStream("M")
if err != nil {
return err
}
if mset.lastSeq() != 1 {
return errors.New("waiting for mirror to catch up")
}
return nil
})
}
checkMirrorConsistent()

// On source, stop responding to direct gets.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "S",
Subjects: []string{"foo"},
AllowDirect: false,
Replicas: 3,
})
require_NoError(t, err)

// On mirror, cancel mirroring, truncate store, and subscribe to mirror direct get.
for _, s := range c.servers {
mset, err := s.globalAccount().lookupStream("M")
require_NoError(t, err)
mset.mu.Lock()
mset.cancelMirrorConsumer()
mset.lseq = 0
if err = mset.store.Truncate(0); err != nil {
mset.mu.Unlock()
require_NoError(t, err)
}
// Only let mirror followers subscribe to mirror direct.
if s != mirrorLeader {
if err = mset.subscribeToMirrorDirect(); err != nil {
mset.mu.Unlock()
require_NoError(t, err)
}
} else {
mset.unsubscribeToMirrorDirect()
}
mset.mu.Unlock()
}

// Need to wait for subscriptions to be fully propagated.
time.Sleep(200 * time.Millisecond)

data, err := json.Marshal(JSApiMsgGetRequest{Seq: 1, Linearizable: true})
require_NoError(t, err)
msg, err := nc.Request(fmt.Sprintf(JSDirectMsgGetT, "S"), data, time.Second)
require_NoError(t, err)
require_Equal(t, msg.Header.Get("Status"), "408")
require_Equal(t, msg.Header.Get("Description"), "Linearizable Not Supported")

data, err = json.Marshal(JSApiMsgGetRequest{Linearizable: true})
require_NoError(t, err)
msg, err = nc.Request(fmt.Sprintf(JSDirectGetLastBySubjectT, "S", "foo"), data, time.Second)
require_NoError(t, err)
require_Equal(t, msg.Header.Get("Status"), "408")
require_Equal(t, msg.Header.Get("Description"), "Linearizable Not Supported")
}

func TestJetStreamClusterDirectGetReadAfterWrite(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down
22 changes: 21 additions & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ type stream struct {
expectedPerSubjectSequence map[uint64]string // Inflight 'expected per subject' subjects per clseq.
expectedPerSubjectInProcess map[string]struct{} // Current 'expected per subject' subjects in process.

inflightLinearizableMsgGet map[string]linearizableMsgGet // Inflight message get requests that require linearizability.

// Direct get subscription.
directLeaderSub *subscription
directSub *subscription
Expand All @@ -422,6 +424,14 @@ type stream struct {
batches *batching // Inflight batches prior to committing them.
}

type linearizableMsgGet struct {
ci *ClientInfo
acc *Account
subject string
msg []byte
req JSApiMsgGetRequest
}

// msgCounterRunningTotal stores a running total and a number of inflight
// but not yet applied clustered proposals/operations for this counter.
type msgCounterRunningTotal struct {
Expand Down Expand Up @@ -4688,6 +4698,12 @@ func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Accou
return
}

if req.Linearizable {
hdr := []byte("NATS/1.0 408 Linearizable Not Supported\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}

// Reject request if we can't guarantee the precondition of min last sequence.
if req.MinLastSeq > 0 && mset.lastSeq() < req.MinLastSeq {
// We are not up-to-date yet, and don't know how long it will take us to be.
Expand Down Expand Up @@ -4734,7 +4750,11 @@ func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *cli
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}

if req.Linearizable {
hdr := []byte("NATS/1.0 408 Linearizable Not Supported\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}
if req.MinLastSeq == 0 {
hdr := []byte("NATS/1.0 408 Bad Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
Expand Down