Skip to content

Commit 7d1bb3e

Browse files
GsantomaggioFalk Jonas - HK
andauthored
Expose Store offset to stream system (#408)
* Add the ability to store offsets using the streamsystem, which can be used when you're not in the context of a messagehandler * Added reconnection and a test --------- Co-authored-by: Falk Jonas - HK <[email protected]>
1 parent 660738b commit 7d1bb3e

File tree

3 files changed

+20
-0
lines changed

3 files changed

+20
-0
lines changed

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream
338338
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStream(RabbitMQ.Stream.Client.SuperStreamSpec spec) -> System.Threading.Tasks.Task
339339
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ISuperStreamConsumer>
340340
RabbitMQ.Stream.Client.StreamSystem.DeleteSuperStream(string superStream) -> System.Threading.Tasks.Task
341+
RabbitMQ.Stream.Client.StreamSystem.StoreOffset(string reference, string stream, ulong offsetValue) -> System.Threading.Tasks.Task
341342
RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamInfo>
342343
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
343344
RabbitMQ.Stream.Client.StreamSystem.SuperStreamExists(string superStream) -> System.Threading.Tasks.Task<bool>

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,12 @@ private async Task MayBeReconnectLocator()
184184
}
185185
}
186186

187+
public async Task StoreOffset(string reference, string stream, ulong offsetValue)
188+
{
189+
await MayBeReconnectLocator().ConfigureAwait(false);
190+
await _client.StoreOffset(reference, stream, offsetValue).ConfigureAwait(false);
191+
}
192+
187193
public async Task UpdateSecret(string newSecret)
188194
{
189195
// store the old password just in case it will fail to update the secret

Tests/SystemTests.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,5 +401,18 @@ await Assert.ThrowsAsync<DeleteStreamException>(
401401
);
402402
await system.Close();
403403
}
404+
405+
[Fact]
406+
public async Task ClientShouldStoreOffset()
407+
{
408+
var stream = Guid.NewGuid().ToString();
409+
var consumerRef = "myRef";
410+
var system = await StreamSystem.Create(new StreamSystemConfig());
411+
await system.CreateStream(new StreamSpec(stream));
412+
await system.StoreOffset(consumerRef, stream, 4);
413+
Assert.Equal((ulong)4, await system.QueryOffset(consumerRef, stream));
414+
await system.DeleteStream(stream);
415+
await system.Close();
416+
}
404417
}
405418
}

0 commit comments

Comments
 (0)