Skip to content

Commit aac7819

Browse files
Fix the read in the AMQP parser (#217)
* Fix the read in the AMQP parser - Change the string read: use the offset len coming from the protocol instead of encoding.GetByteCount(value). In some case the message properties can't be UFT8. - Use Encoding.ASCII.GetString in case of FormatCode.Sym8 that's more corret. - Add the AMQPlite dependency on the Test package to simulate the use case where the parser failed. - Add amqp 1.0 plugin to the CI - bump rabbitmq version on Windows CI Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Luke Bakken <[email protected]>
1 parent bb71c52 commit aac7819

File tree

6 files changed

+81
-12
lines changed

6 files changed

+81
-12
lines changed

.ci/install.ps1

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,4 @@ Write-Host '[INFO] Getting RabbitMQ status...'
158158

159159
$ErrorActionPreference = 'Continue'
160160
Write-Host '[INFO] Enabling plugins...'
161-
& $rabbitmq_plugins_path enable rabbitmq_management rabbitmq_stream rabbitmq_stream_management
161+
& $rabbitmq_plugins_path enable rabbitmq_management rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0

.ci/versions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"erlang": "25.2",
3-
"rabbitmq": "3.11.6"
3+
"rabbitmq": "3.11.7"
44
}

.github/workflows/build-test.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ jobs:
6868
restore-keys: |
6969
${{ runner.os }}-v2-nuget-
7070
- name: Enable RabbitMQ Plugins
71-
run: docker exec ${{ job.services.rabbitmq.id }} rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
71+
run: docker exec ${{ job.services.rabbitmq.id }} rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0
7272
- name: Restore
7373
run: dotnet restore --verbosity=normal
7474
- name: Build

RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,23 +169,34 @@ internal static int ReadString(ref SequenceReader<byte> reader, out string value
169169
var offset = ReadType(ref reader, out var type);
170170
switch (type)
171171
{
172-
case FormatCode.Str8:
173172
case FormatCode.Sym8:
173+
offset += WireFormatting.ReadByte(ref reader, out var lenAscii);
174+
Span<byte> tempSpanAscii = stackalloc byte[lenAscii];
175+
reader.TryCopyTo(tempSpanAscii);
176+
reader.Advance(lenAscii);
177+
value = Encoding.ASCII.GetString(tempSpanAscii);
178+
return offset + lenAscii;
179+
case FormatCode.Str8:
174180
offset += WireFormatting.ReadByte(ref reader, out var lenC);
175181
Span<byte> tempSpan = stackalloc byte[lenC];
176182
reader.TryCopyTo(tempSpan);
177183
reader.Advance(lenC);
178184
value = Encoding.UTF8.GetString(tempSpan);
179-
return offset + s_encoding.GetByteCount(value);
180-
185+
return offset + lenC;
181186
case FormatCode.Sym32:
187+
offset += WireFormatting.ReadInt32(ref reader, out var lenAscii32);
188+
var tempSpanAscii32 = lenAscii32 <= 64 ? stackalloc byte[lenAscii32] : new byte[lenAscii32];
189+
reader.TryCopyTo(tempSpanAscii32);
190+
reader.Advance(lenAscii32);
191+
value = Encoding.ASCII.GetString(tempSpanAscii32);
192+
return offset + lenAscii32;
182193
case FormatCode.Str32:
183194
offset += WireFormatting.ReadInt32(ref reader, out var len);
184195
var tempSpan32 = len <= 64 ? stackalloc byte[len] : new byte[len];
185196
reader.TryCopyTo(tempSpan32);
186197
reader.Advance(len);
187198
value = Encoding.UTF8.GetString(tempSpan32);
188-
return offset + s_encoding.GetByteCount(value);
199+
return offset + len;
189200
}
190201

191202
throw new AMQP.AmqpParseException($"ReadString invalid type {type}");

Tests/FromToAMQPTests.cs

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88
using System.Text;
99
using System.Threading;
1010
using System.Threading.Tasks;
11+
using Amqp;
1112
using RabbitMQ.Client;
1213
using RabbitMQ.Client.Events;
1314
using RabbitMQ.Stream.Client;
1415
using RabbitMQ.Stream.Client.AMQP;
1516
using RabbitMQ.Stream.Client.Reliable;
1617
using Xunit.Abstractions;
18+
using ConnectionFactory = RabbitMQ.Client.ConnectionFactory;
19+
using Message = RabbitMQ.Stream.Client.Message;
1720

1821
namespace Tests;
1922

@@ -102,8 +105,7 @@ public async void Amqp10ShouldReadTheAmqp019Properties()
102105
properties.UserId = "guest";
103106
properties.Headers = new Dictionary<string, object>()
104107
{
105-
{"stream_key", "stream_value"},
106-
{"stream_key4", "Alan Mathison Turing(1912 年 6 月 23 日"},
108+
{"stream_key", "stream_value"}, {"stream_key4", "Alan Mathison Turing(1912 年 6 月 23 日"},
107109
};
108110
channel.BasicPublish("", stream, properties, Encoding.ASCII.GetBytes("FromAMQP"));
109111
var tcs = new TaskCompletionSource<Message>();
@@ -170,20 +172,20 @@ await producer.Send(new Message(Encoding.ASCII.GetBytes($"FromStream{i}"))
170172
{
171173
Properties = new Properties()
172174
{
173-
MessageId = $"Alan Mathison Turing(1912 年 6 月 23 日 - 1954 年 6 月 7 日)是英国数学家、计算机科学家、逻辑学家、密码分析家、哲学家和理论生物学家。 [6] 图灵在理论计算机科学的发展中具有很大的影响力,用图灵机提供了算法和计算概念的形式化,可以被认为是通用计算机的模型。[7][8][9] 他被广泛认为是理论计算机科学和人工智能之父{i}",
175+
MessageId =
176+
$"Alan Mathison Turing(1912 年 6 月 23 日 - 1954 年 6 月 7 日)是英国数学家、计算机科学家、逻辑学家、密码分析家、哲学家和理论生物学家。 [6] 图灵在理论计算机科学的发展中具有很大的影响力,用图灵机提供了算法和计算概念的形式化,可以被认为是通用计算机的模型。[7][8][9] 他被广泛认为是理论计算机科学和人工智能之父{i}",
174177
CorrelationId = "10000_00000",
175178
ContentType = "text/plain",
176179
ContentEncoding = "utf-8",
177180
UserId = Encoding.ASCII.GetBytes("MY_USER_ID"),
178181
GroupSequence = 601,
179182
ReplyToGroupId = "ReplyToGroupId",
180183
GroupId = "GroupId",
181-
182184
},
183185
ApplicationProperties = new ApplicationProperties()
184186
{
185187
{"stream_key", "stream_value"},
186-
{"stream_key2", 100},
188+
{"stream_key2", 100},
187189
{"stream_key3", 10_000_009},
188190
{"stream_key4", "Alan Mathison Turing(1912 年 6 月 23 日"},
189191
}
@@ -258,4 +260,59 @@ public async void Amqp10ShouldReadTheAmqp019Properties1000Messages()
258260
await system.DeleteStream(stream);
259261
await system.Close();
260262
}
263+
264+
/// <summary>
265+
/// In this test se send 1 message using the Amqp10 Producer https://github.com/Azure/amqpnetlite to
266+
/// a stream and then we read it using.
267+
/// See https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/217
268+
/// </summary>
269+
[Fact]
270+
public async void StreamShouldReadTheAmqp10PropertiesMessages()
271+
{
272+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
273+
274+
var address = new Address("amqp://guest:guest@localhost:5672");
275+
var connection = new Amqp.Connection(address);
276+
var session = new Session(connection);
277+
278+
var message = new Amqp.Message("msg from amqp 1.0");
279+
message.Properties = new Amqp.Framing.Properties()
280+
{
281+
MessageId = "1",
282+
Subject = "test",
283+
ContentType = "text/plain"
284+
};
285+
message.ApplicationProperties = new Amqp.Framing.ApplicationProperties()
286+
{
287+
Map = { { "key1", "value1" }, { "key2", 2 } }
288+
};
289+
290+
var sender = new SenderLink(session, "mixing", $"/amq/queue/{stream}");
291+
await sender.SendAsync(message);
292+
await sender.CloseAsync();
293+
await session.CloseAsync();
294+
await connection.CloseAsync();
295+
296+
var tcs = new TaskCompletionSource<Message>();
297+
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
298+
{
299+
OffsetSpec = new OffsetTypeFirst(),
300+
MessageHandler = async (_, _, _, streamMessage) =>
301+
{
302+
tcs.SetResult(streamMessage);
303+
await Task.CompletedTask;
304+
}
305+
});
306+
307+
new Utils<Message>(_testOutputHelper).WaitUntilTaskCompletes(tcs);
308+
var result = tcs.Task.Result;
309+
// Why do we need result.Data.Contents.ToArray()[5..]?
310+
// Because of https://github.com/rabbitmq/rabbitmq-server/issues/6937
311+
// When it will be fixed we can remove the [5..]
312+
// For the moment we leave it as it is because it is not a problem for the client
313+
Assert.Equal("msg from amqp 1.0", Encoding.UTF8.GetString(result.Data.Contents.ToArray()[5..]));
314+
await consumer.Close();
315+
await system.DeleteStream(stream);
316+
await system.Close();
317+
}
261318
}

Tests/Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
<ItemGroup>
1010
<PackageReference Include="AltCover" Version="8.2.837" />
11+
<PackageReference Include="AmqpNetLite" Version="2.4.5" />
1112
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
1213
<PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
1314
<PackageReference Include="xunit" Version="2.4.1" />

0 commit comments

Comments
 (0)