Skip to content

Commit c1b9961

Browse files
Run the Consumer MessageHandler in a Task (#250)
* Make the message handler async, avoiding blocking the socket thread. * reduce the initial credits to two * add the documentation * Use `Random.Shared` * Reduce the noise log when the consumer is removed --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Luke Bakken <[email protected]>
1 parent f966294 commit c1b9961

File tree

15 files changed

+325
-111
lines changed

15 files changed

+325
-111
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,8 @@ private async Task OnConnectionClosed(string reason)
198198
public static async Task<Client> Create(ClientParameters parameters, ILogger logger = null)
199199
{
200200
var client = new Client(parameters, logger);
201-
202201
client.connection = await Connection
203-
.Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl)
202+
.Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl, logger)
204203
.ConfigureAwait(false);
205204

206205
// exchange properties
@@ -426,10 +425,11 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
426425
.ConfigureAwait(false);
427426
if (off == null)
428427
{
429-
_logger.LogWarning(
428+
_logger?.LogWarning(
430429
"ConsumerUpdateHandler can't returned null, a default offsetType (OffsetTypeNext) will be used");
431430
off = new OffsetTypeNext();
432431
}
432+
433433
// event the consumer is not active, we need to send a ConsumerUpdateResponse
434434
// by protocol definition. the offsetType can't be null so we use OffsetTypeNext as default
435435
await ConsumerUpdateResponse(

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using System.Net.Sockets;
1111
using System.Threading;
1212
using System.Threading.Tasks;
13+
using Microsoft.Extensions.Logging;
1314

1415
namespace RabbitMQ.Stream.Client
1516
{
@@ -25,6 +26,7 @@ public class Connection : IDisposable
2526
private int numFrames;
2627
private bool isClosed = false;
2728
private bool _disposedValue;
29+
private readonly ILogger _logger;
2830

2931
internal int NumFrames => numFrames;
3032

@@ -36,8 +38,9 @@ private static System.IO.Stream MaybeTcpUpgrade(NetworkStream networkStream, Ssl
3638
}
3739

3840
private Connection(Socket socket, Func<Memory<byte>, Task> callback,
39-
Func<string, Task> closedCallBack, SslOption sslOption)
41+
Func<string, Task> closedCallBack, SslOption sslOption, ILogger logger)
4042
{
43+
_logger = logger;
4144
this.socket = socket;
4245
commandCallback = callback;
4346
closedCallback = closedCallBack;
@@ -51,7 +54,7 @@ private Connection(Socket socket, Func<Memory<byte>, Task> callback,
5154
}
5255

5356
public static async Task<Connection> Create(EndPoint endpoint, Func<Memory<byte>, Task> commandCallback,
54-
Func<string, Task> closedCallBack, SslOption sslOption)
57+
Func<string, Task> closedCallBack, SslOption sslOption, ILogger logger)
5558
{
5659
var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
5760
socket.NoDelay = true;
@@ -80,7 +83,7 @@ public static async Task<Connection> Create(EndPoint endpoint, Func<Memory<byte>
8083
}
8184
}
8285

83-
return new Connection(socket, commandCallback, closedCallBack, sslOption);
86+
return new Connection(socket, commandCallback, closedCallBack, sslOption, logger);
8487
}
8588

8689
public async ValueTask<bool> Write<T>(T command) where T : struct, ICommand
@@ -95,6 +98,10 @@ public async ValueTask<bool> Write<T>(T command) where T : struct, ICommand
9598

9699
private async Task WriteCommand<T>(T command) where T : struct, ICommand
97100
{
101+
if (isClosed)
102+
{
103+
throw new InvalidOperationException("Connection is closed");
104+
}
98105
// Only one thread should be able to write to the output pipeline at a time.
99106
await _writeLock.WaitAsync().ConfigureAwait(false);
100107
try
@@ -149,23 +156,36 @@ private async Task ProcessIncomingFrames()
149156
reader.AdvanceTo(buffer.Start, buffer.End);
150157
}
151158
}
159+
catch (OperationCanceledException e)
160+
{
161+
caught = e;
162+
// We don't need to log as error this exception
163+
// It is raised when the socket is closed due of cancellation token
164+
// from the producer or consumer class
165+
// we leave it as debug to avoid noise in the logs
166+
_logger?.LogDebug("Operation Canceled Exception. TCP Connection Closed");
167+
}
152168
catch (Exception e)
153169
{
154170
caught = e;
155171
// The exception is needed mostly to raise the
156172
// closedCallback event.
157173
// It is useful to trace the error, but at this point
158174
// the socket is closed maybe not in the correct way
159-
Debug.WriteLine($"Error reading the socket, error: {e}");
175+
if (!isClosed)
176+
{
177+
_logger?.LogError(e, "Error reading the socket");
178+
}
160179
}
161180
finally
162181
{
163182
isClosed = true;
164183
// Mark the PipeReader as complete
165184
await reader.CompleteAsync(caught).ConfigureAwait(false);
166185
var t = closedCallback?.Invoke("TCP Connection Closed")!;
167-
await t.ConfigureAwait(false);
168-
Debug.WriteLine("TCP Connection Closed");
186+
if (t != null)
187+
await t.ConfigureAwait(false);
188+
_logger?.LogDebug("TCP Connection Closed");
169189
}
170190
}
171191

@@ -204,7 +224,7 @@ public void Dispose()
204224
socket.Dispose();
205225
if (!_incomingFramesTask.Wait(Consts.ShortWait))
206226
{
207-
Debug.WriteLine($"frame reader task did not exit in {Consts.ShortWait}");
227+
_logger?.LogError("frame reader task did not exit in {ShortWait}", Consts.ShortWait);
208228
}
209229
}
210230
finally

RabbitMQ.Stream.Client/Consts.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,15 @@ internal static class Consts
1414
internal static readonly TimeSpan ShortWait = TimeSpan.FromSeconds(1);
1515
internal static readonly TimeSpan MidWait = TimeSpan.FromSeconds(3);
1616
internal static readonly TimeSpan LongWait = TimeSpan.FromSeconds(10);
17+
18+
internal static int RandomShort()
19+
{
20+
return Random.Shared.Next(500, 1500);
21+
}
22+
23+
internal static int RandomMid()
24+
{
25+
return Random.Shared.Next(1000, 2500);
26+
}
1727
}
1828
}

RabbitMQ.Stream.Client/Deliver.cs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ internal readonly struct SubEntryChunk
5151
private SubEntryChunk(byte compress,
5252
ushort numRecordsInBatch,
5353
uint unCompressedDataSize, uint dataLen,
54-
ReadOnlySequence<byte> data)
54+
Memory<byte> data)
5555
{
5656
compressValue = compress;
5757
NumRecordsInBatch = numRecordsInBatch;
@@ -67,7 +67,18 @@ private SubEntryChunk(byte compress,
6767
public uint UnCompressedDataSize { get; }
6868

6969
public uint DataLen { get; }
70-
public ReadOnlySequence<byte> Data { get; }
70+
public Memory<byte> Data { get; }
71+
72+
// This wrapper was added to be used in async methods
73+
// where the SequenceReader is not available
74+
// see RawConsumer:ParseChunk for more details
75+
// at some point we could remove this wrapper
76+
// and use system.io.pipeline instead of SequenceReader
77+
internal static int Read(ref ReadOnlySequence<byte> seq, byte entryType, out SubEntryChunk subEntryChunk)
78+
{
79+
var reader = new SequenceReader<byte>(seq);
80+
return Read(ref reader, entryType, out subEntryChunk);
81+
}
7182

7283
internal static int Read(ref SequenceReader<byte> reader, byte entryType, out SubEntryChunk subEntryChunk)
7384
{
@@ -77,14 +88,18 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
7788
// Determinate what kind of the compression it is using
7889
// See Compress:CompressMode
7990
var compress = (byte)((byte)(entryType & 0x70) >> 4);
80-
offset++;
8191

8292
// Data contains the subEntryChunk information
8393
// We need to pass it to the subEntryChunk that will decode the information
94+
var memory =
95+
ArrayPool<byte>.Shared.Rent((int)dataLen).AsMemory(0, (int)dataLen);
8496
var data = reader.Sequence.Slice(reader.Consumed, dataLen);
97+
data.CopyTo(memory.Span);
98+
8599
subEntryChunk =
86-
new SubEntryChunk(compress, numRecordsInBatch, unCompressedDataSize, dataLen, data);
87-
offset += (int)dataLen;
100+
new SubEntryChunk(compress, numRecordsInBatch, unCompressedDataSize, dataLen, memory);
101+
offset += memory.Length;
102+
88103
// Here we need to advance the reader to the datalen
89104
// Since Data is passed to the subEntryChunk.
90105
reader.Advance(dataLen);
@@ -101,7 +116,7 @@ private Chunk(byte magicVersion,
101116
ulong epoch,
102117
ulong chunkId,
103118
int crc,
104-
ReadOnlySequence<byte> data)
119+
Memory<byte> data)
105120
{
106121
MagicVersion = magicVersion;
107122
NumEntries = numEntries;
@@ -121,7 +136,7 @@ private Chunk(byte magicVersion,
121136
public ulong Epoch { get; }
122137
public ulong ChunkId { get; }
123138
public int Crc { get; }
124-
public ReadOnlySequence<byte> Data { get; }
139+
public Memory<byte> Data { get; }
125140

126141
internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
127142
{
@@ -138,9 +153,11 @@ internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
138153
offset += WireFormatting.ReadUInt32(ref reader, out _);
139154
// offset += 4; // reserved
140155
offset += WireFormatting.ReadUInt32(ref reader, out _); // reserved
156+
var memory =
157+
ArrayPool<byte>.Shared.Rent((int)dataLen).AsMemory(0, (int)dataLen);
141158
var data = reader.Sequence.Slice(reader.Consumed, dataLen);
142-
offset += (int)dataLen;
143-
chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, data);
159+
data.CopyTo(memory.Span);
160+
chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, memory);
144161
return offset;
145162
}
146163
}

RabbitMQ.Stream.Client/Message.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,18 @@ public ReadOnlySequence<byte> Serialize()
7171
return new ReadOnlySequence<byte>(data);
7272
}
7373

74+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
75+
// This wrapper was added to be used in async methods
76+
// where the SequenceReader is not available
77+
// see RawConsumer:ParseChunk for more details
78+
// at some point we could remove this wrapper
79+
// and use system.io.pipeline instead of SequenceReader
80+
public static Message From(ref ReadOnlySequence<byte> seq, uint len)
81+
{
82+
var reader = new SequenceReader<byte>(seq);
83+
return From(ref reader, len);
84+
}
85+
7486
[MethodImpl(MethodImplOptions.AggressiveInlining)]
7587
public static Message From(ref SequenceReader<byte> reader, uint len)
7688
{

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ RabbitMQ.Stream.Client.Chunk
159159
RabbitMQ.Stream.Client.Chunk.Chunk() -> void
160160
RabbitMQ.Stream.Client.Chunk.ChunkId.get -> ulong
161161
RabbitMQ.Stream.Client.Chunk.Crc.get -> int
162-
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Buffers.ReadOnlySequence<byte>
163162
RabbitMQ.Stream.Client.Chunk.Epoch.get -> ulong
164163
RabbitMQ.Stream.Client.Chunk.NumEntries.get -> ushort
165164
RabbitMQ.Stream.Client.Chunk.NumRecords.get -> uint
@@ -803,7 +802,6 @@ static RabbitMQ.Stream.Client.AMQP.DescribedFormatCode.Write(System.Span<byte> s
803802
static RabbitMQ.Stream.Client.Client.Create(RabbitMQ.Stream.Client.ClientParameters parameters, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Client>
804803
static RabbitMQ.Stream.Client.CompressionHelper.Compress(System.Collections.Generic.List<RabbitMQ.Stream.Client.Message> messages, RabbitMQ.Stream.Client.CompressionType compressionType) -> RabbitMQ.Stream.Client.ICompressionCodec
805804
static RabbitMQ.Stream.Client.CompressionHelper.UnCompress(RabbitMQ.Stream.Client.CompressionType compressionType, System.Buffers.ReadOnlySequence<byte> source, uint dataLen, uint unCompressedDataSize) -> System.Buffers.ReadOnlySequence<byte>
806-
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
807805
static RabbitMQ.Stream.Client.LeaderLocator.ClientLocal.get -> RabbitMQ.Stream.Client.LeaderLocator
808806
static RabbitMQ.Stream.Client.LeaderLocator.LeastLeaders.get -> RabbitMQ.Stream.Client.LeaderLocator
809807
static RabbitMQ.Stream.Client.LeaderLocator.Random.get -> RabbitMQ.Stream.Client.LeaderLocator

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
22
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
33
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
4+
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
45
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
56
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
67
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer
@@ -24,4 +25,6 @@ RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void
2425
RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary<string, long> statistic) -> void
2526
RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span<byte> span) -> int
2627
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
27-
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
28+
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption, Microsoft.Extensions.Logging.ILogger logger) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
29+
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence<byte> seq, uint len) -> RabbitMQ.Stream.Client.Message
30+
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>

0 commit comments

Comments
 (0)