Skip to content

Commit ab70643

Browse files
(2.12) Remove Read-after-Write for now (#7167)
Remove read-after-write/monotonic reads for 2.12. > Although the design (read-after-write/monotonic reads/linearizable) gives a lot of control and flexibility, we'll need to decide whether this adds a bunch of complexity which would be (too) hard to understand by an end-user, in which case we might need to revisit this. #7146 (comment) The current design is very flexible, but adds a ton of complexity that needs to be handled on a per-request basis. We've discussed whether this should instead be on a per-stream basis. Where on a replicated stream/KV/ObjectStore the reads are serializable by default, but one can opt-in to linearizable for that whole stream. That would mean client code doesn't need to change at all to get this guarantee, only the stream setting would need to be updated. We'll need to discuss this further, and likely introduce this early in the next 2.14 (skipping 2.13) release cycle. Relates to #6557 Relates to PRs: #6970, #7146 Signed-off-by: Maurice van Veen <[email protected]>
2 parents e24d783 + 131cbf1 commit ab70643

File tree

5 files changed

+32
-12
lines changed

5 files changed

+32
-12
lines changed

server/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ type ConsumerConfig struct {
116116
// Force memory storage.
117117
MemoryStorage bool `json:"mem_storage,omitempty"`
118118
// Force the consumer to only deliver messages if the stream has at minimum this specified last sequence.
119-
MinLastSeq uint64 `json:"min_last_seq,omitempty"`
119+
MinLastSeq uint64 `json:"-"`
120120

121121
// Don't add to general clients.
122122
Direct bool `json:"direct,omitempty"`

server/jetstream_api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ type JSApiMsgGetRequest struct {
680680
NextFor string `json:"next_by_subj,omitempty"`
681681

682682
// Force the server to only deliver messages if the stream has at minimum this specified last sequence.
683-
MinLastSeq uint64 `json:"min_last_seq,omitempty"`
683+
MinLastSeq uint64 `json:"-"`
684684

685685
// Batch support. Used to request more than one msg at a time.
686686
// Can be used with simple starting seq, but also NextFor with wildcards.

server/jetstream_cluster_1_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9256,6 +9256,9 @@ func TestJetStreamClusterAsyncFlushFileStoreFlushOnSnapshot(t *testing.T) {
92569256
}
92579257

92589258
func TestJetStreamClusterMsgGetReadAfterWrite(t *testing.T) {
9259+
// TODO(mvv): revisit for 2.14+
9260+
t.Skip()
9261+
92599262
c := createJetStreamClusterExplicit(t, "R3S", 3)
92609263
defer c.shutdown()
92619264

@@ -9296,6 +9299,9 @@ func TestJetStreamClusterMsgGetReadAfterWrite(t *testing.T) {
92969299
}
92979300

92989301
func TestJetStreamClusterMsgGetMonotonicRead(t *testing.T) {
9302+
// TODO(mvv): revisit for 2.14+
9303+
t.Skip()
9304+
92999305
c := createJetStreamClusterExplicit(t, "R3S", 3)
93009306
defer c.shutdown()
93019307

@@ -9332,6 +9338,9 @@ func TestJetStreamClusterMsgGetMonotonicRead(t *testing.T) {
93329338
}
93339339

93349340
func TestJetStreamClusterDirectGetReadAfterWrite(t *testing.T) {
9341+
// TODO(mvv): revisit for 2.14+
9342+
t.Skip()
9343+
93359344
c := createJetStreamClusterExplicit(t, "R3S", 3)
93369345
defer c.shutdown()
93379346

@@ -9393,6 +9402,9 @@ func TestJetStreamClusterDirectGetReadAfterWrite(t *testing.T) {
93939402
}
93949403

93959404
func TestJetStreamClusterMirrorDirectGetReadAfterWrite(t *testing.T) {
9405+
// TODO(mvv): revisit for 2.14+
9406+
t.Skip()
9407+
93969408
c := createJetStreamClusterExplicit(t, "R3S", 3)
93979409
defer c.shutdown()
93989410

@@ -9514,6 +9526,9 @@ func TestJetStreamClusterMirrorDirectGetReadAfterWrite(t *testing.T) {
95149526
}
95159527

95169528
func TestJetStreamClusterDirectGetReadAfterWriteOutdatedFollower(t *testing.T) {
9529+
// TODO(mvv): revisit for 2.14+
9530+
t.Skip()
9531+
95179532
test := func(t *testing.T, templ string, accName string, authenticated bool) {
95189533
c := createJetStreamClusterWithTemplate(t, templ, "R3S", 3)
95199534
defer c.shutdown()
@@ -9617,6 +9632,9 @@ func TestJetStreamClusterDirectGetReadAfterWriteOutdatedFollower(t *testing.T) {
96179632
}
96189633

96199634
func TestJetStreamClusterDirectGetMonotonicRead(t *testing.T) {
9635+
// TODO(mvv): revisit for 2.14+
9636+
t.Skip()
9637+
96209638
c := createJetStreamClusterExplicit(t, "R3S", 3)
96219639
defer c.shutdown()
96229640

@@ -9670,6 +9688,9 @@ func TestJetStreamClusterDirectGetMonotonicRead(t *testing.T) {
96709688
}
96719689

96729690
func TestJetStreamClusterDirectGetLastBySubjectReadAfterWrite(t *testing.T) {
9691+
// TODO(mvv): revisit for 2.14+
9692+
t.Skip()
9693+
96739694
c := createJetStreamClusterExplicit(t, "R3S", 3)
96749695
defer c.shutdown()
96759696

@@ -9731,6 +9752,9 @@ func TestJetStreamClusterDirectGetLastBySubjectReadAfterWrite(t *testing.T) {
97319752
}
97329753

97339754
func TestJetStreamClusterDirectGetLastBySubjectReadAfterWriteOutdatedFollower(t *testing.T) {
9755+
// TODO(mvv): revisit for 2.14+
9756+
t.Skip()
9757+
97349758
c := createJetStreamClusterExplicit(t, "R3S", 3)
97359759
defer c.shutdown()
97369760

@@ -9784,6 +9808,9 @@ func TestJetStreamClusterDirectGetLastBySubjectReadAfterWriteOutdatedFollower(t
97849808
}
97859809

97869810
func TestJetStreamClusterDirectGetLastBySubjectMonotonicRead(t *testing.T) {
9811+
// TODO(mvv): revisit for 2.14+
9812+
t.Skip()
9813+
97879814
c := createJetStreamClusterExplicit(t, "R3S", 3)
97889815
defer c.shutdown()
97899816

@@ -9834,6 +9861,9 @@ func TestJetStreamClusterDirectGetLastBySubjectMonotonicRead(t *testing.T) {
98349861
}
98359862

98369863
func TestJetStreamClusterConsumerReadAfterWrite(t *testing.T) {
9864+
// TODO(mvv): revisit for 2.14+
9865+
t.Skip()
9866+
98379867
c := createJetStreamClusterExplicit(t, "R3S", 3)
98389868
defer c.shutdown()
98399869

server/jetstream_versioning.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,6 @@ func setStaticConsumerMetadata(cfg *ConsumerConfig) {
126126
requires(1)
127127
}
128128

129-
// Added in 2.12, absent | zero is the feature is not used.
130-
if cfg.MinLastSeq > 0 {
131-
requires(2)
132-
}
133-
134129
cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
135130
}
136131

server/jetstream_versioning_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,6 @@ func TestJetStreamSetStaticConsumerMetadata(t *testing.T) {
224224
cfg: &ConsumerConfig{PriorityPolicy: PriorityPinnedClient, PriorityGroups: []string{"a"}},
225225
expectedMetadata: metadataAtLevel("1"),
226226
},
227-
{
228-
desc: "MinLastSeq",
229-
cfg: &ConsumerConfig{MinLastSeq: 1},
230-
expectedMetadata: metadataAtLevel("2"),
231-
},
232227
} {
233228
t.Run(test.desc, func(t *testing.T) {
234229
setStaticConsumerMetadata(test.cfg)

0 commit comments

Comments
 (0)