|
4 | 4 | |----------|--------------------------------------------------------------|
|
5 | 5 | | Date | 2025-07-11 |
|
6 | 6 | | Author | @MauriceVanVeen |
|
7 |
| -| Status | Proposed | |
| 7 | +| Status | Implemented | |
8 | 8 | | Tags | jetstream, kv, objectstore, server, client, refinement, 2.12 |
|
9 | 9 | | Updates | ADR-8, ADR-17, ADR-20, ADR-31, ADR-37 |
|
10 | 10 |
|
11 |
| -| Revision | Date | Author | Info | |
12 |
| -|----------|------------|-----------------|----------------| |
13 |
| -| 1 | 2025-07-11 | @MauriceVanVeen | Initial design | |
| 11 | +| Revision | Date | Author | Info | |
| 12 | +|----------|------------|-----------------|-------------------------------------| |
| 13 | +| 1 | 2025-07-11 | @MauriceVanVeen | Initial design | |
| 14 | +| 2 | 2025-07-31 | @MauriceVanVeen | Added Client Implementation section | |
14 | 15 |
|
15 | 16 | ## Problem Statement
|
16 | 17 |
|
@@ -149,3 +150,130 @@ purge markers. Therefore, the KV abstraction still has these guarantees since it
|
149 | 150 | Since this is an opt-in on a read request or consumer create basis, this is not a breaking change. Depending on client
|
150 | 151 | implementation, this could be harder to implement. But given it's just another field in the `JSApiMsgGetRequest` and
|
151 | 152 | `ConsumerConfig`, each client should have no trouble supporting it.
|
| 153 | + |
| 154 | +## Client implementation |
| 155 | + |
| 156 | +The below sections outline what additions the clients should support for message read requests and consumers, as used in |
| 157 | +JetStream streams, KV and Object Store. |
| 158 | + |
| 159 | +Generally, clients should expect error codes such as `NATS/1.0 412 Min Last Sequence` for Direct Get requests. Message |
| 160 | +Get requests will return the following error code: |
| 161 | + |
| 162 | +```go |
| 163 | +JSStreamMinLastSeqErr: {Code: 412, ErrCode: 10180, Description: "min last sequence"}, |
| 164 | +``` |
| 165 | + |
| 166 | +A consumer created with a `min_last_seq` does not return errors. However, the consumer will wait with delivering |
| 167 | +messages until the minimum last sequence is reached for the underlying stream store. |
| 168 | + |
| 169 | +### Note about testing |
| 170 | + |
| 171 | +A replicated stream can have followers that are slightly lagging behind in their applies, allowing for a stale read to |
| 172 | +be served after the client has just written a new value. This is inherently a race condition and can't be controlled by |
| 173 | +a client test, unless it meticulously controls the state of the server (for example through embedding the server). |
| 174 | + |
| 175 | +The recommended way for writing tests would be: |
| 176 | + |
| 177 | +- Test Message Get/Direct Get requests with a too high sequence that doesn't exist (yet) in the stream. It should return |
| 178 | + the `412 Min Last Sequence` error. Then publish a new message to the stream, get the publish acknowledgement, and |
| 179 | + confirm that a retry of the previous read succeeds. |
| 180 | +- Test Consumers by using a too high sequence that doesn't exist (yet) in the stream. The consumer should not deliver |
| 181 | + messages. Then publish a new message to the stream, reaching the min last sequence threshold, the consumer should now |
| 182 | + start delivering messages. |
| 183 | + |
| 184 | +### Message read requests |
| 185 | + |
| 186 | +- Message read requests (Message Get & Direct Get), such as `stream.GetMsg` and `stream.GetLastMsgForSubject`, should |
| 187 | + support an option to include `min_last_seq` in the body of `JSApiMsgGetRequest`. |
| 188 | + |
| 189 | +**Example:** |
| 190 | + |
| 191 | +```go |
| 192 | +// Write |
| 193 | +ack, err := js.Publish("foo", nil) |
| 194 | + |
| 195 | +// Reads |
| 196 | +msg, err := stream.GetMsg(ctx, ack.Sequence, jetstream.MinLastSequence(ack.Sequence)) |
| 197 | +// -> $JS.API.DIRECT.GET.STREAM {"seq":1,"min_last_seq":1} |
| 198 | +msg, err := stream.GetLastMsgForSubject(ctx, "foo", jetstream.MinLastSequence(ack.Sequence)) |
| 199 | +// -> $JS.API.DIRECT.GET.STREAM.foo {"min_last_seq":1} |
| 200 | +``` |
| 201 | + |
| 202 | +- Similar to the above additions, KV should also support passing a minimum last revision. |
| 203 | + |
| 204 | +**Example:** |
| 205 | + |
| 206 | +```go |
| 207 | +kve, err := kv.Get(ctx, "key", jetstream.MinLastSequence(ack.Sequence)) |
| 208 | +kve, err := kv.GetRevision(ctx, "foo", 1, jetstream.MinLastSequence(ack.Sequence)) |
| 209 | +``` |
| 210 | + |
| 211 | +### Consumers |
| 212 | + |
| 213 | +- Similar to passing a `min_last_seq` in read requests, this should also be optionally passed in the `ConsumerConfig` |
| 214 | + when creating a consumer. This is not strictly required when the consumer is used for endless consumption, but should |
| 215 | + be supported when an "ordered consumer" is used since it's often used for "limited consumption" for example with |
| 216 | + `kv.ListKeys()`. |
| 217 | + |
| 218 | +**Example:** |
| 219 | + |
| 220 | +```go |
| 221 | +// Start consuming, ensuring the newly written message is included (in NumPending counts, etc.) |
| 222 | +ack, err := js.Publish("foo", nil) |
| 223 | +c, err := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{MinLastSeq: ack.Sequence}) |
| 224 | + |
| 225 | +// List all keys, including a newly written key. |
| 226 | +r, err := kv.Put(ctx, "key", []byte("value")) |
| 227 | +keys, err := kv.ListKeys(ctx, jetstream.MinLastRevision(r)) |
| 228 | +``` |
| 229 | + |
| 230 | +### KV Store |
| 231 | + |
| 232 | +The `kv.Create` method ensures a key only gets created if it doesn't already exist. If the key was previously deleted or |
| 233 | +purged, the client can also handle these conditions. However, because the `kv.Create` is responded to by the stream |
| 234 | +leader and the `kv.Get` it does internally could be answered by an outdated follower, the subsequent internal |
| 235 | +`kv.Update` call could then fail. |
| 236 | + |
| 237 | +When the client receives the following error: `wrong last sequence: 5`, it should recognize this and extract the |
| 238 | +sequence from the error message. The error format is `wrong last sequence: {seq}`, and the sequence is that of the |
| 239 | +revision it needs to pass in the `kv.Update` call. |
| 240 | + |
| 241 | +This removes the need for the intermediate `kv.Get` call that could return stale reads, and ensures the `kv.Update` has |
| 242 | +the required "monotonic read" property. |
| 243 | + |
| 244 | +### Object Store |
| 245 | + |
| 246 | +Object Store uses a combination of message read requests and consumers, to both get single-message object info as well |
| 247 | +as reading the object itself. |
| 248 | + |
| 249 | +- Write requests, such as `obs.Put`, should return the highest sequence of that object as `ObjectInfo.Sequence`. This |
| 250 | + highest sequence is the sequence of the "meta message" which is sent last after the object chunks. |
| 251 | +- All single-message read requests should support, similar to KV, passing the `min_last_seq` in the message/direct get |
| 252 | + request. |
| 253 | +- All consumers used to gather the object data should support passing the `min_last_seq` in the `ConsumerConfig`. |
| 254 | + |
| 255 | +**Example:** |
| 256 | + |
| 257 | +```go |
| 258 | +// Write object. |
| 259 | +info, err := obs.PutString(ctx, "file", "data") |
| 260 | + |
| 261 | +// Listing objects should include written file. |
| 262 | +lch, err := obs.List(ctx, jetstream.MinLastSequence(info.Sequence)) |
| 263 | + |
| 264 | +// Watch itself doesn't strictly require MinLastSequence support, |
| 265 | +// since it's used for endless consumption. |
| 266 | +watcher, err := obs.Watch(ctx) |
| 267 | +for { |
| 268 | + select { |
| 269 | + case info := <-watcher.Updates(): |
| 270 | + if info == nil { |
| 271 | + return |
| 272 | + } |
| 273 | + // Object read should support passing MinLastSequence to ensure the consumed metadata |
| 274 | + // can be retrieved. The watcher could live on the stream leader's server, but the |
| 275 | + // consumer to retrieve the chunks could be created on a temporarily outdated follower. |
| 276 | + value, err := obs.GetString(ctx, info.Name, jetstream.MinLastSequence(info.Sequence)) |
| 277 | + } |
| 278 | +} |
| 279 | +``` |
0 commit comments