Skip to content

Commit 157cd79

Browse files
authored
Add GetAwaiter().GetResult(); on MessageHandler (#184)
* Change AwaitAsync to GetAwaiter().GetResult() This should actually block until this handler executes. Otherwise calling a few handlers eats up the whole threadpool Per conversation with @ricsiLT Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 305ba21 commit 157cd79

File tree

3 files changed

+8
-8
lines changed

3 files changed

+8
-8
lines changed

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ void DispatchMessage(ref SequenceReader<byte> sequenceReader, ulong i)
137137
{
138138
_config.MessageHandler(this,
139139
new MessageContext(message.MessageOffset, TimeSpan.FromMilliseconds(chunk.Timestamp)),
140-
message);
140+
message).GetAwaiter().GetResult();
141141
}
142142
}
143143
catch (Exception e)

RabbitMQ.Stream.Client/Reliable/Consumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public record ConsumerConfig : ReliableConfig
3737
// See also: https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams
3838
// Parameters:
3939
public bool IsSuperStream { get; set; }
40-
40+
4141
// <summary>
4242
// The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following:
4343
// - OffsetTypeFirst: starting from the first available offset. If the stream has not been truncated, this means the beginning of the stream (offset 0).
@@ -53,7 +53,7 @@ public record ConsumerConfig : ReliableConfig
5353
// The other instances will be idle.
5454
// </summary>
5555
public bool IsSingleActiveConsumer { get; set; } = false;
56-
56+
5757
// <summary>
5858
// The broker notifies a consumer that becomes active before dispatching messages to it.
5959
// With ConsumerUpdateListener the consumer can decide where to start consuming from.

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,11 @@ public async ValueTask Send(List<Message> messages, CompressionType compressionT
226226
}
227227
}
228228

229+
public override string ToString()
230+
{
231+
return $"Producer reference: {_producerConfig.Reference}, stream: {_producerConfig.Stream} ";
232+
}
233+
229234
/// <summary>
230235
/// Send the messages in batch to the stream in synchronous mode.
231236
/// The aggregation is provided by the user.
@@ -270,9 +275,4 @@ public async ValueTask Send(List<Message> messages)
270275
SemaphoreSlim.Release();
271276
}
272277
}
273-
274-
public override string ToString()
275-
{
276-
return $"Producer reference: {_producerConfig.Reference}, stream: {_producerConfig.Stream} ";
277-
}
278278
}

0 commit comments

Comments
 (0)