Skip to content

(2.11) Adds replies for AckAck requests #6198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2271,10 +2271,27 @@ func configsEqualSansDelivery(a, b ConsumerConfig) bool {
}

// Helper to send a reply to an ack.
func (o *consumer) sendAckReply(subj string) {
func (o *consumer) sendAckReply(subj string, err *ApiError) {
o.mu.RLock()
defer o.mu.RUnlock()
o.outq.sendMsg(subj, nil)
o.sendAckReplyLocked(subj, err)
}

// Helper to send a reply to an ack.
// Lock must be held
func (o *consumer) sendAckReplyLocked(subj string, err *ApiError) {

var resp = JSApiConsumerAckResponse{ApiResponse: ApiResponse{Type: JSApiConsumerAckResponseType, Error: err}}

if err == nil {
resp.Success = true
}

j, e := json.Marshal(resp)
if e != nil {
return
}
o.outq.sendMsg(subj, j)
}

type jsAckMsg struct {
Expand Down Expand Up @@ -2327,17 +2344,20 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
}

sseq, dseq, dc := ackReplyInfo(subject)
var err *ApiError

skipAckReply := sseq == 0

switch {
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
if !o.processAckMsg(sseq, dseq, dc, reply, true) {
if b, e := o.processAckMsg(sseq, dseq, dc, reply, true); e == nil && !b {
// We handle replies for acks in updateAcks
skipAckReply = true
} else {
err = e
}
case bytes.HasPrefix(msg, AckNext):
o.processAckMsg(sseq, dseq, dc, _EMPTY_, true)
_, _ = o.processAckMsg(sseq, dseq, dc, _EMPTY_, true)
o.processNextMsgRequest(reply, msg[len(AckNext):])
skipAckReply = true
case bytes.HasPrefix(msg, AckNak):
Expand All @@ -2357,7 +2377,7 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {

// Ack the ack if requested.
if len(reply) > 0 && !skipAckReply {
o.sendAckReply(reply)
o.sendAckReply(reply, err)
}
}

Expand Down Expand Up @@ -2493,7 +2513,7 @@ func (o *consumer) addAckReply(sseq uint64, reply string) {
}

// Lock should be held.
func (o *consumer) updateAcks(dseq, sseq uint64, reply string) {
func (o *consumer) updateAcks(dseq, sseq uint64, reply string, err *ApiError) {
if o.node != nil {
// Inline for now, use variable compression.
var b [2*binary.MaxVarintLen64 + 1]byte
Expand All @@ -2509,7 +2529,7 @@ func (o *consumer) updateAcks(dseq, sseq uint64, reply string) {
o.store.UpdateAcks(dseq, sseq)
if reply != _EMPTY_ {
// Already locked so send direct.
o.outq.sendMsg(reply, nil)
o.sendAckReplyLocked(reply, err)
}
}
// Update activity.
Expand Down Expand Up @@ -2705,7 +2725,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
// case the reply will be sent later).
func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool {
// Treat like an ack to suppress redelivery.
ackedInPlace := o.processAckMsg(sseq, dseq, dc, reply, false)
ackedInPlace, _ := o.processAckMsg(sseq, dseq, dc, reply, false)

o.mu.Lock()
defer o.mu.Unlock()
Expand Down Expand Up @@ -3061,17 +3081,17 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
// Returns `true` if the ack was processed in place and the sender can now respond
// to the client, or `false` if there was an error or the ack is replicated (in which
// case the reply will be sent later).
func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) bool {
func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) (bool, *ApiError) {
o.mu.Lock()
if o.closed {
o.mu.Unlock()
return false
return false, nil
}

mset := o.mset
if mset == nil || mset.closed.Load() {
o.mu.Unlock()
return false
return false, nil
}

// Check if this ack is above the current pointer to our next to deliver.
Expand All @@ -3083,11 +3103,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
var ss StreamState
mset.store.FastState(&ss)
if sseq > ss.LastSeq {
o.srv.Warnf("JetStream consumer '%s > %s > %s' ACK sequence %d past last stream sequence of %d",
o.acc.Name, o.stream, o.name, sseq, ss.LastSeq)
// FIXME(dlc) - For 2.11 onwards should we return an error here to the caller?
o.mu.Unlock()
return false
return false, NewJSConsumerMsgNotPendingAckError()
}
o.sseq = sseq + 1
}
Expand All @@ -3099,6 +3116,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b

var sgap, floor uint64
var needSignal bool
var err *ApiError

switch o.cfg.AckPolicy {
case AckExplicit:
Expand Down Expand Up @@ -3128,14 +3146,17 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
}
}
}
} else {
err = NewJSConsumerMsgNotPendingAckError()
}

delete(o.rdc, sseq)
o.removeFromRedeliverQueue(sseq)
case AckAll:
// no-op
if dseq <= o.adflr || sseq <= o.asflr {
o.mu.Unlock()
return ackInPlace
return ackInPlace, nil
}
if o.maxp > 0 && len(o.pending) >= o.maxp {
needSignal = true
Expand Down Expand Up @@ -3167,7 +3188,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
case AckNone:
// FIXME(dlc) - This is error but do we care?
o.mu.Unlock()
return ackInPlace
return ackInPlace, nil
}

// No ack replication, so we set reply to "" so that updateAcks does not
Expand All @@ -3176,7 +3197,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
reply = _EMPTY_
}
// Update underlying store.
o.updateAcks(dseq, sseq, reply)
o.updateAcks(dseq, sseq, reply, err)
o.mu.Unlock()

if ackInPlace {
Expand All @@ -3194,7 +3215,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
if needSignal {
o.signalNewMessages()
}
return ackInPlace
return ackInPlace, err
}

// Determine if this is a truly filtered consumer. Modern clients will place filtered subjects
Expand Down Expand Up @@ -4834,7 +4855,7 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
if mset != nil && mset.ackq != nil && (o.node == nil || o.cfg.Direct) {
mset.ackq.push(seq)
} else {
o.updateAcks(dseq, seq, _EMPTY_)
o.updateAcks(dseq, seq, _EMPTY_, nil)
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1628,5 +1628,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerMsgNotPendingAckErr",
"code": 400,
"error_code": 10165,
"description": "This message is currently not pending an ack (already acked or not yet delivered)",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
7 changes: 7 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,13 @@ type JSApiConsumerPauseRequest struct {
PauseUntil time.Time `json:"pause_until,omitempty"`
}

const JSApiConsumerAckResponseType = "io.nats.jetstream.api.v1.consumer_ack_response"

type JSApiConsumerAckResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}

const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"

type JSApiConsumerPauseResponse struct {
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5275,7 +5275,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error {

// Check if we have a reply that was requested.
if reply := o.replies[sseq]; reply != _EMPTY_ {
o.outq.sendMsg(reply, nil)
o.sendAckReplyLocked(reply, nil)
delete(o.replies, sseq)
}

Expand Down
34 changes: 28 additions & 6 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6668,9 +6668,11 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) {
}
}

// Make sure if we received acks that are out of bounds, meaning past our
// Test that acking returns the expected reply message.
// Make sure that an error is returned when trying to ack the same message more than once.
// Make sure if we receive acks that are out of bounds, meaning past our
// last sequence or before our first that they are ignored and errored if applicable.
func TestJetStreamClusterConsumerAckOutOfBounds(t *testing.T) {
func TestJetStreamConsumerClusterAckAck(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand All @@ -6696,13 +6698,33 @@ func TestJetStreamClusterConsumerAckOutOfBounds(t *testing.T) {
msgs, err := sub.Fetch(1)
require_NoError(t, err)
require_Equal(t, len(msgs), 1)
msgs[0].AckSync()
ackSubject := msgs[0].Reply

var resp JSApiConsumerAckResponse

// first ack
replyMsg, err := nc.Request(ackSubject, nil, 5000*time.Millisecond)
require_NoError(t, err)
require_NoError(t, json.Unmarshal(replyMsg.Data, &resp))
require_True(t, resp.Success)

// check for error if trying to ack again
resp = JSApiConsumerAckResponse{}
replyMsg, err = nc.Request(ackSubject, nil, 250*time.Millisecond)
require_NoError(t, err)
require_NoError(t, json.Unmarshal(replyMsg.Data, &resp))
require_False(t, resp.Success)
require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr))

// Now ack way past the last sequence.
_, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond)
require_Error(t, err, nats.ErrTimeout)
resp = JSApiConsumerAckResponse{}
replyMsg, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond)
require_NoError(t, err)
require_NoError(t, json.Unmarshal(replyMsg.Data, &resp))
require_False(t, resp.Success)
require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr))

// Make sure that now changes happened to our state.
// Make sure that no changes happened to our state.
ci, err := js.ConsumerInfo("TEST", "C")
require_NoError(t, err)
require_Equal(t, ci.Delivered.Consumer, 1)
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ const (
// JSConsumerMetadataLengthErrF consumer metadata exceeds maximum size of {limit}
JSConsumerMetadataLengthErrF ErrorIdentifier = 10135

// JSConsumerMsgNotPendingAckErr This message is currently not pending an ack (already acked or not yet delivered)
JSConsumerMsgNotPendingAckErr ErrorIdentifier = 10165

// JSConsumerMultipleFiltersNotAllowed consumer with multiple subject filters cannot use subject based API
JSConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137

Expand Down Expand Up @@ -548,6 +551,7 @@ var (
JSConsumerMaxRequestExpiresToSmall: {Code: 400, ErrCode: 10115, Description: "consumer max request expires needs to be >= 1ms"},
JSConsumerMaxWaitingNegativeErr: {Code: 400, ErrCode: 10087, Description: "consumer max waiting needs to be positive"},
JSConsumerMetadataLengthErrF: {Code: 400, ErrCode: 10135, Description: "consumer metadata exceeds maximum size of {limit}"},
JSConsumerMsgNotPendingAckErr: {Code: 400, ErrCode: 10165, Description: "This message is currently not pending an ack (already acked or not yet delivered)"},
JSConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10137, Description: "consumer with multiple subject filters cannot use subject based API"},
JSConsumerNameContainsPathSeparatorsErr: {Code: 400, ErrCode: 10127, Description: "Consumer name can not contain path separators"},
JSConsumerNameExistErr: {Code: 400, ErrCode: 10013, Description: "consumer name already in use"},
Expand Down Expand Up @@ -1249,6 +1253,16 @@ func NewJSConsumerMetadataLengthError(limit interface{}, opts ...ErrorOption) *A
}
}

// NewJSConsumerMsgNotPendingAckError creates a new JSConsumerMsgNotPendingAckErr error: "This message is currently not pending an ack (already acked or not yet delivered)"
func NewJSConsumerMsgNotPendingAckError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSConsumerMsgNotPendingAckErr]
}

// NewJSConsumerMultipleFiltersNotAllowedError creates a new JSConsumerMultipleFiltersNotAllowed error: "consumer with multiple subject filters cannot use subject based API"
func NewJSConsumerMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
32 changes: 30 additions & 2 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2995,8 +2995,8 @@ func TestJetStreamConsumerAckAck(t *testing.T) {
nc := clientConnectToServer(t, s)
defer nc.Close()

// 4 for number of ack protocols to test them all.
for i := 0; i < 4; i++ {
// 5 for number of ack protocols plus already acked to test them all.
for i := 0; i < 5; i++ {
sendStreamMsg(t, nc, mname, "Hello World!")
}

Expand All @@ -3015,6 +3015,34 @@ func TestJetStreamConsumerAckAck(t *testing.T) {
testAck(AckNak)
testAck(AckProgress)
testAck(AckTerm)

// checking replies and errors on two explicit acks for the same message or out of range ack
// first get the message
m, err := nc.Request(rqn, nil, 10*time.Millisecond)
require_NoError(t, err)

var resp JSApiConsumerAckResponse
// Send a request for the first ack and make sure it worked.
ackReply1, err := nc.Request(m.Reply, AckAck, 10*time.Millisecond)
require_NoError(t, err)
require_NoError(t, json.Unmarshal(ackReply1.Data, &resp))
require_True(t, resp.Success)

// Send a second ack and make sure it fails.
ackReply2, err := nc.Request(m.Reply, AckAck, 10*time.Millisecond)
require_NoError(t, err)
resp = JSApiConsumerAckResponse{}
require_NoError(t, json.Unmarshal(ackReply2.Data, &resp))
require_False(t, resp.Success)
require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr))

// Error trying to ack an out of range sequence number
ackReply3, err := nc.Request("$JS.ACK.ACK-ACK.worker.1.6.0.0.0", AckAck, 10*time.Millisecond)
require_NoError(t, err)
resp = JSApiConsumerAckResponse{}
require_NoError(t, json.Unmarshal(ackReply3.Data, &resp))
require_False(t, resp.Success)
require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr))
}

func TestJetStreamAckNext(t *testing.T) {
Expand Down
Loading