Skip to content

Commit 8ebd537

Browse files
authored
Implement the missing part of amqp 1.0 (#63)
Message Header AMQP value Currently the fields are in readonly mode.
1 parent 78e78cd commit 8ebd537

File tree

5 files changed

+99
-14
lines changed

5 files changed

+99
-14
lines changed

RabbitMQ.Stream.Client/AMQP/DescribedFormatCode.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ public static int Write(Span<byte> span, byte data)
2222
offset += WireFormatting.WriteByte(span.Slice(offset), data);
2323
return offset;
2424
}
25-
25+
2626
public const byte ApplicationData = 0x75;
2727
public const byte MessageAnnotations = 0x72;
2828
public const byte MessageProperties = 0x73;
2929
public const byte ApplicationProperties = 0x74;
30+
public const byte MessageHeader = 0x70;
31+
public const byte AmqpValue = 0x77;
3032
}
3133
}

RabbitMQ.Stream.Client/AMQP/Header.cs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using System;
2+
using System.Buffers;
3+
4+
namespace RabbitMQ.Stream.Client.AMQP
5+
{
6+
public class Header
7+
{
8+
public bool Durable { get; internal set; }
9+
public byte Priority { get; internal set; }
10+
public uint Ttl { get; internal set; } // from milliseconds
11+
public bool FirstAcquirer { get; internal set; }
12+
public uint DeliveryCount { get; internal set; }
13+
14+
15+
public static Header Parse(ReadOnlySequence<byte> amqpData, ref int byteRead)
16+
{
17+
var offset = AmqpWireFormatting.ReadCompositeHeader(amqpData, out var fields, out var next);
18+
//TODO WIRE check the next
19+
var h = new Header();
20+
for (var index = 0; index < fields; index++)
21+
{
22+
offset += AmqpWireFormatting.TryReadNull(amqpData.Slice(offset), out var value);
23+
24+
if (!value)
25+
{
26+
switch (index)
27+
{
28+
case 0:
29+
offset += AmqpWireFormatting.ReadAny(amqpData.Slice(offset), out var durable);
30+
h.Durable = (bool) durable;
31+
break;
32+
case 1:
33+
offset += AmqpWireFormatting.ReadUByte(amqpData.Slice(offset), out var priority);
34+
h.Priority = priority;
35+
break;
36+
case 2:
37+
offset += AmqpWireFormatting.ReadAny(amqpData.Slice(offset), out var ttl);
38+
h.Ttl = (uint) ttl;
39+
break;
40+
case 3:
41+
offset += AmqpWireFormatting.ReadBool(amqpData.Slice(offset), out var firstAcquirer);
42+
h.FirstAcquirer = firstAcquirer;
43+
break;
44+
case 4:
45+
offset += AmqpWireFormatting.ReadUint32(amqpData.Slice(offset), out var deliveryCount);
46+
h.DeliveryCount = deliveryCount;
47+
break;
48+
}
49+
}
50+
}
51+
52+
53+
byteRead += offset;
54+
return h;
55+
}
56+
}
57+
}

RabbitMQ.Stream.Client/Message.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,16 @@ public Message(Data data)
2626
public ApplicationProperties ApplicationProperties { get; set; }
2727

2828
public Properties Properties { get; set; }
29+
2930
public Data Data { get; }
3031

32+
// MessageHeader and AmqpValue are only in get.
33+
// Just to have the compatibility with AMQP 1.0
34+
// In this specific case it is not needed
35+
public Header MessageHeader { get; internal set; }
36+
public object AmqpValue { get; internal set; }
37+
38+
3139
public int Size => Data.Size +
3240
(Properties?.Size ?? 0) +
3341
(Annotations?.Size ?? 0) +
@@ -86,8 +94,10 @@ public static Message From(ReadOnlySequence<byte> amqpData)
8694
//parse AMQP encoded data
8795
var offset = 0;
8896
Annotations annotations = null;
97+
Header header = null;
8998
Data data = default;
9099
Properties properties = null;
100+
object amqpValue = null;
91101
ApplicationProperties applicationProperties = null;
92102
while (offset != amqpData.Length)
93103
{
@@ -110,6 +120,13 @@ public static Message From(ReadOnlySequence<byte> amqpData)
110120
applicationProperties =
111121
ApplicationProperties.Parse<ApplicationProperties>(amqpData.Slice(offset), ref offset);
112122
break;
123+
case DescribedFormatCode.MessageHeader:
124+
header = Header.Parse(amqpData.Slice(offset), ref offset);
125+
break;
126+
case DescribedFormatCode.AmqpValue:
127+
offset += DescribedFormatCode.Size;
128+
offset += AmqpWireFormatting.ReadAny(amqpData.Slice(offset), out amqpValue);
129+
break;
113130
default:
114131
Console.WriteLine($"dataCode: {dataCode} not handled. Please open an issue.");
115132
throw new ArgumentOutOfRangeException($"dataCode: {dataCode} not handled");
@@ -120,7 +137,9 @@ public static Message From(ReadOnlySequence<byte> amqpData)
120137
{
121138
Annotations = annotations,
122139
Properties = properties,
123-
ApplicationProperties = applicationProperties
140+
ApplicationProperties = applicationProperties,
141+
AmqpValue = amqpValue,
142+
MessageHeader = header
124143
};
125144
return msg;
126145
}

Tests/Amqp10Tests.cs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,12 @@ public void ReadsThrowsExceptionInvalidType()
100100
{
101101
AmqpWireFormatting.ReadSByte(new ReadOnlySequence<byte>(data), out var value);
102102
});
103-
103+
104104
Assert.Throws<AmqpParseException>(() =>
105105
{
106106
AmqpWireFormatting.ReadBool(new ReadOnlySequence<byte>(data), out var value);
107107
});
108-
108+
109109
Assert.Throws<AmqpParseException>(() =>
110110
{
111111
AmqpWireFormatting.ReadInt16(new ReadOnlySequence<byte>(data), out var value);
@@ -189,13 +189,12 @@ public void ValidateFormatCode()
189189
var longValueBin64 = new byte[] {0x81, 0xff, 0xff, 0xff, 0xe6, 0x21, 0x42, 0xfe, 0x39};
190190
RunValidateFormatCode(longValue64, longValueBin64);
191191

192-
193-
const long longValue8 = 127;
192+
193+
const long longValue8 = 127;
194194
var longValueBin8 = new byte[] {0x55, 0x7F};
195195
RunValidateFormatCode(longValue8, longValueBin8);
196-
197-
198-
196+
197+
199198
const float floatValue = -88.88f;
200199
var floatValueBin = new byte[] {0x72, 0xc2, 0xb1, 0xc2, 0x8f};
201200
RunValidateFormatCode(floatValue, floatValueBin);
@@ -374,6 +373,19 @@ public void ValidateMessagesFromGo()
374373
Assert.Equal("test", msgStaticTest.Annotations["test"]);
375374
Assert.Equal((long) 1, msgStaticTest.Annotations[(long) 1]);
376375
Assert.Equal((long) 100_000, msgStaticTest.Annotations[(long) 100_000]);
376+
377+
378+
var header = SystemUtils.GetFileContent("header_amqpvalue_message");
379+
var msgHeader = Message.From(new ReadOnlySequence<byte>(header));
380+
Assert.NotNull(msgHeader);
381+
Assert.NotNull(msgHeader.MessageHeader);
382+
Assert.NotNull(msgHeader.AmqpValue);
383+
Assert.True(msgHeader.MessageHeader.Durable);
384+
Assert.True(msgHeader.MessageHeader.FirstAcquirer);
385+
Assert.Equal(100, msgHeader.MessageHeader.Priority);
386+
Assert.Equal((uint) 300, msgHeader.MessageHeader.DeliveryCount);
387+
Assert.True(msgHeader.MessageHeader.Ttl == 0);
388+
Assert.Equal("amqpValue", msgHeader.AmqpValue);
377389
}
378390

379391
[Fact]
@@ -397,11 +409,6 @@ public void MapEntriesWithAnEmptyKeyShouldNotBeWrittenToTheWire()
397409

398410
// we do not expect the new entry to be written
399411
Assert.Equal(expectedMapSize, actualMapSize);
400-
401-
402412
}
403413
}
404-
405-
406-
407414
}
47 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)