Commit d18413f (1 parent: 2ea949b)

feat(logs): support log streaming

Signed-off-by: Vjeran Grozdanic <[email protected]>

4 files changed: +347 additions, -3 deletions
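The user-facing change is a pair of new flags on the logs listing command, shown in the src/commands/logs/list.rs diff below. Assuming the command is exposed as "logs list" on the sentry-cli binary (the binary name is not visible in this diff), a hypothetical live-mode invocation looks like:

    sentry-cli logs list --live --poll-interval 5 --query "level:error"

--live and --poll-interval are added by this commit; --query already existed.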

src/api/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -2504,7 +2504,7 @@ struct LogsResponse {
 }
 
 /// Log entry structure from the logs API
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 pub struct LogEntry {
     #[serde(rename = "sentry.item_id")]
     pub item_id: String,
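For reference, the complete LogEntry shape implied by this commit: item_id and the derives appear in the hunk above, while the remaining fields and their Option types are inferred from the create_test_log helper in the new tests below; any serde renames on those fields are an assumption.

    // Inferred sketch, not the verbatim struct from src/api/mod.rs:
    // fields after item_id are reconstructed from the test constructor.
    #[derive(Debug, Deserialize, Clone)]
    pub struct LogEntry {
        #[serde(rename = "sentry.item_id")]
        pub item_id: String,
        pub trace: Option<String>,
        pub severity: Option<String>,
        pub timestamp: String,
        pub message: Option<String>,
    }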

src/commands/logs/list.rs

Lines changed: 269 additions & 2 deletions
@@ -1,7 +1,9 @@
 use anyhow::Result;
 use clap::Args;
+use std::collections::HashSet;
+use std::time::Duration;
 
-use crate::api::{Api, Dataset, FetchEventsOptions};
+use crate::api::{Api, Dataset, FetchEventsOptions, LogEntry};
 use crate::config::Config;
 use crate::utils::formatting::Table;
 
@@ -26,6 +28,9 @@ const LOG_FIELDS: &[&str] = &[
     "message",
 ];
 
+/// Maximum number of log entries to keep in memory for deduplication
+const MAX_DEDUP_BUFFER_SIZE: usize = 10_000;
+
 /// Arguments for listing logs
 #[derive(Args)]
 pub(super) struct ListLogsArgs {
@@ -45,6 +50,14 @@ pub(super) struct ListLogsArgs {
     #[arg(long = "query", default_value = "")]
     #[arg(help = "Query to filter logs. Example: \"level:error\"")]
     query: String,
+
+    #[arg(long = "live")]
+    #[arg(help = "Enable live streaming mode to continuously poll for new logs.")]
+    live: bool,
+
+    #[arg(long = "poll-interval", default_value = "2")]
+    #[arg(help = "Polling interval in seconds for live streaming mode.")]
+    poll_interval: u64,
 }
 
 pub(super) fn execute(args: ListLogsArgs) -> Result<()> {
@@ -76,7 +89,11 @@ pub(super) fn execute(args: ListLogsArgs) -> Result<()> {
         Some(args.query.as_str())
     };
 
-    execute_single_fetch(&api, &org, &project, query, LOG_FIELDS, &args)
+    if args.live {
+        execute_live_streaming(&api, &org, &project, query, LOG_FIELDS, &args)
+    } else {
+        execute_single_fetch(&api, &org, &project, query, LOG_FIELDS, &args)
+    }
 }
 
 fn execute_single_fetch(
@@ -129,3 +146,253 @@
 
     Ok(())
 }
+
+/// Manages deduplication of log entries with a bounded buffer
+struct LogDeduplicator {
+    /// Set of seen log IDs for quick lookup
+    seen_ids: HashSet<String>,
+    /// Buffer of log entries in order (for maintaining size limit)
+    buffer: Vec<LogEntry>,
+    /// Maximum size of the buffer
+    max_size: usize,
+}
+
+impl LogDeduplicator {
+    fn new(max_size: usize) -> Self {
+        Self {
+            seen_ids: HashSet::new(),
+            buffer: Vec::new(),
+            max_size,
+        }
+    }
+
+    /// Add new logs and return only the ones that haven't been seen before
+    fn add_logs(&mut self, new_logs: Vec<LogEntry>) -> Vec<LogEntry> {
+        let mut unique_logs = Vec::new();
+
+        for log in new_logs {
+            if !self.seen_ids.contains(&log.item_id) {
+                self.seen_ids.insert(log.item_id.clone());
+                self.buffer.push(log.clone());
+                unique_logs.push(log);
+            }
+        }
+
+        // Maintain buffer size limit by removing oldest entries
+        while self.buffer.len() > self.max_size {
+            let removed_log = self.buffer.remove(0);
+            self.seen_ids.remove(&removed_log.item_id);
+        }
+
+        unique_logs
+    }
+}
+
+fn execute_live_streaming(
+    api: &Api,
+    org: &str,
+    project: &str,
+    query: Option<&str>,
+    fields: &[&str],
+    args: &ListLogsArgs,
+) -> Result<()> {
+    let mut deduplicator = LogDeduplicator::new(MAX_DEDUP_BUFFER_SIZE);
+    let poll_duration = Duration::from_secs(args.poll_interval);
+    let mut consecutive_new_only_count = 0;
+    const WARNING_THRESHOLD: usize = 3; // Show warning after 3 consecutive polls with no new logs
+
+    println!("Starting live log streaming...");
+    println!(
+        "Polling every {} seconds. Press Ctrl+C to stop.",
+        args.poll_interval
+    );
+
+    // Set up table with headers and print header once
+    let mut table = Table::new();
+    table
+        .title_row()
+        .add("Item ID")
+        .add("Timestamp")
+        .add("Severity")
+        .add("Message")
+        .add("Trace");
+
+    let mut header_printed = false;
+
+    loop {
+        let options = FetchEventsOptions {
+            dataset: Dataset::OurLogs,
+            fields,
+            project_id: Some(project),
+            cursor: None,
+            query,
+            per_page: Some(args.max_rows),
+            stats_period: Some("1h"),
+            sort: Some("-timestamp"),
+        };
+
+        match api
+            .authenticated()?
+            .fetch_organization_events(org, &options)
+        {
+            Ok(logs) => {
+                let unique_logs = deduplicator.add_logs(logs);
+
+                if unique_logs.is_empty() {
+                    consecutive_new_only_count += 1;
+
+                    if consecutive_new_only_count >= WARNING_THRESHOLD && args.query.is_empty() {
+                        eprintln!(
+                            "\n⚠️ Warning: No new logs found for {consecutive_new_only_count} consecutive polls."
+                        );
+
+                        // Reset counter to avoid spam
+                        consecutive_new_only_count = 0;
+                    }
+                } else {
+                    consecutive_new_only_count = 0;
+
+                    // Add new logs to table
+                    for log in unique_logs {
+                        let row = table.add_row();
+                        row.add(&log.item_id)
+                            .add(&log.timestamp)
+                            .add(log.severity.as_deref().unwrap_or(""))
+                            .add(log.message.as_deref().unwrap_or(""))
+                            .add(log.trace.as_deref().unwrap_or(""));
+                    }
+
+                    if !header_printed {
+                        // Print header with first data batch so column widths match actual data
+                        table.print_table_start();
+                        header_printed = true;
+                    } else {
+                        // Print only the rows (without table borders) for subsequent batches
+                        table.print_rows_only();
+                    }
+                    // Clear rows to free memory but keep the table structure for reuse
+                    table.clear_rows();
+                }
+            }
+            Err(e) => {
+                eprintln!("Error fetching logs: {e}");
+            }
+        }
+
+        std::thread::sleep(poll_duration);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn create_test_log(id: &str, message: &str) -> LogEntry {
+        LogEntry {
+            item_id: id.to_owned(),
+            trace: None,
+            severity: Some("info".to_owned()),
+            timestamp: "2025-01-01T00:00:00Z".to_owned(),
+            message: Some(message.to_owned()),
+        }
+    }
+
+    #[test]
+    fn test_log_deduplicator_new() {
+        let deduplicator = LogDeduplicator::new(100);
+        assert_eq!(deduplicator.seen_ids.len(), 0);
+    }
+
+    #[test]
+    fn test_log_deduplicator_add_unique_logs() {
+        let mut deduplicator = LogDeduplicator::new(10);
+
+        let log1 = create_test_log("1", "test message 1");
+        let log2 = create_test_log("2", "test message 2");
+
+        let unique_logs = deduplicator.add_logs(vec![log1.clone(), log2.clone()]);
+
+        assert_eq!(unique_logs.len(), 2);
+        assert_eq!(deduplicator.seen_ids.len(), 2);
+    }
+
+    #[test]
+    fn test_log_deduplicator_deduplicate_logs() {
+        let mut deduplicator = LogDeduplicator::new(10);
+
+        let log1 = create_test_log("1", "test message 1");
+        let log2 = create_test_log("2", "test message 2");
+
+        // Add logs first time
+        let unique_logs1 = deduplicator.add_logs(vec![log1.clone(), log2.clone()]);
+        assert_eq!(unique_logs1.len(), 2);
+
+        // Add same logs again
+        let unique_logs2 = deduplicator.add_logs(vec![log1.clone(), log2.clone()]);
+        assert_eq!(unique_logs2.len(), 0); // Should be empty as logs already seen
+
+        assert_eq!(deduplicator.seen_ids.len(), 2);
+    }
+
+    #[test]
+    fn test_log_deduplicator_buffer_size_limit() {
+        let mut deduplicator = LogDeduplicator::new(3);
+
+        // Add 5 logs to a buffer with max size 3
+        let logs = vec![
+            create_test_log("1", "test message 1"),
+            create_test_log("2", "test message 2"),
+            create_test_log("3", "test message 3"),
+            create_test_log("4", "test message 4"),
+            create_test_log("5", "test message 5"),
+        ];
+
+        let unique_logs = deduplicator.add_logs(logs);
+        assert_eq!(unique_logs.len(), 5);
+
+        // After adding 5 logs to a buffer with max size 3, the oldest 2 are evicted,
+        // so logs 1 and 2 are no longer in the seen_ids set.
+        // Adding them again should return them as new logs.
+        let duplicate_logs = vec![
+            create_test_log("1", "test message 1"),
+            create_test_log("2", "test message 2"),
+        ];
+        let duplicate_unique_logs = deduplicator.add_logs(duplicate_logs);
+        assert_eq!(duplicate_unique_logs.len(), 2);
+
+        // Test that adding new logs still works
+        let new_logs = vec![create_test_log("6", "test message 6")];
+        let new_unique_logs = deduplicator.add_logs(new_logs);
+        assert_eq!(new_unique_logs.len(), 1);
+    }
+
+    #[test]
+    fn test_log_deduplicator_mixed_new_and_old_logs() {
+        let mut deduplicator = LogDeduplicator::new(10);
+
+        // Add initial logs
+        let initial_logs = vec![
+            create_test_log("1", "test message 1"),
+            create_test_log("2", "test message 2"),
+        ];
+        let unique_logs1 = deduplicator.add_logs(initial_logs);
+        assert_eq!(unique_logs1.len(), 2);
+
+        // Add a mix of new and old logs
+        let mixed_logs = vec![
+            create_test_log("1", "test message 1"), // old
+            create_test_log("3", "test message 3"), // new
+            create_test_log("2", "test message 2"), // old
+            create_test_log("4", "test message 4"), // new
+        ];
+        let unique_logs2 = deduplicator.add_logs(mixed_logs);
+
+        // Should only return the new logs (3 and 4)
+        assert_eq!(unique_logs2.len(), 2);
+        assert_eq!(unique_logs2[0].item_id, "3");
+        assert_eq!(unique_logs2[1].item_id, "4");
+
+        assert_eq!(deduplicator.seen_ids.len(), 4);
+        assert_eq!(deduplicator.buffer.len(), 4);
+    }
+}
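A design note on LogDeduplicator: eviction calls Vec::remove(0), which shifts every remaining element, so trimming is O(n) per evicted entry; the buffer also clones each full LogEntry even though only item_id is ever read back during eviction. A minimal alternative sketch (not part of this commit) that stores only IDs in a VecDeque, making front-of-queue eviction O(1):

    use std::collections::{HashSet, VecDeque};

    // Sketch only: same dedup semantics as LogDeduplicator, cheaper eviction.
    struct IdDeduplicator {
        seen_ids: HashSet<String>,
        order: VecDeque<String>, // insertion order, oldest at the front
        max_size: usize,
    }

    impl IdDeduplicator {
        /// Returns true if the ID has not been seen before.
        fn insert(&mut self, id: String) -> bool {
            if !self.seen_ids.insert(id.clone()) {
                return false; // duplicate
            }
            self.order.push_back(id);
            // Enforce the bound by forgetting the oldest IDs.
            while self.order.len() > self.max_size {
                if let Some(oldest) = self.order.pop_front() {
                    self.seen_ids.remove(&oldest);
                }
            }
            true
        }
    }

The behavior matches the commit's buffer-size test: once the bound is exceeded, the oldest IDs are forgotten and can be reported as new again, which is the accepted trade-off for keeping memory bounded at MAX_DEDUP_BUFFER_SIZE entries.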

src/utils/formatting.rs

Lines changed: 69 additions & 0 deletions
@@ -98,6 +98,75 @@ impl Table {
         }
         tbl.print_tty(false).ok();
     }
+
+    /// Print only the header row for streaming mode
+    pub fn print_header(&self) {
+        if let Some(ref title_row) = self.title_row {
+            let mut tbl = prettytable::Table::new();
+            tbl.set_format(*prettytable::format::consts::FORMAT_NO_BORDER);
+            tbl.add_row(title_row.make_row());
+            tbl.print_tty(false).ok();
+        }
+    }
+
+    /// Print header with first data batch for streaming mode.
+    /// This ensures header column widths match the actual data by letting prettytable
+    /// size columns based on both header and data content together.
+    pub fn print_table_start(&self) {
+        if self.is_empty() {
+            self.print_header();
+            return;
+        }
+
+        let mut tbl = prettytable::Table::new();
+        // Use the same base format as print_rows_only but add a header separator
+        let mut format = *prettytable::format::consts::FORMAT_NO_BORDER;
+        format.column_separator('|');
+        format.padding(1, 1);
+        format.borders('|'); // Add left and right borders
+        // Add separator line under the header
+        format.separator(
+            prettytable::format::LinePosition::Title,
+            prettytable::format::LineSeparator::new('-', '+', '+', '+'),
+        );
+        tbl.set_format(format);
+
+        if let Some(ref title_row) = self.title_row {
+            tbl.set_titles(title_row.make_row());
+        }
+
+        for row in &self.rows {
+            tbl.add_row(row.make_row());
+        }
+
+        tbl.print_tty(false).ok();
+    }
+
+    /// Print only the current rows with column separators but no borders for streaming mode
+    pub fn print_rows_only(&self) {
+        if self.is_empty() {
+            return;
+        }
+
+        let mut tbl = prettytable::Table::new();
+        // Use format with column separators (|) and side borders, but no top/bottom borders
+        let mut format = *prettytable::format::consts::FORMAT_NO_BORDER;
+        format.column_separator('|');
+        format.padding(1, 1);
+        format.borders('|'); // Add left and right borders
+        tbl.set_format(format);
+
+        for row in &self.rows {
+            tbl.add_row(row.make_row());
+        }
+
+        tbl.print_tty(false).ok();
+    }
+
+    /// Clear all rows but keep the header for reuse in streaming mode
+    pub fn clear_rows(&mut self) {
+        self.rows.clear();
+    }
 }
 
 impl Default for Table {
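These three helpers are consumed by execute_live_streaming above in a fixed per-batch pattern, condensed here for clarity (fetching and deduplication elided): print_table_start() renders the header together with the first batch so prettytable sizes columns from real data, subsequent batches go through print_rows_only(), and clear_rows() drops the rows between polls while keeping the title row.

    // Condensed from the list.rs diff above; not additional code.
    let mut header_printed = false;
    loop {
        // ... fetch a batch, dedup, and table.add_row() the unique entries ...
        if !header_printed {
            table.print_table_start(); // header + first rows, sized together
            header_printed = true;
        } else {
            table.print_rows_only(); // rows only, no top/bottom borders
        }
        table.clear_rows(); // keep the header definition for the next poll
    }

One caveat follows from each call constructing a fresh prettytable::Table: column widths are recomputed per batch, so later batches containing wider values will not line up exactly with earlier output.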
