From 59024d69e96b1d6d3b6b7f228772151aa8108de4 Mon Sep 17 00:00:00 2001 From: Ulrik Born Date: Wed, 27 Nov 2024 14:35:14 +0100 Subject: [PATCH] Added new IConsumer.Consume overload taking target ConsumeResult as parameter for low-alloc use cases. --- src/Confluent.Kafka/Consumer.cs | 63 ++++++++------ src/Confluent.Kafka/Header.cs | 16 +--- src/Confluent.Kafka/Headers.cs | 101 ++++++++-------------- src/Confluent.Kafka/IConsumer.cs | 18 +++- test/Confluent.Kafka.UnitTests/Headers.cs | 20 ++++- 5 files changed, 110 insertions(+), 108 deletions(-) diff --git a/src/Confluent.Kafka/Consumer.cs b/src/Confluent.Kafka/Consumer.cs index fae61ebed..541587773 100644 --- a/src/Confluent.Kafka/Consumer.cs +++ b/src/Confluent.Kafka/Consumer.cs @@ -764,8 +764,21 @@ internal Consumer(ConsumerBuilder builder) /// /// Refer to /// - public ConsumeResult Consume(int millisecondsTimeout) - { + public ConsumeResult Consume(int millisecondsTimeout) + { + ConsumeResult result = new(); + if (!Consume(millisecondsTimeout, result)) + return null; + return result; + } + + + /// + public bool Consume(int millisecondsTimeout, ConsumeResult result) + { + if (result == null) + throw new ArgumentNullException(nameof(result)); + var msgPtr = kafkaHandle.ConsumerPoll((IntPtr)millisecondsTimeout); if (this.handlerException != null) @@ -781,7 +794,7 @@ public ConsumeResult Consume(int millisecondsTimeout) if (msgPtr == IntPtr.Zero) { - return null; + return false; } try @@ -806,14 +819,12 @@ public ConsumeResult Consume(int millisecondsTimeout) if (msg.err == ErrorCode.Local_PartitionEOF) { - return new ConsumeResult - { - TopicPartitionOffset = new TopicPartitionOffset(topic, - msg.partition, msg.offset, - msgLeaderEpoch), - Message = null, - IsPartitionEOF = true - }; + result.IsPartitionEOF = true; + result.Topic = topic; + result.Partition = msg.partition; + result.Offset = msg.offset; + result.LeaderEpoch = msgLeaderEpoch; + return true; } long timestampUnix = 0; @@ -827,7 +838,9 @@ public 
ConsumeResult Consume(int millisecondsTimeout) Headers headers = null; if (enableHeaderMarshaling) { - headers = new Headers(); + headers = result.Message?.Headers ?? new Headers(); + headers.Clear(); + Librdkafka.message_headers(msgPtr, out IntPtr hdrsPtr); if (hdrsPtr != IntPtr.Zero) { @@ -938,20 +951,18 @@ public ConsumeResult Consume(int millisecondsTimeout) ex); } - return new ConsumeResult - { - TopicPartitionOffset = new TopicPartitionOffset(topic, - msg.partition, msg.offset, - msgLeaderEpoch), - Message = new Message - { - Timestamp = timestamp, - Headers = headers, - Key = key, - Value = val - }, - IsPartitionEOF = false - }; + result.Topic = topic; + result.Partition = msg.partition; + result.Offset = msg.offset; + result.LeaderEpoch = msgLeaderEpoch; + + result.Message ??= new Message(); + result.Message.Timestamp = timestamp; + result.Message.Headers = headers; + result.Message.Key = key; + result.Message.Value = val; + result.IsPartitionEOF = false; + return true; } finally { diff --git a/src/Confluent.Kafka/Header.cs b/src/Confluent.Kafka/Header.cs index 27f9e9ee1..0c053b460 100644 --- a/src/Confluent.Kafka/Header.cs +++ b/src/Confluent.Kafka/Header.cs @@ -27,20 +27,17 @@ namespace Confluent.Kafka /// public class Header : IHeader { - private byte[] val; + private readonly byte[] val; /// /// The header key. /// - public string Key { get; private set; } + public string Key { get; } /// /// Get the serialized header value data. /// - public byte[] GetValueBytes() - { - return val; - } + public byte[] GetValueBytes() => val; /// /// Create a new Header instance. @@ -53,12 +50,7 @@ public byte[] GetValueBytes() /// public Header(string key, byte[] value) { - if (key == null) - { - throw new ArgumentNullException("Kafka message header key cannot be null."); - } - - Key = key; + Key = key ?? 
throw new ArgumentNullException(nameof(key), "Kafka message header key cannot be null."); val = value; } } diff --git a/src/Confluent.Kafka/Headers.cs b/src/Confluent.Kafka/Headers.cs index 3fe66032b..8d5486056 100644 --- a/src/Confluent.Kafka/Headers.cs +++ b/src/Confluent.Kafka/Headers.cs @@ -17,6 +17,7 @@ using System; using System.Collections; using System.Collections.Generic; +using System.Linq; namespace Confluent.Kafka @@ -28,13 +29,16 @@ namespace Confluent.Kafka /// Message headers are supported by v0.11 brokers and above. /// public class Headers : IEnumerable - { - private readonly List headers = new List(); + { + /// + /// Backing list is only created on first actual header + /// + private List headers = null; /// /// Gets the underlying list of headers /// - public IReadOnlyList BackingList => headers; + public IReadOnlyList BackingList => (IReadOnlyList)headers ?? Array.Empty(); /// /// Append a new header to the collection. @@ -50,10 +54,9 @@ public class Headers : IEnumerable public void Add(string key, byte[] val) { if (key == null) - { - throw new ArgumentNullException("Kafka message header key cannot be null."); - } + throw new ArgumentNullException(nameof(key), "Kafka message header key cannot be null."); + headers ??= new(); headers.Add(new Header(key, val)); } @@ -64,8 +67,9 @@ public void Add(string key, byte[] val) /// The header to add to the collection. 
/// public void Add(Header header) - { - headers.Add(header); + { + headers ??= new(); + headers.Add(header); } /// @@ -107,16 +111,17 @@ public byte[] GetLastBytes(string key) /// public bool TryGetLastBytes(string key, out byte[] lastHeader) { - for (int i=headers.Count-1; i>=0; --i) - { - if (headers[i].Key == key) - { - lastHeader = headers[i].GetValueBytes(); - return true; - } - } - - lastHeader = default(byte[]); + if (headers != null) + for (int i = headers.Count - 1; i >= 0; --i) + { + if (headers[i].Key == key) + { + lastHeader = headers[i].GetValueBytes(); + return true; + } + } + + lastHeader = default; return false; } @@ -127,62 +132,28 @@ public bool TryGetLastBytes(string key, out byte[] lastHeader) /// /// The key to remove all headers for /// - public void Remove(string key) - => headers.RemoveAll(a => a.Key == key); - - internal class HeadersEnumerator : IEnumerator - { - private Headers headers; - - private int location = -1; - - public HeadersEnumerator(Headers headers) - { - this.headers = headers; - } - - public object Current - => ((IEnumerator)this).Current; - - IHeader IEnumerator.Current - => headers.headers[location]; - - public void Dispose() {} - - public bool MoveNext() - { - location += 1; - if (location >= headers.headers.Count) - { - return false; - } - - return true; - } - - public void Reset() - { - this.location = -1; - } - } - + public void Remove(string key) => headers?.RemoveAll(a => a.Key == key); + + /// + /// Removes all headers from the collection. + /// + public void Clear() => headers?.Clear(); + /// /// Returns an enumerator that iterates through the headers collection. /// /// /// An enumerator object that can be used to iterate through the headers collection. /// - public IEnumerator GetEnumerator() - => new HeadersEnumerator(this); - + public IEnumerator GetEnumerator() => headers?.GetEnumerator() ?? Enumerable.Empty().GetEnumerator(); + /// /// Returns an enumerator that iterates through the headers collection. 
/// /// /// An enumerator object that can be used to iterate through the headers collection. /// - IEnumerator IEnumerable.GetEnumerator() - => new HeadersEnumerator(this); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); /// /// Gets the header at the specified index /// /// The zero-based index of the element to get. /// - public IHeader this[int index] - => headers[index]; + public IHeader this[int index] => headers?[index] ?? throw new ArgumentOutOfRangeException(nameof(index), "Header collection is empty."); /// /// The number of headers in the collection. /// - public int Count - => headers.Count; + public int Count => headers?.Count ?? 0; } } diff --git a/src/Confluent.Kafka/IConsumer.cs b/src/Confluent.Kafka/IConsumer.cs index 38c3a5a71..d5edfe49e 100644 --- a/src/Confluent.Kafka/IConsumer.cs +++ b/src/Confluent.Kafka/IConsumer.cs @@ -53,6 +53,18 @@ public interface IConsumer : IClient /// ConsumeResult Consume(int millisecondsTimeout); + /// + /// Poll for new messages / events. Blocks until a consume result is available or until timeout period has elapsed. + /// + /// + /// This overload takes the result instance as parameter to allow reuse of result and contained message instances. + /// + /// + /// Mandatory result instance to be filled with next message/EOF. + /// True if result was filled with message or EOF, false if timeout elapsed. + bool Consume(int millisecondsTimeout, ConsumeResult result); + + /// /// Poll for new messages / events. Blocks /// until a consume result is available or the /// timeout period has elapsed. @@ -666,9 +678,9 @@ public interface IConsumer : IClient /// /// Thrown if the operation fails. /// - void Close(); - - + void Close(); + + /// /// The current consumer group metadata associated with this consumer, /// or null if a GroupId has not been specified for the consumer. 
diff --git a/test/Confluent.Kafka.UnitTests/Headers.cs b/test/Confluent.Kafka.UnitTests/Headers.cs index ddea2edab..b7fd1be81 100644 --- a/test/Confluent.Kafka.UnitTests/Headers.cs +++ b/test/Confluent.Kafka.UnitTests/Headers.cs @@ -82,7 +82,9 @@ public void TryGetLast() [Fact] public void TryGetLast_NotExist() { - var hdrs = new Headers(); + var hdrs = new Headers(); + Assert.False(hdrs.TryGetLastBytes("my-header-2", out byte[] _)); + hdrs.Add(new Header("my-header", new byte[] { 42 })); Assert.False(hdrs.TryGetLastBytes("my-header-2", out byte[] val)); @@ -107,6 +109,8 @@ public void NullValue() public void Remove() { var hdrs = new Headers(); + hdrs.Remove("not-present"); + hdrs.Add(new Header("my-header", new byte[] { 42 })); hdrs.Add(new Header("my-header", new byte[] { 44 })); hdrs.Add(new Header("my-header-2", new byte[] { 45 })); @@ -151,6 +155,9 @@ public void Count() public void Enumerator() { var hdrs = new Headers(); + + Assert.Empty(hdrs); + hdrs.Add(new Header("my-header", new byte[] { 42 })); hdrs.Add(new Header("my-header", new byte[] { 44 })); hdrs.Add(new Header("my-header-2", new byte[] { 45 })); @@ -176,5 +183,16 @@ public void Enumerator() Assert.Equal(3, cnt); } + [Fact] + public void BackingList() + { + var hdrs = new Headers(); + hdrs.Clear(); + Assert.Empty(hdrs.BackingList); + hdrs.Add("A", null); + Assert.Single(hdrs.BackingList); + hdrs.Clear(); + Assert.Empty(hdrs); + } } }