Skip to content

Commit 7e40284

Browse files
committed
Fetch partition offset for group.
1 parent f821ca0 commit 7e40284

File tree

4 files changed

+120
-33
lines changed

4 files changed

+120
-33
lines changed

README.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Lets start with an overview of features that exist in kafka-replicator:
1414
* [x] **Flexible topic selection:** Select topics with configurable config;
1515
* [ ] **Auto-create topics:** Destination topics are automatically created for strict_p2p strategy;
1616
* [x] **Stats:** The tool shows replication status;
17-
* [ ] **Monitoring:** Kafka replicator exports stats via prometheus.
17+
* [x] **Monitoring:** Kafka replicator exports stats via prometheus.
1818
* [ ] **Cycle detection**
1919

2020

@@ -133,11 +133,13 @@ routes:
133133
observers:
134134
- client: cl_1_client_1
135135
name: "my name"
136-
topics:
136+
group_id: group_name # used for remaining metrics
137+
topics: # filter by topics
137138
- 'topic1'
138139
- 'topic2'
139-
fetch_timeout_secs: 5
140-
show_progress_interval_secs: 10
140+
fetch_timeout_secs: 5 # default: 5
141+
fetch_interval_secs: 5 # default: 60
142+
show_progress_interval_secs: 10 # default: 60
141143

142144
- client: cl_2_client_1
143145
topic: 'topic3'
@@ -148,7 +150,7 @@ observers:
148150

149151
- client: cl_1_client_1
150152
topic: 'topic1'
151-
topics: []
153+
topics: [] # fetch all topics
152154
```
153155
154156

examples/example-config.yml

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,28 +56,26 @@ routes:
5656

5757
observers:
5858
- client: cl_1_client_1
59-
name: "my name"
60-
group_id: "group_id"
59+
name: "Observer name"
60+
group_id: group_22
6161
topics:
6262
- 'topic1'
6363
# - 'topic2'
64-
fetch_timeout_secs: 5
65-
fetch_interval_secs: 5
66-
show_progress_interval_secs: 1
67-
update_metrics_interval_secs: 10
64+
fetch_timeout_secs: 5 # default: 5
65+
fetch_interval_secs: 60 # default: 60
66+
show_progress_interval_secs: 10 # default: 60
6867

6968
- client: cl_2_client_1
70-
topic: 'topic3'
69+
# group_id: group_2
7170
topics:
7271
- 'topic2'
7372
show_progress_interval_secs: 20
7473

7574
- client: cl_1_client_1
76-
topic: 'topic1'
77-
topics: []
75+
topics: [] # process all topics
7876

7977
prometheus:
80-
# namespace: "app:observer:"
78+
# namespace: "app:observer:" # custom prometheus metrics prefix
8179
labels:
8280
label_key: label_value
8381
env: prod

src/config.rs

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use rdkafka::{
2626
producer::{FutureProducer, FutureRecord},
2727
statistics::Statistics,
2828
util::Timeout,
29-
TopicPartitionList,
29+
TopicPartitionList, topic_partition_list::TopicPartitionListElem, groups::GroupList, metadata::Metadata,
3030
};
3131

3232
use futures::{future, stream::StreamExt};
@@ -342,6 +342,7 @@ pub struct PrometheusConfig {
342342
#[derive(Debug, Serialize, Deserialize, Clone)]
343343
pub struct ObserverConfig {
344344
client: String,
345+
group_id: Option<String>,
345346
topics: Vec<String>,
346347
show_progress_interval_secs: Option<u64>,
347348
fetch_timeout_secs: Option<u64>,
@@ -362,7 +363,7 @@ pub struct Observer {
362363

363364
last_status_time: Instant,
364365
last_update_time: Instant,
365-
last_results: HashMap<String, i64>,
366+
last_results: HashMap<String, (i64, i64)>,
366367

367368
metrics: Arc<Mutex<metrics::ObserverMetrics>>,
368369
}
@@ -449,8 +450,7 @@ impl Observer {
449450
let difference = Instant::now().duration_since(self.last_update_time);
450451

451452
if self.get_fetch_interval().as_secs() > 0 && difference > self.get_fetch_interval() {
452-
let results = self.update_current_status();
453-
self.last_results = results;
453+
self.last_results = self.update_current_status();
454454
self.last_update_time = Instant::now();
455455
}
456456
}
@@ -461,8 +461,13 @@ impl Observer {
461461

462462
if self.get_show_progress_interval().as_secs() > 0 && difference > self.get_show_progress_interval() {
463463

464-
for (name, count) in self.last_results.iter() {
465-
info!("\"{:}\": topic \"{:}\" has {:} message(s)", self.name, name, count);
464+
for (name, (count, remaining)) in self.last_results.iter() {
465+
if let Some(group_id) = self.group_id.clone(){
466+
info!("\"{:}\": topic=\"{:}\" group=\"{:}\" messages={:} remaining={:}", self.name, name, group_id, count, remaining);
467+
468+
}else {
469+
info!("\"{:}\": topic \"{:}\" messages={:} remaining={:}", self.name, name, count, remaining);
470+
}
466471
}
467472
self.last_status_time = Instant::now();
468473
}
@@ -489,9 +494,9 @@ impl Observer {
489494
)
490495
}
491496

492-
pub fn update_current_status(&self) -> HashMap<String, i64> {
497+
pub fn update_current_status(&self) -> HashMap<String, (i64, i64)> {
493498

494-
let mut results: HashMap<String, i64> = HashMap::new();
499+
let mut results: HashMap<String, (i64, i64)> = HashMap::new();
495500

496501
let topic: Option<&str> =
497502
if self.topics.len() == 1 {
@@ -500,15 +505,48 @@ impl Observer {
500505
None
501506
};
502507

503-
let metadata = self
508+
let metadata: Metadata = self
504509
.client
505510
.fetch_metadata(topic, self.get_fetch_timeout())
506511
.expect("Failed to fetch metadata");
507512

513+
// let groups: GroupList = self.client.fetch_group_list(None, Duration::from_secs(20)).unwrap();
514+
515+
// for item in groups.groups() {
516+
// debug!("Group name: {:}", item.name());
517+
518+
// }
519+
508520
for topic in metadata.topics().iter() {
509521
if self.topics_set.contains(topic.name()) || self.topics_set.len() == 0 {
522+
523+
524+
let tp_map: HashMap<(String, i32), rdkafka::Offset> = topic.partitions()
525+
.iter()
526+
.map(|partition_md|{
527+
((topic.name().to_string(), partition_md.id()), rdkafka::Offset::Stored)
528+
}).collect();
529+
530+
let tpl = TopicPartitionList::from_topic_map(&tp_map);
531+
let commited_offsets = self.client.committed_offsets(tpl, self.get_fetch_timeout()).unwrap_or(TopicPartitionList::new());
532+
533+
510534
let mut topic_message_count = 0;
535+
let mut topic_remaining_count = 0;
511536
for partition in topic.partitions() {
537+
538+
let tpl_item: Option<TopicPartitionListElem> = commited_offsets.find_partition(topic.name(), partition.id());
539+
540+
let mut current_partition_offset = 0;
541+
if let Some(value) = tpl_item {
542+
match value.offset(){
543+
rdkafka::Offset::Offset(offset) => {
544+
current_partition_offset = offset;
545+
},
546+
_ => {}
547+
}
548+
}
549+
512550
let (low, high) = self
513551
.client
514552
.fetch_watermarks(
@@ -520,6 +558,9 @@ impl Observer {
520558
let partition_message_count = high - low;
521559
topic_message_count += partition_message_count;
522560

561+
let partition_remaining_count = high - current_partition_offset;
562+
topic_remaining_count += partition_remaining_count;
563+
523564
let labels = [topic.name(), &partition.id().to_string()];
524565

525566
match self.metrics.lock() {
@@ -530,6 +571,13 @@ impl Observer {
530571
guard.partition_end_offset.with_label_values(&labels).set(high);
531572

532573
guard.number_of_records_for_partition.with_label_values(&labels).set(partition_message_count);
574+
575+
if let Some(group_id) = self.group_id.clone() {
576+
let labels = [topic.name(), &partition.id().to_string(), &group_id];
577+
guard.commited_offset.with_label_values(&labels).set(current_partition_offset);
578+
guard.remaining_by_partition.with_label_values(&labels).set(partition_remaining_count);
579+
580+
}
533581
},
534582
Err(_poisoned) => {
535583
error!("Can't acquire metrics lock for topic={:} and partition={:}", topic.name(), &partition.id());
@@ -540,7 +588,7 @@ impl Observer {
540588

541589
}
542590

543-
results.insert(topic.name().to_string(), topic_message_count);
591+
results.insert(topic.name().to_string(), (topic_message_count, topic_remaining_count));
544592

545593
match self.metrics.lock() {
546594
Ok(guard) => {
@@ -550,6 +598,10 @@ impl Observer {
550598
guard.number_of_records_total.with_label_values(&[topic.name()]).set(topic_message_count);
551599
guard.last_fetch_ts.with_label_values(&[topic.name()]).set(since_the_epoch.as_secs_f64());
552600

601+
if let Some(group_id) = self.group_id.clone() {
602+
guard.remaining_for_topic.with_label_values(&[topic.name(), &group_id]).set(topic_remaining_count);
603+
}
604+
553605
},
554606
Err(_) => {
555607
error!("Can't acquire metrics lock for topic={:}", topic.name())
@@ -720,7 +772,7 @@ impl Pipeline {
720772
}
721773

722774
},
723-
Err(poisoned) => {
775+
Err(_poisoned) => {
724776
error!("Can't acquire metrics for {:} {:} [ {:} ] -> {:} [ {:} ]",
725777
self.name,
726778
self.upstream_client_name,
@@ -1005,11 +1057,12 @@ impl Config {
10051057
.iter()
10061058
.enumerate()
10071059
.map(|(i, x)| {
1008-
let client: ClientConfig = self.create_client_config(&x.client, None);
1060+
1061+
let client: ClientConfig = self.create_client_config(&x.client, (x.group_id).as_deref());
10091062

10101063
Observer {
10111064
client: client.create().expect("Can't create consumer"),
1012-
group_id: None,
1065+
group_id: x.group_id.clone(),
10131066
topics: x.topics.clone(),
10141067
topics_set: x.topics.iter().cloned().collect(),
10151068
show_progress_interval_secs: x.show_progress_interval_secs.clone(),

src/metrics.rs

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ const DEFAULT_REPLICATOR_NAMESPACE: &str = "service:kafka_replicator:";
1212

1313

1414
pub trait Metrics: Sync + Send + 'static {
15-
// fn get_metrics(&self) -> String {
16-
// String::from("vfvasfadsfsa")
17-
// }
15+
1816
fn get_registry(&self) -> &Registry;
1917
fn get_metrics(&self) -> String {
2018
let mut buffer = vec![];
@@ -177,7 +175,10 @@ pub struct ObserverMetrics {
177175
pub partition_start_offset: IntGaugeVec,
178176
pub partition_end_offset: IntGaugeVec,
179177
pub number_of_records_for_partition: IntGaugeVec,
180-
pub last_fetch_ts: GaugeVec
178+
pub last_fetch_ts: GaugeVec,
179+
pub commited_offset: IntGaugeVec,
180+
pub remaining_by_partition: IntGaugeVec,
181+
pub remaining_for_topic: IntGaugeVec
181182
}
182183

183184
impl ObserverMetrics {
@@ -234,6 +235,36 @@ impl ObserverMetrics {
234235
let last_fetch_ts: GaugeVec = GaugeVec::new(opts, &["topic"]).unwrap();
235236
registry.register(Box::new(last_fetch_ts.clone())).expect("Can't register metric");
236237

238+
let label_names = ["topic", "partition", "group"];
239+
240+
241+
let opts= Opts::new(
242+
vec!(namespace.clone(), "commited_offset".to_string()).join(""),
243+
"committed offset for partition".to_string()).const_labels(labels.clone());
244+
245+
let commited_offset: IntGaugeVec = IntGaugeVec::new(opts, &label_names).unwrap();
246+
247+
let opts= Opts::new(
248+
vec!(namespace.clone(), "remaining_by_partition".to_string()).join(""),
249+
"remaining records for partition".to_string()).const_labels(labels.clone());
250+
251+
let remaining_by_partition: IntGaugeVec = IntGaugeVec::new(opts, &label_names).unwrap();
252+
253+
let opts= Opts::new(
254+
vec!(namespace.clone(), "remaining_for_topic".to_string()).join(""),
255+
"remaining records for topics".to_string()).const_labels(labels.clone());
256+
257+
let remaining_for_topic: IntGaugeVec = IntGaugeVec::new(opts, &["topic", "group"]).unwrap();
258+
259+
let metrics = [
260+
&commited_offset,
261+
&remaining_by_partition,
262+
&remaining_for_topic];
263+
264+
for item in metrics.iter() {
265+
registry.register(Box::new((*item).clone())).expect("Can't register metric");
266+
}
267+
237268

238269
Self { registry,
239270
namespace,
@@ -242,7 +273,10 @@ impl ObserverMetrics {
242273
partition_start_offset,
243274
partition_end_offset,
244275
number_of_records_for_partition,
245-
last_fetch_ts
276+
last_fetch_ts,
277+
commited_offset,
278+
remaining_for_topic,
279+
remaining_by_partition
246280
}
247281

248282
}

0 commit comments

Comments
 (0)