Skip to content

Commit e458aff

Browse files
AMQP 1.0 Parser Calculate correct maps size add Symbol (#211)
* Correct map size Signed-off-by: Gabriele Santomaggio <[email protected]> * Corret field numbers Signed-off-by: Gabriele Santomaggio <[email protected]> * Add AMQP simbol class Signed-off-by: Gabriele Santomaggio <[email protected]> * Add AMQP client Signed-off-by: Gabriele Santomaggio <[email protected]> * Add AMQP091 test Signed-off-by: Gabriele Santomaggio <[email protected]> * Add AMQP091 test Signed-off-by: Gabriele Santomaggio <[email protected]> * Add validation for message 1_0_0 Signed-off-by: Gabriele Santomaggio <[email protected]> * Add validation for message 1_0_0 Signed-off-by: Gabriele Santomaggio <[email protected]> * Add more AMQP tests Signed-off-by: Gabriele Santomaggio <[email protected]> * Rename class * Make Symbol internal Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Luke Bakken <[email protected]>
1 parent f6efcc8 commit e458aff

File tree

8 files changed

+331
-5
lines changed

8 files changed

+331
-5
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,7 @@ projects/Unit*/TestResult.xml
119119
# Vim
120120
.sw?
121121
.*.sw?
122+
123+
#tests
124+
Tests/coverage.*
125+

RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,31 @@ internal static int WriteAny(Span<byte> seq, object value)
3030
bool bo => WriteBool(seq, bo),
3131
byte[] bArr => bArr.Length == 0 ? WriteNull(seq) : WriteBytes(seq, bArr),
3232
DateTime d => d == DateTime.MinValue ? WriteNull(seq) : WriteTimestamp(seq, d),
33+
Symbol s => s.IsNull ? WriteNull(seq) : WriteSymbol(seq, s),
3334
_ => throw new AmqpParseException($"WriteAny Invalid type {value}")
3435
};
3536
}
3637

38+
private static int WriteSymbol(Span<byte> seq, Symbol symbol)
39+
{
40+
var len = s_encoding.GetByteCount(symbol.Value);
41+
var offset = 0;
42+
// Sym8
43+
if (len <= byte.MaxValue)
44+
{
45+
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Sym8);
46+
offset += WireFormatting.WriteByte(seq[offset..], (byte)len);
47+
offset += s_encoding.GetBytes(symbol.Value, seq[offset..]);
48+
return offset;
49+
}
50+
51+
// Sym32
52+
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Sym32);
53+
offset += WireFormatting.WriteInt32(seq[offset..], len);
54+
offset += s_encoding.GetBytes(symbol.Value, seq[offset..]);
55+
return offset;
56+
}
57+
3758
private static int WriteString(Span<byte> seq, string value)
3859
{
3960
var len = s_encoding.GetByteCount(value);
@@ -232,7 +253,7 @@ private static int GetStringSize(string value)
232253
{
233254
0 => 1, // 0x40
234255
<= byte.MaxValue => len + 1 + //marker 1 byte FormatCode.Vbin8
235-
1,
256+
1,
236257
_ => len + 1 //marker 1 byte FormatCode.Vbin32
237258
+ 4
238259
};

RabbitMQ.Stream.Client/AMQP/Map.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public int Write(Span<byte> span)
7373
{
7474
var offset = DescribedFormatCode.Write(span, MapDataCode);
7575
offset += WireFormatting.WriteByte(span[offset..], FormatCode.Map32);
76-
offset += WireFormatting.WriteUInt32(span[offset..], (uint)MapSize()); // MapSize
76+
offset += WireFormatting.WriteUInt32(span[offset..],
77+
(uint)MapSize() + DescribedFormatCode.Size + sizeof(byte)); // MapSize + DescribedFormatCode + FormatCode
7778
offset += WireFormatting.WriteUInt32(span[offset..], (uint)Count * 2); // pair values
7879
foreach (var (key, value) in this)
7980
{

RabbitMQ.Stream.Client/AMQP/Properties.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,16 +133,18 @@ public int Write(Span<byte> span)
133133
{
134134
var offset = DescribedFormatCode.Write(span, DescribedFormatCode.MessageProperties);
135135
offset += WireFormatting.WriteByte(span[offset..], FormatCode.List32);
136-
offset += WireFormatting.WriteUInt32(span[offset..], (uint)PropertySize()); // PropertySize
136+
offset += WireFormatting.WriteUInt32(span[offset..],
137+
(uint)PropertySize() + DescribedFormatCode.Size +
138+
sizeof(byte)); // PropertySize + DescribedFormatCode.Size + sizeof(FormatCode.List32)
137139
offset += WireFormatting.WriteUInt32(span[offset..], 13); // field numbers
138140
offset += AmqpWireFormatting.WriteAny(span[offset..], MessageId);
139141
offset += AmqpWireFormatting.WriteAny(span[offset..], UserId);
140142
offset += AmqpWireFormatting.WriteAny(span[offset..], To);
141143
offset += AmqpWireFormatting.WriteAny(span[offset..], Subject);
142144
offset += AmqpWireFormatting.WriteAny(span[offset..], ReplyTo);
143145
offset += AmqpWireFormatting.WriteAny(span[offset..], CorrelationId);
144-
offset += AmqpWireFormatting.WriteAny(span[offset..], ContentType);
145-
offset += AmqpWireFormatting.WriteAny(span[offset..], ContentEncoding);
146+
offset += AmqpWireFormatting.WriteAny(span[offset..], new Symbol(ContentType));
147+
offset += AmqpWireFormatting.WriteAny(span[offset..], new Symbol(ContentEncoding));
146148
offset += AmqpWireFormatting.WriteAny(span[offset..], AbsoluteExpiryTime);
147149
offset += AmqpWireFormatting.WriteAny(span[offset..], CreationTime);
148150
offset += AmqpWireFormatting.WriteAny(span[offset..], GroupId);

RabbitMQ.Stream.Client/AMQP/Symbol.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
namespace RabbitMQ.Stream.Client.AMQP;
6+
7+
internal class Symbol
8+
{
9+
private readonly string _value;
10+
11+
public Symbol(string value)
12+
{
13+
_value = value;
14+
}
15+
16+
public string Value
17+
{
18+
get
19+
{
20+
return _value;
21+
}
22+
}
23+
24+
public bool IsNull
25+
{
26+
get
27+
{
28+
return string.IsNullOrWhiteSpace(_value);
29+
}
30+
}
31+
32+
public override string ToString()
33+
{
34+
return Value;
35+
}
36+
}

Tests/FromToAMQPTests.cs

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
using System;
6+
using System.Buffers;
7+
using System.Collections.Generic;
8+
using System.Text;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
using RabbitMQ.Client;
12+
using RabbitMQ.Client.Events;
13+
using RabbitMQ.Stream.Client;
14+
using RabbitMQ.Stream.Client.AMQP;
15+
using RabbitMQ.Stream.Client.Reliable;
16+
using Xunit.Abstractions;
17+
18+
namespace Tests;
19+
20+
using Xunit;
21+
22+
public class FromToAmqpTests
23+
{
24+
private readonly ITestOutputHelper _testOutputHelper;
25+
26+
public FromToAmqpTests(ITestOutputHelper testOutputHelper)
27+
{
28+
_testOutputHelper = testOutputHelper;
29+
}
30+
31+
/// <summary>
32+
/// This test is to ensure that the conversion from AMQP to Stream AMQP 1.0 is correct.
33+
/// Stream sends the message and AMQP client reads it.
34+
/// In this case the server decodes anc converts the message
35+
/// </summary>
36+
[Fact]
37+
public async void Amqp091ShouldReadTheAmqp10Properties()
38+
{
39+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
40+
var producer = await Producer.Create(new ProducerConfig(system, stream));
41+
await producer.Send(new Message(Encoding.ASCII.GetBytes("FromStream"))
42+
{
43+
Properties = new Properties()
44+
{
45+
MessageId = "年 6 月",
46+
CorrelationId = "10000_00000",
47+
ContentType = "text/plain",
48+
ContentEncoding = "utf-8",
49+
UserId = Encoding.ASCII.GetBytes("MY_USER_ID"),
50+
GroupSequence = 601,
51+
ReplyToGroupId = "ReplyToGroupId",
52+
GroupId = "GroupId",
53+
},
54+
ApplicationProperties = new ApplicationProperties() { { "stream_key", "stream_value" } }
55+
});
56+
57+
var factory = new ConnectionFactory();
58+
using var connection = factory.CreateConnection();
59+
var channel = connection.CreateModel();
60+
var consumer = new EventingBasicConsumer(channel);
61+
var tcs = new TaskCompletionSource<BasicDeliverEventArgs>();
62+
consumer.Received += (sender, ea) =>
63+
{
64+
tcs.SetResult(ea);
65+
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
66+
};
67+
channel.BasicQos(0, 100, false);
68+
channel.BasicConsume(stream, false, "consumerTag",
69+
arguments: new Dictionary<string, object>() { { "x-stream-offset", "first" } }, consumer);
70+
var result = tcs.Task.Result;
71+
Assert.Equal("FromStream", Encoding.ASCII.GetString(result.Body.ToArray()));
72+
Assert.Equal("年 6 月", result.BasicProperties.MessageId);
73+
Assert.Equal("10000_00000", result.BasicProperties.CorrelationId);
74+
Assert.Equal("text/plain", result.BasicProperties.ContentType);
75+
Assert.Equal("utf-8", result.BasicProperties.ContentEncoding);
76+
Assert.Equal("MY_USER_ID", result.BasicProperties.UserId);
77+
Assert.Equal("stream_value",
78+
Encoding.ASCII.GetString(result.BasicProperties.Headers["stream_key"] as byte[] ?? Array.Empty<byte>()));
79+
channel.QueueDelete(stream);
80+
channel.Close();
81+
}
82+
83+
/// <summary>
84+
/// This test is to ensure that the conversion from AMQP 091 to Stream AMQP 1.0 is correct.
85+
/// AMQP sends the message and Stream has to read it.
86+
/// In this case the server decodes anc converts the message
87+
/// </summary>
88+
[Fact]
89+
public async void Amqp10ShouldReadTheAmqp019Properties()
90+
{
91+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
92+
93+
var factory = new ConnectionFactory();
94+
using var connection = factory.CreateConnection();
95+
var channel = connection.CreateModel();
96+
var properties = channel.CreateBasicProperties();
97+
98+
properties.MessageId = "年 6 月";
99+
properties.CorrelationId = "10000_00000";
100+
properties.ContentType = "text/plain";
101+
properties.ContentEncoding = "utf-8";
102+
properties.UserId = "guest";
103+
properties.Headers = new Dictionary<string, object>()
104+
{
105+
{"stream_key", "stream_value"},
106+
{"stream_key4", "Alan Mathison Turing(1912 年 6 月 23 日"},
107+
};
108+
channel.BasicPublish("", stream, properties, Encoding.ASCII.GetBytes("FromAMQP"));
109+
var tcs = new TaskCompletionSource<Message>();
110+
111+
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
112+
{
113+
OffsetSpec = new OffsetTypeFirst(),
114+
MessageHandler = async (_, _, _, message) =>
115+
{
116+
tcs.SetResult(message);
117+
await Task.CompletedTask;
118+
}
119+
});
120+
121+
new Utils<Message>(_testOutputHelper).WaitUntilTaskCompletes(tcs);
122+
var result = tcs.Task.Result;
123+
Assert.Equal("FromAMQP", Encoding.ASCII.GetString(result.Data.Contents.ToArray()));
124+
Assert.Equal("年 6 月", result.Properties.MessageId);
125+
Assert.Equal("10000_00000", result.Properties.CorrelationId);
126+
Assert.Equal("text/plain", result.Properties.ContentType);
127+
Assert.Equal("utf-8", result.Properties.ContentEncoding);
128+
Assert.Equal(Encoding.Default.GetBytes("guest"), result.Properties.UserId);
129+
Assert.Equal("stream_value", result.ApplicationProperties["stream_key"]);
130+
Assert.Equal("Alan Mathison Turing(1912 年 6 月 23 日", result.ApplicationProperties["stream_key4"]);
131+
await consumer.Close();
132+
await system.DeleteStream(stream);
133+
await system.Close();
134+
}
135+
136+
/// <summary>
137+
/// The file message_from_version_1_0_0 was generated with the 1.0.0 version of the client
138+
/// due of this issue https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/211 the write is changed
139+
/// but the read must be compatible
140+
/// </summary>
141+
[Fact]
142+
public void DecodeMessageFrom100Version()
143+
{
144+
var data = SystemUtils.GetFileContent("message_from_version_1_0_0");
145+
var reader = new SequenceReader<byte>(new ReadOnlySequence<byte>(data));
146+
var msg = Message.From(ref reader, (uint)reader.Length);
147+
Assert.Equal("Message100", Encoding.ASCII.GetString(msg.Data.Contents.ToArray()));
148+
Assert.Equal("MyMessageId", msg.Properties.MessageId);
149+
Assert.Equal("MyCorrelationId", msg.Properties.CorrelationId);
150+
Assert.Equal("text/plain", msg.Properties.ContentType);
151+
Assert.Equal("utf-8", msg.Properties.ContentEncoding);
152+
Assert.Equal("guest", Encoding.UTF8.GetString(msg.Properties.UserId));
153+
Assert.Equal((uint)9999, msg.Properties.GroupSequence);
154+
Assert.Equal("MyReplyToGroupId", msg.Properties.ReplyToGroupId);
155+
Assert.Equal("value", msg.ApplicationProperties["key_string"]);
156+
Assert.Equal(1111, msg.ApplicationProperties["key2_int"]);
157+
Assert.Equal(10_000_000_000, msg.ApplicationProperties["key2_decimal"]);
158+
Assert.Equal(true, msg.ApplicationProperties["key2_bool"]);
159+
}
160+
161+
[Fact]
162+
public async void Amqp091ShouldReadTheAmqp10Properties1000Messages()
163+
{
164+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
165+
var producer = await Producer.Create(new ProducerConfig(system, stream));
166+
const int NumberOfMessages = 1000;
167+
for (var i = 0; i < NumberOfMessages; i++)
168+
{
169+
await producer.Send(new Message(Encoding.ASCII.GetBytes($"FromStream{i}"))
170+
{
171+
Properties = new Properties()
172+
{
173+
MessageId = $"Alan Mathison Turing(1912 年 6 月 23 日 - 1954 年 6 月 7 日)是英国数学家、计算机科学家、逻辑学家、密码分析家、哲学家和理论生物学家。 [6] 图灵在理论计算机科学的发展中具有很大的影响力,用图灵机提供了算法和计算概念的形式化,可以被认为是通用计算机的模型。[7][8][9] 他被广泛认为是理论计算机科学和人工智能之父{i}",
174+
CorrelationId = "10000_00000",
175+
ContentType = "text/plain",
176+
ContentEncoding = "utf-8",
177+
UserId = Encoding.ASCII.GetBytes("MY_USER_ID"),
178+
GroupSequence = 601,
179+
ReplyToGroupId = "ReplyToGroupId",
180+
GroupId = "GroupId",
181+
182+
},
183+
ApplicationProperties = new ApplicationProperties()
184+
{
185+
{"stream_key", "stream_value"},
186+
{"stream_key2", 100},
187+
{"stream_key3", 10_000_009},
188+
{"stream_key4", "Alan Mathison Turing(1912 年 6 月 23 日"},
189+
}
190+
});
191+
}
192+
193+
var factory = new ConnectionFactory();
194+
using var connection = factory.CreateConnection();
195+
var channel = connection.CreateModel();
196+
var consumer = new EventingBasicConsumer(channel);
197+
var consumed = 0;
198+
var tcs = new TaskCompletionSource<int>();
199+
consumer.Received += (sender, ea) =>
200+
{
201+
if (Interlocked.Increment(ref consumed) == NumberOfMessages)
202+
{
203+
tcs.SetResult(consumed);
204+
}
205+
206+
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
207+
};
208+
channel.BasicQos(0, 100, false);
209+
channel.BasicConsume(stream, false, "consumerTag",
210+
arguments: new Dictionary<string, object>() { { "x-stream-offset", "first" } }, consumer);
211+
new Utils<int>(_testOutputHelper).WaitUntilTaskCompletes(tcs);
212+
Assert.Equal(NumberOfMessages, tcs.Task.Result);
213+
channel.QueueDelete(stream);
214+
channel.Close();
215+
}
216+
217+
[Fact]
218+
public async void Amqp10ShouldReadTheAmqp019Properties1000Messages()
219+
{
220+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
221+
222+
var factory = new ConnectionFactory();
223+
using var connection = factory.CreateConnection();
224+
var channel = connection.CreateModel();
225+
var properties = channel.CreateBasicProperties();
226+
const int NumberOfMessages = 1000;
227+
for (var i = 0; i < NumberOfMessages; i++)
228+
{
229+
properties.MessageId = $"messageId{i}";
230+
properties.CorrelationId = "10000_00000";
231+
properties.ContentType = "text/plain";
232+
properties.ContentEncoding = "utf-8";
233+
properties.UserId = "guest";
234+
properties.Headers = new Dictionary<string, object>() { { "stream_key", "stream_value" } };
235+
channel.BasicPublish("", stream, properties, Encoding.ASCII.GetBytes($"FromAMQP{i}"));
236+
}
237+
238+
var tcs = new TaskCompletionSource<int>();
239+
var consumed = 0;
240+
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
241+
{
242+
OffsetSpec = new OffsetTypeFirst(),
243+
MessageHandler = async (_, _, _, message) =>
244+
{
245+
if (Interlocked.Increment(ref consumed) == NumberOfMessages)
246+
{
247+
tcs.SetResult(consumed);
248+
}
249+
250+
await Task.CompletedTask;
251+
}
252+
});
253+
254+
new Utils<int>(_testOutputHelper).WaitUntilTaskCompletes(tcs);
255+
var result = tcs.Task.Result;
256+
Assert.Equal(NumberOfMessages, result);
257+
await consumer.Close();
258+
await system.DeleteStream(stream);
259+
await system.Close();
260+
}
261+
}
193 Bytes
Binary file not shown.

Tests/Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
<ItemGroup>
1010
<PackageReference Include="AltCover" Version="8.2.837" />
1111
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
12+
<PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
1213
<PackageReference Include="xunit" Version="2.4.1" />
1314
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
1415
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>

0 commit comments

Comments
 (0)