diff --git a/server/consumer.go b/server/consumer.go index e4118c72cf5..bb248f7c430 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2271,10 +2271,27 @@ func configsEqualSansDelivery(a, b ConsumerConfig) bool { } // Helper to send a reply to an ack. -func (o *consumer) sendAckReply(subj string) { +func (o *consumer) sendAckReply(subj string, err *ApiError) { o.mu.RLock() defer o.mu.RUnlock() - o.outq.sendMsg(subj, nil) + o.sendAckReplyLocked(subj, err) +} + +// Helper to send a reply to an ack. +// Lock must be held +func (o *consumer) sendAckReplyLocked(subj string, err *ApiError) { + + var resp = JSApiConsumerAckResponse{ApiResponse: ApiResponse{Type: JSApiConsumerAckResponseType, Error: err}} + + if err == nil { + resp.Success = true + } + + j, e := json.Marshal(resp) + if e != nil { + return + } + o.outq.sendMsg(subj, j) } type jsAckMsg struct { @@ -2327,17 +2344,20 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { } sseq, dseq, dc := ackReplyInfo(subject) + var err *ApiError skipAckReply := sseq == 0 switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): - if !o.processAckMsg(sseq, dseq, dc, reply, true) { + if b, e := o.processAckMsg(sseq, dseq, dc, reply, true); e == nil && !b { // We handle replies for acks in updateAcks skipAckReply = true + } else { + err = e } case bytes.HasPrefix(msg, AckNext): - o.processAckMsg(sseq, dseq, dc, _EMPTY_, true) + _, _ = o.processAckMsg(sseq, dseq, dc, _EMPTY_, true) o.processNextMsgRequest(reply, msg[len(AckNext):]) skipAckReply = true case bytes.HasPrefix(msg, AckNak): @@ -2357,7 +2377,7 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { // Ack the ack if requested. if len(reply) > 0 && !skipAckReply { - o.sendAckReply(reply) + o.sendAckReply(reply, err) } } @@ -2493,7 +2513,7 @@ func (o *consumer) addAckReply(sseq uint64, reply string) { } // Lock should be held. -func (o *consumer) updateAcks(dseq, sseq uint64, reply string) { +func (o *consumer) updateAcks(dseq, sseq uint64, reply string, err *ApiError) { if o.node != nil { // Inline for now, use variable compression. var b [2*binary.MaxVarintLen64 + 1]byte @@ -2509,7 +2529,7 @@ func (o *consumer) updateAcks(dseq, sseq uint64, reply string) { o.store.UpdateAcks(dseq, sseq) if reply != _EMPTY_ { // Already locked so send direct. - o.outq.sendMsg(reply, nil) + o.sendAckReplyLocked(reply, err) } } // Update activity. @@ -2705,7 +2725,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) { // case the reply will be sent later). func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool { // Treat like an ack to suppress redelivery. - ackedInPlace := o.processAckMsg(sseq, dseq, dc, reply, false) + ackedInPlace, _ := o.processAckMsg(sseq, dseq, dc, reply, false) o.mu.Lock() defer o.mu.Unlock() @@ -3061,17 +3081,17 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) { // Returns `true` if the ack was processed in place and the sender can now respond // to the client, or `false` if there was an error or the ack is replicated (in which // case the reply will be sent later). -func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) bool { +func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) (bool, *ApiError) { o.mu.Lock() if o.closed { o.mu.Unlock() - return false + return false, nil } mset := o.mset if mset == nil || mset.closed.Load() { o.mu.Unlock() - return false + return false, nil } // Check if this ack is above the current pointer to our next to deliver. @@ -3083,11 +3103,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b var ss StreamState mset.store.FastState(&ss) if sseq > ss.LastSeq { - o.srv.Warnf("JetStream consumer '%s > %s > %s' ACK sequence %d past last stream sequence of %d", - o.acc.Name, o.stream, o.name, sseq, ss.LastSeq) - // FIXME(dlc) - For 2.11 onwards should we return an error here to the caller? o.mu.Unlock() - return false + return false, NewJSConsumerMsgNotPendingAckError() } o.sseq = sseq + 1 } @@ -3099,6 +3116,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b var sgap, floor uint64 var needSignal bool + var err *ApiError switch o.cfg.AckPolicy { case AckExplicit: @@ -3128,14 +3146,17 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b } } } + } else { + err = NewJSConsumerMsgNotPendingAckError() } + delete(o.rdc, sseq) o.removeFromRedeliverQueue(sseq) case AckAll: // no-op if dseq <= o.adflr || sseq <= o.asflr { o.mu.Unlock() - return ackInPlace + return ackInPlace, nil } if o.maxp > 0 && len(o.pending) >= o.maxp { needSignal = true @@ -3167,7 +3188,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b case AckNone: // FIXME(dlc) - This is error but do we care? o.mu.Unlock() - return ackInPlace + return ackInPlace, nil } // No ack replication, so we set reply to "" so that updateAcks does not @@ -3176,7 +3197,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b reply = _EMPTY_ } // Update underlying store. - o.updateAcks(dseq, sseq, reply) + o.updateAcks(dseq, sseq, reply, err) o.mu.Unlock() if ackInPlace { @@ -3194,7 +3215,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b if needSignal { o.signalNewMessages() } - return ackInPlace + return ackInPlace, err } // Determine if this is a truly filtered consumer. Modern clients will place filtered subjects @@ -4834,7 +4855,7 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, if mset != nil && mset.ackq != nil && (o.node == nil || o.cfg.Direct) { mset.ackq.push(seq) } else { - o.updateAcks(dseq, seq, _EMPTY_) + o.updateAcks(dseq, seq, _EMPTY_, nil) } } } diff --git a/server/errors.json b/server/errors.json index 9bb8cb18c66..92ab970d4e2 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1628,5 +1628,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerMsgNotPendingAckErr", + "code": 400, + "error_code": 10165, + "description": "This message is currently not pending an ack (already acked or not yet delivered)", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 45a6ddc01df..c3011ab7037 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -718,6 +718,13 @@ type JSApiConsumerPauseRequest struct { PauseUntil time.Time `json:"pause_until,omitempty"` } +const JSApiConsumerAckResponseType = "io.nats.jetstream.api.v1.consumer_ack_response" + +type JSApiConsumerAckResponse struct { + ApiResponse + Success bool `json:"success,omitempty"` +} + const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response" type JSApiConsumerPauseResponse struct { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 93f681c323d..e57d2244ac6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5275,7 +5275,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error { // Check if we have a reply that was requested. if reply := o.replies[sseq]; reply != _EMPTY_ { - o.outq.sendMsg(reply, nil) + o.sendAckReplyLocked(reply, nil) delete(o.replies, sseq) } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 06adf875c5d..30c4fabc00e 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6668,9 +6668,11 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) { } } -// Make sure if we received acks that are out of bounds, meaning past our +// Test that acking returns the expected reply message. +// Make sure that an error is returned when trying to ack the same message more than once. +// Make sure if we receive acks that are out of bounds, meaning past our // last sequence or before our first that they are ignored and errored if applicable. -func TestJetStreamClusterConsumerAckOutOfBounds(t *testing.T) { +func TestJetStreamConsumerClusterAckAck(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -6696,13 +6698,33 @@ func TestJetStreamClusterConsumerAckOutOfBounds(t *testing.T) { msgs, err := sub.Fetch(1) require_NoError(t, err) require_Equal(t, len(msgs), 1) - msgs[0].AckSync() + ackSubject := msgs[0].Reply + + var resp JSApiConsumerAckResponse + + // first ack + replyMsg, err := nc.Request(ackSubject, nil, 5000*time.Millisecond) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(replyMsg.Data, &resp)) + require_True(t, resp.Success) + + // check for error if trying to ack again + resp = JSApiConsumerAckResponse{} + replyMsg, err = nc.Request(ackSubject, nil, 250*time.Millisecond) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(replyMsg.Data, &resp)) + require_False(t, resp.Success) + require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr)) // Now ack way past the last sequence. - _, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond) - require_Error(t, err, nats.ErrTimeout) + resp = JSApiConsumerAckResponse{} + replyMsg, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(replyMsg.Data, &resp)) + require_False(t, resp.Success) + require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr)) - // Make sure that now changes happened to our state. + // Make sure that no changes happened to our state. ci, err := js.ConsumerInfo("TEST", "C") require_NoError(t, err) require_Equal(t, ci.Delivered.Consumer, 1) diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 146e969eb7c..6f148f60b7a 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -158,6 +158,9 @@ const ( // JSConsumerMetadataLengthErrF consumer metadata exceeds maximum size of {limit} JSConsumerMetadataLengthErrF ErrorIdentifier = 10135 + // JSConsumerMsgNotPendingAckErr This message is currently not pending an ack (already acked or not yet delivered) + JSConsumerMsgNotPendingAckErr ErrorIdentifier = 10165 + // JSConsumerMultipleFiltersNotAllowed consumer with multiple subject filters cannot use subject based API JSConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137 @@ -548,6 +551,7 @@ var ( JSConsumerMaxRequestExpiresToSmall: {Code: 400, ErrCode: 10115, Description: "consumer max request expires needs to be >= 1ms"}, JSConsumerMaxWaitingNegativeErr: {Code: 400, ErrCode: 10087, Description: "consumer max waiting needs to be positive"}, JSConsumerMetadataLengthErrF: {Code: 400, ErrCode: 10135, Description: "consumer metadata exceeds maximum size of {limit}"}, + JSConsumerMsgNotPendingAckErr: {Code: 400, ErrCode: 10165, Description: "This message is currently not pending an ack (already acked or not yet delivered)"}, JSConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10137, Description: "consumer with multiple subject filters cannot use subject based API"}, JSConsumerNameContainsPathSeparatorsErr: {Code: 400, ErrCode: 10127, Description: "Consumer name can not contain path separators"}, JSConsumerNameExistErr: {Code: 400, ErrCode: 10013, Description: "consumer name already in use"}, @@ -1249,6 +1253,16 @@ func NewJSConsumerMetadataLengthError(limit interface{}, opts ...ErrorOption) *A } } +// NewJSConsumerMsgNotPendingAckError creates a new JSConsumerMsgNotPendingAckErr error: "This message is currently not pending an ack (already acked or not yet delivered)" +func NewJSConsumerMsgNotPendingAckError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerMsgNotPendingAckErr] +} + // NewJSConsumerMultipleFiltersNotAllowedError creates a new JSConsumerMultipleFiltersNotAllowed error: "consumer with multiple subject filters cannot use subject based API" func NewJSConsumerMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 66b7566fb05..3a1649cdd39 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -2995,8 +2995,8 @@ func TestJetStreamConsumerAckAck(t *testing.T) { nc := clientConnectToServer(t, s) defer nc.Close() - // 4 for number of ack protocols to test them all. - for i := 0; i < 4; i++ { + // 5 for number of ack protocols plus already acked to test them all. + for i := 0; i < 5; i++ { sendStreamMsg(t, nc, mname, "Hello World!") } @@ -3015,6 +3015,34 @@ func TestJetStreamConsumerAckAck(t *testing.T) { testAck(AckNak) testAck(AckProgress) testAck(AckTerm) + + // checking replies and errors on two explicit acks for the same message or out of range ack + // first get the message + m, err := nc.Request(rqn, nil, 10*time.Millisecond) + require_NoError(t, err) + + var resp JSApiConsumerAckResponse + // Send a request for the first ack and make sure it worked. + ackReply1, err := nc.Request(m.Reply, AckAck, 10*time.Millisecond) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(ackReply1.Data, &resp)) + require_True(t, resp.Success) + + // Send a second ack and make sure it fails. + ackReply2, err := nc.Request(m.Reply, AckAck, 10*time.Millisecond) + require_NoError(t, err) + resp = JSApiConsumerAckResponse{} + require_NoError(t, json.Unmarshal(ackReply2.Data, &resp)) + require_False(t, resp.Success) + require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr)) + + // Error trying to ack an out of range sequence number + ackReply3, err := nc.Request("$JS.ACK.ACK-ACK.worker.1.6.0.0.0", AckAck, 10*time.Millisecond) + require_NoError(t, err) + resp = JSApiConsumerAckResponse{} + require_NoError(t, json.Unmarshal(ackReply3.Data, &resp)) + require_False(t, resp.Success) + require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr)) } func TestJetStreamAckNext(t *testing.T) {