
Commit 44b9b12

improve partition cache
1 parent b09bcdc commit 44b9b12

8 files changed: +763 -34 lines changed


CHANGELOG.md (+1)

@@ -1,6 +1,7 @@
 # Rdkafka Changelog

 ## 0.21.1 (Unreleased)
+- [Enhancement] Replace TTL-based partition count cache with a global cache that reuses `librdkafka` statistics data when possible.
 - [Enhancement] Support producing and consuming of headers with multiple values (KIP-82).
 - [Enhancement] Allow native Kafka customization poll time.
 - [Enhancement] Roll out experimental jruby support.

lib/rdkafka.rb (+1)

@@ -42,6 +42,7 @@
 require "rdkafka/error"
 require "rdkafka/metadata"
 require "rdkafka/native_kafka"
+require "rdkafka/producer/partitions_count_cache"
 require "rdkafka/producer"
 require "rdkafka/producer/delivery_handle"
 require "rdkafka/producer/delivery_report"

lib/rdkafka/bindings.rb (+25 -1)

@@ -26,6 +26,8 @@ def self.lib_extension
     RD_KAFKA_OFFSET_STORED = -1000
     RD_KAFKA_OFFSET_INVALID = -1001

+    EMPTY_HASH = {}.freeze
+
     class SizePtr < FFI::Struct
       layout :value, :size_t
     end
@@ -201,9 +203,31 @@ class NativeErrorDesc < FFI::Struct
     StatsCallback = FFI::Function.new(
       :int, [:pointer, :string, :int, :pointer]
     ) do |_client_ptr, json, _json_len, _opaque|
-      # Pass the stats hash to callback in config
       if Rdkafka::Config.statistics_callback
         stats = JSON.parse(json)
+
+        # If the user requested statistics callbacks, we can use the statistics data to get the
+        # partitions count for each topic when this data is published. That way we do not have
+        # to query this information when the user is using `partition_key`. This takes around
+        # 0.02ms every statistics interval period (most likely every 5 seconds) and saves us
+        # from making any queries to the cluster for the partition count.
+        #
+        # One edge case is if the user sets `statistics.interval.ms` much higher than the
+        # default partition count refresh TTL (30 seconds). This is taken care of, as the lack
+        # of reporting to the partitions cache will cause the cache to expire and trigger a
+        # blocking refresh.
+        #
+        # If the user sets `topic.metadata.refresh.interval.ms` too high, this is on the user.
+        #
+        # Since this cache is shared, having a few consumers and/or producers in one process
+        # will automatically improve the querying times even with low refresh times.
+        (stats['topics'] || EMPTY_HASH).each do |topic_name, details|
+          partitions_count = details['partitions'].keys.reject { |k| k == '-1' }.size
+
+          next unless partitions_count.positive?
+
+          Rdkafka::Producer.partitions_count_cache.set(topic_name, partitions_count)
+        end
+
         Rdkafka::Config.statistics_callback.call(stats)
       end

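For orientation, a minimal sketch of how this path is typically enabled from the application side. The broker address, interval, and topic are illustrative assumptions; the only requirements implied by the callback above are that `statistics.interval.ms` is non-zero and that some `Rdkafka::Config.statistics_callback` is set.

require "rdkafka"

# Any callback (even a no-op) is enough: librdkafka starts emitting statistics,
# and the StatsCallback above reuses them to warm the shared partitions count cache.
Rdkafka::Config.statistics_callback = ->(_stats) {}

config = Rdkafka::Config.new(
  "bootstrap.servers" => "localhost:9092",  # illustrative broker address
  "statistics.interval.ms" => 5_000         # emit statistics every 5 seconds
)
producer = config.producer

# Subsequent `partition_key` lookups can now be served from
# Rdkafka::Producer.partitions_count_cache without a blocking metadata query.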

lib/rdkafka/producer.rb (+35 -30)

@@ -6,13 +6,31 @@ class Producer
     include Helpers::Time
     include Helpers::OAuth

-    # Cache partitions count for 30 seconds
-    PARTITIONS_COUNT_TTL = 30
-
     # Empty hash used as a default
     EMPTY_HASH = {}.freeze

-    private_constant :PARTITIONS_COUNT_TTL, :EMPTY_HASH
+    # @private
+    @@partitions_count_cache = PartitionsCountCache.new
+
+    # Global (process-wide) partitions cache. We use it to store the number of partitions per
+    # topic, either from the librdkafka statistics (if enabled) or via direct inline calls every
+    # now and then. Since the partitions count can only grow and should be the same for all
+    # consumers and producers, we can use a global cache as long as we ensure that updates only
+    # move up.
+    #
+    # @note It is critical to remember that not all users may have statistics callbacks enabled,
+    #   hence we should not assume that this cache is always updated from the stats.
+    #
+    # @return [Rdkafka::Producer::PartitionsCountCache]
+    def self.partitions_count_cache
+      @@partitions_count_cache
+    end
+
+    # @param partitions_count_cache [Rdkafka::Producer::PartitionsCountCache]
+    def self.partitions_count_cache=(partitions_count_cache)
+      @@partitions_count_cache = partitions_count_cache
+    end
+
+    private_constant :EMPTY_HASH

     # Raised when there was a critical issue when invoking rd_kafka_topic_new
     # This is a temporary solution until https://github.com/karafka/rdkafka-ruby/issues/451 is
@@ -43,25 +61,6 @@ def initialize(native_kafka, partitioner_name)

       # Makes sure that native kafka gets closed before it gets GCed by Ruby
       ObjectSpace.define_finalizer(self, native_kafka.finalizer)
-
-      @_partitions_count_cache = Hash.new do |cache, topic|
-        topic_metadata = nil
-
-        @native_kafka.with_inner do |inner|
-          topic_metadata = ::Rdkafka::Metadata.new(inner, topic).topics&.first
-        end
-
-        partition_count = topic_metadata ? topic_metadata[:partition_count] : -1
-
-        # This approach caches the failure to fetch only for 1 second. This will make sure that
-        # we do not cache the failure for too long but also "buys" us a bit of time in case there
-        # would be issues in the cluster, so we won't overload it with consecutive requests
-        cache[topic] = if partition_count.positive?
-          [monotonic_now, partition_count]
-        else
-          [monotonic_now - PARTITIONS_COUNT_TTL + 5, partition_count]
-        end
-      end
     end

     # Sets alternative set of configuration details that can be set per topic
@@ -222,18 +221,24 @@ def purge
     # @note If 'allow.auto.create.topics' is set to true in the broker, the topic will be
     #   auto-created after returning nil.
     #
-    # @note We cache the partition count for a given topic for given time.
+    # @note We cache the partition count for a given topic for a given time. If statistics are
+    #   enabled for any producer or consumer, they will take precedence over per-instance
+    #   fetching.
+    #
     #   This prevents us, in case someone uses `partition_key`, from querying for the count with
-    #   each message. Instead we query once every 30 seconds at most if we have a valid partition
-    #   count or every 5 seconds in case we were not able to obtain number of partitions
+    #   each message. Instead we query at most once every 30 seconds if we have a valid
+    #   partition count, or every 5 seconds in case we were not able to obtain the number of
+    #   partitions.
     def partition_count(topic)
       closed_producer_check(__method__)

-      @_partitions_count_cache.delete_if do |_, cached|
-        monotonic_now - cached.first > PARTITIONS_COUNT_TTL
-      end
+      self.class.partitions_count_cache.get(topic) do
+        topic_metadata = nil
+
+        @native_kafka.with_inner do |inner|
+          topic_metadata = ::Rdkafka::Metadata.new(inner, topic).topics&.first
+        end

-      @_partitions_count_cache[topic].last
+        topic_metadata ? topic_metadata[:partition_count] : -1
+      end
     end

     # Produces a message to a Kafka topic. The message is added to rdkafka's queue, call {DeliveryHandle#wait wait} on the returned delivery handle to make sure it is delivered.
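As a usage sketch (topic name and broker address are made up), the class-level cache means all producers in a process share partition counts, and the cache object itself can be swapped, for example in tests:

producer = Rdkafka::Config.new("bootstrap.servers" => "localhost:9092").producer

producer.partition_count("example_topic") # first call: blocking metadata fetch
producer.partition_count("example_topic") # within the TTL: served from the shared cache

# Replace the global cache, e.g. with a shorter TTL for test suites
Rdkafka::Producer.partitions_count_cache = Rdkafka::Producer::PartitionsCountCache.new(1)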
lib/rdkafka/producer/partitions_count_cache.rb (+216, new file)

@@ -0,0 +1,216 @@
# frozen_string_literal: true

module Rdkafka
  class Producer
    # Caching mechanism for Kafka topic partition counts to avoid frequent cluster queries
    #
    # This cache is designed to optimize the process of obtaining partition counts for topics.
    # It uses several strategies to minimize Kafka cluster queries:
    #
    # @note Design considerations:
    #
    # 1. Statistics-based updates
    #    When statistics callbacks are enabled (via `statistics.interval.ms`), we leverage
    #    this data to proactively update the partition counts cache. This approach costs
    #    approximately 0.02ms of processing time during each statistics interval (typically
    #    every 5 seconds) but eliminates the need for explicit blocking metadata queries.
    #
    # 2. Edge case handling
    #    If a user configures `statistics.interval.ms` much higher than the default cache TTL
    #    (30 seconds), the cache will still function correctly. When statistics updates don't
    #    occur frequently enough, the cache entries will expire naturally, triggering a
    #    blocking refresh when needed.
    #
    # 3. User configuration awareness
    #    The cache respects user-defined settings. If `topic.metadata.refresh.interval.ms` is
    #    set very high, the responsibility for potentially stale data falls on the user. This
    #    is an explicit design choice to honor user configuration preferences and align with
    #    librdkafka settings.
    #
    # 4. Process-wide efficiency
    #    Since this cache is shared across all Rdkafka producers and consumers within a process,
    #    having multiple clients improves overall efficiency. Each client contributes to keeping
    #    the cache updated, benefiting all other clients.
    #
    # 5. Thread-safety approach
    #    The implementation uses fine-grained locking with per-topic mutexes to minimize
    #    contention in multi-threaded environments while ensuring data consistency.
    #
    # 6. Topic recreation handling
    #    If a topic is deleted and recreated with fewer partitions, the cache will continue to
    #    report the higher count until either the TTL expires or the process is restarted. This
    #    design choice simplifies the implementation while relying on librdkafka's error handling
    #    for edge cases. In production environments, topic recreation with different partition
    #    counts is typically accompanied by application restarts to handle structural changes.
    #    This also aligns with the previous cache implementation.
    class PartitionsCountCache
      include Helpers::Time

      # Default time-to-live for cached partition counts in seconds
      #
      # @note This default was chosen to balance freshness of metadata with performance
      #   optimization. Most Kafka cluster topology changes are planned operations, making 30
      #   seconds a reasonable compromise.
      DEFAULT_TTL = 30

      # Creates a new partition count cache
      #
      # @param ttl [Integer] Time-to-live in seconds for cached values
      def initialize(ttl = DEFAULT_TTL)
        @counts = {}
        @mutex_hash = {}
        # Used only for @mutex_hash access to ensure thread-safety when creating new mutexes
        @mutex_for_hash = Mutex.new
        @ttl = ttl
      end

      # Reads partition count for a topic with automatic refresh when expired
      #
      # This method will return the cached partition count if available and not expired.
      # If the value is expired or not available, it will execute the provided block
      # to fetch the current value from Kafka.
      #
      # @param topic [String] Kafka topic name
      # @yield Block that returns the current partition count when cache needs refreshing
      # @yieldreturn [Integer] Current partition count retrieved from Kafka
      # @return [Integer] Partition count for the topic
      #
      # @note The implementation prioritizes read performance over write consistency
      #   since partition counts typically only increase during normal operation.
      def get(topic)
        current_info = @counts[topic]

        if current_info.nil? || expired?(current_info[0])
          new_count = yield

          if current_info.nil?
            # No existing data, create a new entry with mutex
            set(topic, new_count)

            return new_count
          else
            current_count = current_info[1]

            if new_count > current_count
              # Higher value needs mutex to update both timestamp and count
              set(topic, new_count)

              return new_count
            else
              # Same or lower value, just update timestamp without mutex
              refresh_timestamp(topic)

              return current_count
            end
          end
        end

        current_info[1]
      end

      # Update partition count for a topic when needed
      #
      # This method updates the partition count for a topic in the cache.
      # It uses a mutex to ensure thread-safety during updates.
      #
      # @param topic [String] Kafka topic name
      # @param new_count [Integer] New partition count value
      #
      # @note We prioritize higher partition counts and only accept them when using
      #   a mutex to ensure consistency. This design decision is based on the fact that
      #   partition counts in Kafka only increase during normal operation.
      def set(topic, new_count)
        # First check outside mutex to avoid unnecessary locking
        current_info = @counts[topic]

        # For lower values, we don't update count but might need to refresh timestamp
        if current_info && new_count < current_info[1]
          refresh_timestamp(topic)

          return
        end

        # Only lock the specific topic mutex
        mutex_for(topic).synchronize do
          # Check again inside the lock as another thread might have updated
          current_info = @counts[topic]

          if current_info.nil?
            # Create new entry
            @counts[topic] = [monotonic_now, new_count]
          else
            current_count = current_info[1]

            if new_count > current_count
              # Update to higher count value
              current_info[0] = monotonic_now
              current_info[1] = new_count
            else
              # Same or lower count, update timestamp only
              current_info[0] = monotonic_now
            end
          end
        end
      end

      # @return [Hash] hash mapping topic names to their [timestamp, partition count] pairs
      def to_h
        @counts
      end

      private

      # Get or create a mutex for a specific topic
      #
      # This method ensures that each topic has its own mutex,
      # allowing operations on different topics to proceed in parallel.
      #
      # @param topic [String] Kafka topic name
      # @return [Mutex] Mutex for the specified topic
      #
      # @note We use a separate mutex (@mutex_for_hash) to protect the creation
      #   of new topic mutexes. This pattern allows fine-grained locking while
      #   maintaining thread-safety.
      def mutex_for(topic)
        mutex = @mutex_hash[topic]

        return mutex if mutex

        # Use a separate mutex to protect the creation of new topic mutexes
        @mutex_for_hash.synchronize do
          # Check again in case another thread created it
          @mutex_hash[topic] ||= Mutex.new
        end

        @mutex_hash[topic]
      end

      # Update the timestamp without acquiring the mutex
      #
      # This is an optimization that allows refreshing the TTL of existing entries
      # without the overhead of mutex acquisition.
      #
      # @param topic [String] Kafka topic name
      #
      # @note This method is safe for refreshing existing data regardless of count
      #   because it only updates the timestamp, which doesn't affect the correctness
      #   of concurrent operations.
      def refresh_timestamp(topic)
        current_info = @counts[topic]

        return unless current_info

        # Update the timestamp in-place
        current_info[0] = monotonic_now
      end

      # Check if a timestamp has expired based on the TTL
      #
      # @param timestamp [Float] Monotonic timestamp to check
      # @return [Boolean] true if expired, false otherwise
      def expired?(timestamp)
        monotonic_now - timestamp > @ttl
      end
    end
  end
end
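To illustrate the "counts only move up" behavior described above, a small standalone sketch (topic name and counts are made up):

cache = Rdkafka::Producer::PartitionsCountCache.new(30)

# Miss: the block runs (this is where Producer#partition_count does its metadata fetch)
cache.get("orders") { 6 }                   #=> 6

# Fresh hit within the TTL: the block is not executed again
cache.get("orders") { raise "not called" }  #=> 6

# Statistics callbacks push counts in via #set; lower values only refresh the timestamp
cache.set("orders", 12)
cache.set("orders", 3)
cache.get("orders") { raise "not called" }  #=> 12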
