Skip to content

Commit b41a81b

Browse files
committed
PR feedback
Signed-off-by: Jean-Noël Moyne <[email protected]>
1 parent 36d5823 commit b41a81b

File tree

4 files changed

+15
-14
lines changed

4 files changed

+15
-14
lines changed

server/consumer.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2274,14 +2274,14 @@ func configsEqualSansDelivery(a, b ConsumerConfig) bool {
22742274
func (o *consumer) sendAckReply(subj string, err *ApiError) {
22752275
o.mu.RLock()
22762276
defer o.mu.RUnlock()
2277-
o.sendAckReplyLockHeld(subj, err)
2277+
o.sendAckReplyLocked(subj, err)
22782278
}
22792279

22802280
// Helper to send a reply to an ack.
22812281
// Lock must be held
2282-
func (o *consumer) sendAckReplyLockHeld(subj string, err *ApiError) {
2282+
func (o *consumer) sendAckReplyLocked(subj string, err *ApiError) {
22832283

2284-
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerAckResponseType, Error: err}}
2284+
var resp = JSApiConsumerAckResponse{ApiResponse: ApiResponse{Type: JSApiConsumerAckResponseType, Error: err}}
22852285

22862286
if err == nil {
22872287
resp.Success = true
@@ -2350,11 +2350,9 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
23502350

23512351
switch {
23522352
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
2353-
if b, e := o.processAckMsg(sseq, dseq, dc, reply, true); e == nil {
2354-
if !b {
2355-
// We handle replies for acks in updateAcks
2356-
skipAckReply = true
2357-
}
2353+
if b, e := o.processAckMsg(sseq, dseq, dc, reply, true); e == nil && !b {
2354+
// We handle replies for acks in updateAcks
2355+
skipAckReply = true
23582356
} else {
23592357
err = e
23602358
}
@@ -2531,7 +2529,7 @@ func (o *consumer) updateAcks(dseq, sseq uint64, reply string, err *ApiError) {
25312529
o.store.UpdateAcks(dseq, sseq)
25322530
if reply != _EMPTY_ {
25332531
// Already locked so send direct.
2534-
o.sendAckReplyLockHeld(reply, err)
2532+
o.sendAckReplyLocked(reply, err)
25352533
}
25362534
}
25372535
// Update activity.

server/jetstream_cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5275,7 +5275,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error {
52755275

52765276
// Check if we have a reply that was requested.
52775277
if reply := o.replies[sseq]; reply != _EMPTY_ {
5278-
o.sendAckReplyLockHeld(reply, nil)
5278+
o.sendAckReplyLocked(reply, nil)
52795279
delete(o.replies, sseq)
52805280
}
52815281

server/jetstream_cluster_1_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6714,14 +6714,15 @@ func TestJetStreamConsumerClusterAckAck(t *testing.T) {
67146714
require_NoError(t, err)
67156715
require_NoError(t, json.Unmarshal(replyMsg.Data, &resp))
67166716
require_False(t, resp.Success)
6717-
require_True(t, resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr))
6717+
require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr))
67186718

67196719
// Now ack way past the last sequence.
67206720
resp = JSApiConsumerAckResponse{}
67216721
replyMsg, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond)
67226722
require_NoError(t, err)
67236723
require_NoError(t, json.Unmarshal(replyMsg.Data, &resp))
6724-
require_True(t, resp.Error != nil && resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr))
6724+
require_False(t, resp.Success)
6725+
require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr))
67256726

67266727
// Make sure that no changes happened to our state.
67276728
ci, err := js.ConsumerInfo("TEST", "C")

server/jetstream_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3033,14 +3033,16 @@ func TestJetStreamConsumerAckAck(t *testing.T) {
30333033
require_NoError(t, err)
30343034
resp = JSApiConsumerAckResponse{}
30353035
require_NoError(t, json.Unmarshal(ackReply2.Data, &resp))
3036-
require_True(t, !resp.Success && resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr))
3036+
require_False(t, resp.Success)
3037+
require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr))
30373038

30383039
// Error trying to ack an out of range sequence number
30393040
ackReply3, err := nc.Request("$JS.ACK.ACK-ACK.worker.1.6.0.0.0", AckAck, 10*time.Millisecond)
30403041
require_NoError(t, err)
30413042
resp = JSApiConsumerAckResponse{}
30423043
require_NoError(t, json.Unmarshal(ackReply3.Data, &resp))
3043-
require_True(t, !resp.Success && resp.Error.ErrCode == uint16(JSConsumerMsgNotPendingAckErr))
3044+
require_False(t, resp.Success)
3045+
require_Equal(t, resp.Error.ErrCode, uint16(JSConsumerMsgNotPendingAckErr))
30443046
}
30453047

30463048
func TestJetStreamAckNext(t *testing.T) {

0 commit comments

Comments
 (0)