From 36d5823457c3e09af81f80436ff235c64849ee37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Wed, 27 Nov 2024 14:59:45 -0800 Subject: [PATCH 1/3] Adds replies for AckAck requests Now returns a JS API payload rather than an empty message in response to AckAck requests. New error: when trying to ack a sequence number higher than the stream's last sequence and when trying to ack a sequence number more than once. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jean-Noël Moyne --- server/consumer.go | 67 +++++++++++++++++++--------- server/errors.json | 10 +++++ server/jetstream_api.go | 7 +++ server/jetstream_cluster.go | 2 +- server/jetstream_cluster_1_test.go | 33 +++++++++++--- server/jetstream_errors_generated.go | 14 ++++++ server/jetstream_test.go | 30 ++++++++++++- 7 files changed, 132 insertions(+), 31 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index e4118c72cf5..2191e197b90 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2271,10 +2271,27 @@ func configsEqualSansDelivery(a, b ConsumerConfig) bool { } // Helper to send a reply to an ack. -func (o *consumer) sendAckReply(subj string) { +func (o *consumer) sendAckReply(subj string, err *ApiError) { o.mu.RLock() defer o.mu.RUnlock() - o.outq.sendMsg(subj, nil) + o.sendAckReplyLockHeld(subj, err) +} + +// Helper to send a reply to an ack. +// Lock must be held +func (o *consumer) sendAckReplyLockHeld(subj string, err *ApiError) { + + var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerAckResponseType, Error: err}} + + if err == nil { + resp.Success = true + } + + j, e := json.Marshal(resp) + if e != nil { + return + } + o.outq.sendMsg(subj, j) } type jsAckMsg struct { @@ -2327,17 +2344,22 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { } sseq, dseq, dc := ackReplyInfo(subject) + var err *ApiError skipAckReply := sseq == 0 switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): - if !o.processAckMsg(sseq, dseq, dc, reply, true) { - // We handle replies for acks in updateAcks - skipAckReply = true + if b, e := o.processAckMsg(sseq, dseq, dc, reply, true); e == nil { + if !b { + // We handle replies for acks in updateAcks + skipAckReply = true + } + } else { + err = e } case bytes.HasPrefix(msg, AckNext): - o.processAckMsg(sseq, dseq, dc, _EMPTY_, true) + _, _ = o.processAckMsg(sseq, dseq, dc, _EMPTY_, true) o.processNextMsgRequest(reply, msg[len(AckNext):]) skipAckReply = true case bytes.HasPrefix(msg, AckNak): @@ -2357,7 +2379,7 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { // Ack the ack if requested. if len(reply) > 0 && !skipAckReply { - o.sendAckReply(reply) + o.sendAckReply(reply, err) } } @@ -2493,7 +2515,7 @@ func (o *consumer) addAckReply(sseq uint64, reply string) { } // Lock should be held. -func (o *consumer) updateAcks(dseq, sseq uint64, reply string) { +func (o *consumer) updateAcks(dseq, sseq uint64, reply string, err *ApiError) { if o.node != nil { // Inline for now, use variable compression. var b [2*binary.MaxVarintLen64 + 1]byte @@ -2509,7 +2531,7 @@ func (o *consumer) updateAcks(dseq, sseq uint64, reply string) { o.store.UpdateAcks(dseq, sseq) if reply != _EMPTY_ { // Already locked so send direct. - o.outq.sendMsg(reply, nil) + o.sendAckReplyLockHeld(reply, err) } } // Update activity. @@ -2705,7 +2727,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) { // case the reply will be sent later). func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool { // Treat like an ack to suppress redelivery. - ackedInPlace := o.processAckMsg(sseq, dseq, dc, reply, false) + ackedInPlace, _ := o.processAckMsg(sseq, dseq, dc, reply, false) o.mu.Lock() defer o.mu.Unlock() @@ -3061,17 +3083,17 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) { // Returns `true` if the ack was processed in place and the sender can now respond // to the client, or `false` if there was an error or the ack is replicated (in which // case the reply will be sent later). -func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) bool { +func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) (bool, *ApiError) { o.mu.Lock() if o.closed { o.mu.Unlock() - return false + return false, nil } mset := o.mset if mset == nil || mset.closed.Load() { o.mu.Unlock() - return false + return false, nil } // Check if this ack is above the current pointer to our next to deliver. @@ -3083,11 +3105,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b var ss StreamState mset.store.FastState(&ss) if sseq > ss.LastSeq { - o.srv.Warnf("JetStream consumer '%s > %s > %s' ACK sequence %d past last stream sequence of %d", - o.acc.Name, o.stream, o.name, sseq, ss.LastSeq) - // FIXME(dlc) - For 2.11 onwards should we return an error here to the caller? o.mu.Unlock() - return false + return false, NewJSConsumerMsgNotPendingAckError() } o.sseq = sseq + 1 } @@ -3099,6 +3118,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b var sgap, floor uint64 var needSignal bool + var err *ApiError switch o.cfg.AckPolicy { case AckExplicit: @@ -3128,14 +3148,17 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b } } } + } else { + err = NewJSConsumerMsgNotPendingAckError() } + delete(o.rdc, sseq) o.removeFromRedeliverQueue(sseq) case AckAll: // no-op if dseq <= o.adflr || sseq <= o.asflr { o.mu.Unlock() - return ackInPlace + return ackInPlace, nil } if o.maxp > 0 && len(o.pending) >= o.maxp { needSignal = true @@ -3167,7 +3190,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b case AckNone: // FIXME(dlc) - This is error but do we care? o.mu.Unlock() - return ackInPlace + return ackInPlace, nil } // No ack replication, so we set reply to "" so that updateAcks does not @@ -3176,7 +3199,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b reply = _EMPTY_ } // Update underlying store. - o.updateAcks(dseq, sseq, reply) + o.updateAcks(dseq, sseq, reply, err) o.mu.Unlock() if ackInPlace { @@ -3194,7 +3217,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b if needSignal { o.signalNewMessages() } - return ackInPlace + return ackInPlace, err } // Determine if this is a truly filtered consumer. Modern clients will place filtered subjects @@ -4834,7 +4857,7 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, if mset != nil && mset.ackq != nil && (o.node == nil || o.cfg.Direct) { mset.ackq.push(seq) } else { - o.updateAcks(dseq, seq, _EMPTY_) + o.updateAcks(dseq, seq, _EMPTY_, nil) } } } diff --git a/server/errors.json b/server/errors.json index 9bb8cb18c66..78ccacb57dd 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1628,5 +1628,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerMsgNotPendingAckErr", + "code": 400, + "error_code": 10163, + "description": "This message is currently not pending an ack (already acked or not yet delivered)", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 45a6ddc01df..c3011ab7037 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -718,6 +718,13 @@ type JSApiConsumerPauseRequest struct { PauseUntil time.Time `json:"pause_until,omitempty"` } +const JSApiConsumerAckResponseType = "io.nats.jetstream.api.v1.consumer_ack_response" + +type JSApiConsumerAckResponse struct { + ApiResponse + Success bool `json:"success,omitempty"` +} + const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response" type JSApiConsumerPauseResponse struct { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 93f681c323d..3fefd7a815d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5275,7 +5275,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error { // Check if we have a reply that was requested. if reply := o.replies[sseq]; reply != _EMPTY_ { - o.outq.sendMsg(reply, nil) + o.sendAckReplyLockHeld(reply, nil) delete(o.replies, sseq) } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 06adf875c5d..b37c6803646 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6668,9 +6668,11 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) { } } -// Make sure if we received acks that are out of bounds, meaning past our +// Test that acking returns the expected reply message. +// Make sure that an error is returned when trying to ack the same message more than once. +// Make sure if we receive acks that are out of bounds, meaning past our // last sequence or before our first that they are ignored and errored if applicable. -func TestJetStreamClusterConsumerAckOutOfBounds(t *testing.T) { +func TestJetStreamConsumerClusterAckAck(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -6696,13 +6698,32 @@ func TestJetStreamClusterConsumerAckOutOfBounds(t *testing.T) { msgs, err := sub.Fetch(1) require_NoError(t, err) require_Equal(t, len(msgs), 1) - msgs[0].AckSync() + ackSubject := msgs[0].Reply + + var resp JSApiConsumerAckResponse + + // first ack + replyMsg, err := nc.Request(ackSubject, nil, 5000*time.Millisecond) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(replyMsg.Data, &resp)) + require_True(t, resp.Success) + + // check for error if trying to ack again + resp = JSApiConsumerAckResponse{} + replyMsg, err = nc.Request(ackSubject, nil, 250*time.Millisecond) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(replyMsg.Data, &resp)) + require_False(t, resp.Success) + require_True(t, resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr)) // Now ack way past the last sequence. - _, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond) - require_Error(t, err, nats.ErrTimeout) + resp = JSApiConsumerAckResponse{} + replyMsg, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(replyMsg.Data, &resp)) + require_True(t, resp.Error != nil && resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr)) - // Make sure that now changes happened to our state. + // Make sure that no changes happened to our state. ci, err := js.ConsumerInfo("TEST", "C") require_NoError(t, err) require_Equal(t, ci.Delivered.Consumer, 1) diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 146e969eb7c..7ce79a2b4b1 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -158,6 +158,9 @@ const ( // JSConsumerMetadataLengthErrF consumer metadata exceeds maximum size of {limit} JSConsumerMetadataLengthErrF ErrorIdentifier = 10135 + // JSConsumerMsgNotPendingAckErr This message is currently not pending an ack (already acked or not yet delivered) + JSConsumerMsgNotPendingAckErr ErrorIdentifier = 10163 + // JSConsumerMultipleFiltersNotAllowed consumer with multiple subject filters cannot use subject based API JSConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137 @@ -548,6 +551,7 @@ var ( JSConsumerMaxRequestExpiresToSmall: {Code: 400, ErrCode: 10115, Description: "consumer max request expires needs to be >= 1ms"}, JSConsumerMaxWaitingNegativeErr: {Code: 400, ErrCode: 10087, Description: "consumer max waiting needs to be positive"}, JSConsumerMetadataLengthErrF: {Code: 400, ErrCode: 10135, Description: "consumer metadata exceeds maximum size of {limit}"}, + JSConsumerMsgNotPendingAckErr: {Code: 400, ErrCode: 10163, Description: "This message is currently not pending an ack (already acked or not yet delivered)"}, JSConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10137, Description: "consumer with multiple subject filters cannot use subject based API"}, JSConsumerNameContainsPathSeparatorsErr: {Code: 400, ErrCode: 10127, Description: "Consumer name can not contain path separators"}, JSConsumerNameExistErr: {Code: 400, ErrCode: 10013, Description: "consumer name already in use"}, @@ -1249,6 +1253,16 @@ func NewJSConsumerMetadataLengthError(limit interface{}, opts ...ErrorOption) *A } } +// NewJSConsumerMsgNotPendingAckError creates a new JSConsumerMsgNotPendingAckErr error: "This message is currently not pending an ack (already acked or not yet delivered)" +func NewJSConsumerMsgNotPendingAckError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerMsgNotPendingAckErr] +} + // NewJSConsumerMultipleFiltersNotAllowedError creates a new JSConsumerMultipleFiltersNotAllowed error: "consumer with multiple subject filters cannot use subject based API" func NewJSConsumerMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 66b7566fb05..4227b0f3289 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -2995,8 +2995,8 @@ func TestJetStreamConsumerAckAck(t *testing.T) { nc := clientConnectToServer(t, s) defer nc.Close() - // 4 for number of ack protocols to test them all. - for i := 0; i < 4; i++ { + // 5 for number of ack protocols plus already acked to test them all. + for i := 0; i < 5; i++ { sendStreamMsg(t, nc, mname, "Hello World!") } @@ -3015,6 +3015,32 @@ func TestJetStreamConsumerAckAck(t *testing.T) { testAck(AckNak) testAck(AckProgress) testAck(AckTerm) + + // checking replies and errors on two explicit acks for the same message or out of range ack + // first get the message + m, err := nc.Request(rqn, nil, 10*time.Millisecond) + require_NoError(t, err) + + var resp JSApiConsumerAckResponse + // Send a request for the first ack and make sure it worked. + ackReply1, err := nc.Request(m.Reply, AckAck, 10*time.Millisecond) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(ackReply1.Data, &resp)) + require_True(t, resp.Success) + + // Send a second ack and make sure it fails. + ackReply2, err := nc.Request(m.Reply, AckAck, 10*time.Millisecond) + require_NoError(t, err) + resp = JSApiConsumerAckResponse{} + require_NoError(t, json.Unmarshal(ackReply2.Data, &resp)) + require_True(t, !resp.Success && resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr)) + + // Error trying to ack an out of range sequence number + ackReply3, err := nc.Request("$JS.ACK.ACK-ACK.worker.1.6.0.0.0", AckAck, 10*time.Millisecond) + require_NoError(t, err) + resp = JSApiConsumerAckResponse{} + require_NoError(t, json.Unmarshal(ackReply3.Data, &resp)) + require_True(t, !resp.Success && resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr)) } func TestJetStreamAckNext(t *testing.T) { From b41a81b4a7d8d89742d7df4a5712b5b8131106e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Tue, 3 Dec 2024 11:12:42 -0800 Subject: [PATCH 2/3] PR feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jean-Noël Moyne --- server/consumer.go | 16 +++++++--------- server/jetstream_cluster.go | 2 +- server/jetstream_cluster_1_test.go | 5 +++-- server/jetstream_test.go | 6 ++++-- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 2191e197b90..bb248f7c430 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2274,14 +2274,14 @@ func configsEqualSansDelivery(a, b ConsumerConfig) bool { func (o *consumer) sendAckReply(subj string, err *ApiError) { o.mu.RLock() defer o.mu.RUnlock() - o.sendAckReplyLockHeld(subj, err) + o.sendAckReplyLocked(subj, err) } // Helper to send a reply to an ack. // Lock must be held -func (o *consumer) sendAckReplyLockHeld(subj string, err *ApiError) { +func (o *consumer) sendAckReplyLocked(subj string, err *ApiError) { - var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerAckResponseType, Error: err}} + var resp = JSApiConsumerAckResponse{ApiResponse: ApiResponse{Type: JSApiConsumerAckResponseType, Error: err}} if err == nil { resp.Success = true @@ -2350,11 +2350,9 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): - if b, e := o.processAckMsg(sseq, dseq, dc, reply, true); e == nil { - if !b { - // We handle replies for acks in updateAcks - skipAckReply = true - } + if b, e := o.processAckMsg(sseq, dseq, dc, reply, true); e == nil && !b { + // We handle replies for acks in updateAcks + skipAckReply = true } else { err = e } @@ -2531,7 +2529,7 @@ func (o *consumer) updateAcks(dseq, sseq uint64, reply string, err *ApiError) { o.store.UpdateAcks(dseq, sseq) if reply != _EMPTY_ { // Already locked so send direct. - o.sendAckReplyLockHeld(reply, err) + o.sendAckReplyLocked(reply, err) } } // Update activity. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3fefd7a815d..e57d2244ac6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5275,7 +5275,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error { // Check if we have a reply that was requested. if reply := o.replies[sseq]; reply != _EMPTY_ { - o.sendAckReplyLockHeld(reply, nil) + o.sendAckReplyLocked(reply, nil) delete(o.replies, sseq) } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index b37c6803646..30c4fabc00e 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6714,14 +6714,15 @@ func TestJetStreamConsumerClusterAckAck(t *testing.T) { require_NoError(t, err) require_NoError(t, json.Unmarshal(replyMsg.Data, &resp)) require_False(t, resp.Success) - require_True(t, resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr)) + require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr)) // Now ack way past the last sequence. resp = JSApiConsumerAckResponse{} replyMsg, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond) require_NoError(t, err) require_NoError(t, json.Unmarshal(replyMsg.Data, &resp)) - require_True(t, resp.Error != nil && resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr)) + require_False(t, resp.Success) + require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr)) // Make sure that no changes happened to our state. ci, err := js.ConsumerInfo("TEST", "C") diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 4227b0f3289..3a1649cdd39 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -3033,14 +3033,16 @@ func TestJetStreamConsumerAckAck(t *testing.T) { require_NoError(t, err) resp = JSApiConsumerAckResponse{} require_NoError(t, json.Unmarshal(ackReply2.Data, &resp)) - require_True(t, !resp.Success && resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr)) + require_False(t, resp.Success) + require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr)) // Error trying to ack an out of range sequence number ackReply3, err := nc.Request("$JS.ACK.ACK-ACK.worker.1.6.0.0.0", AckAck, 10*time.Millisecond) require_NoError(t, err) resp = JSApiConsumerAckResponse{} require_NoError(t, json.Unmarshal(ackReply3.Data, &resp)) - require_True(t, !resp.Success && resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr)) + require_False(t, resp.Success) + require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr)) } func TestJetStreamAckNext(t *testing.T) { From deae0973d806754c89b4aa2d9208bf5eae805025 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Fri, 6 Dec 2024 12:32:41 -0800 Subject: [PATCH 3/3] Update error identifier after rebase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jean-Noël Moyne --- server/errors.json | 2 +- server/jetstream_errors_generated.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/errors.json b/server/errors.json index 78ccacb57dd..92ab970d4e2 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1632,7 +1632,7 @@ { "constant": "JSConsumerMsgNotPendingAckErr", "code": 400, - "error_code": 10163, + "error_code": 10165, "description": "This message is currently not pending an ack (already acked or not yet delivered)", "comment": "", "help": "", diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 7ce79a2b4b1..6f148f60b7a 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -159,7 +159,7 @@ const ( JSConsumerMetadataLengthErrF ErrorIdentifier = 10135 // JSConsumerMsgNotPendingAckErr This message is currently not pending an ack (already acked or not yet delivered) - JSConsumerMsgNotPendingAckErr ErrorIdentifier = 10163 + JSConsumerMsgNotPendingAckErr ErrorIdentifier = 10165 // JSConsumerMultipleFiltersNotAllowed consumer with multiple subject filters cannot use subject based API JSConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137 @@ -551,7 +551,7 @@ var ( JSConsumerMaxRequestExpiresToSmall: {Code: 400, ErrCode: 10115, Description: "consumer max request expires needs to be >= 1ms"}, JSConsumerMaxWaitingNegativeErr: {Code: 400, ErrCode: 10087, Description: "consumer max waiting needs to be positive"}, JSConsumerMetadataLengthErrF: {Code: 400, ErrCode: 10135, Description: "consumer metadata exceeds maximum size of {limit}"}, - JSConsumerMsgNotPendingAckErr: {Code: 400, ErrCode: 10163, Description: "This message is currently not pending an ack (already acked or not yet delivered)"}, + JSConsumerMsgNotPendingAckErr: {Code: 400, ErrCode: 10165, Description: "This message is currently not pending an ack (already acked or not yet delivered)"}, JSConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10137, Description: "consumer with multiple subject filters cannot use subject based API"}, JSConsumerNameContainsPathSeparatorsErr: {Code: 400, ErrCode: 10127, Description: "Consumer name can not contain path separators"}, JSConsumerNameExistErr: {Code: 400, ErrCode: 10013, Description: "consumer name already in use"},