From 661c3a4948709fb4b08bb7de86358c5ae6d89485 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Sun, 8 Jun 2025 22:49:40 -0400
Subject: [PATCH 01/20] feat: add stats for each field

Read the record batches from arrow files in the staging directory, run
DataFusion queries to fetch the count, the distinct count, and the count of
each distinct value for every field in the dataset, and store the results in
the _pmeta dataset.

The UI can run the SQL query below to fetch the stats from this dataset:

```
SELECT field_name, field_count, distinct_count, distinct_value, distinct_value_count
FROM (
    SELECT
        field_stats_field_name as field_name,
        field_stats_distinct_stats_distinct_value as distinct_value,
        SUM(field_stats_count) as field_count,
        field_stats_distinct_count as distinct_count,
        SUM(field_stats_distinct_stats_count) as distinct_value_count,
        ROW_NUMBER() OVER (
            PARTITION BY field_stats_field_name
            ORDER BY SUM(field_stats_count) DESC
        ) as rn
    FROM _pmeta
    WHERE field_stats_field_name = 'status_code'
        AND field_stats_distinct_stats_distinct_value IS NOT NULL
    GROUP BY field_stats_field_name, field_stats_distinct_stats_distinct_value, field_stats_distinct_count
) ranked
WHERE rn <= 5
ORDER BY field_name, distinct_value_count DESC;
```
---
 src/parseable/streams.rs | 248 +++++++++++++++++++++++++++++++++++----
 1 file changed, 223 insertions(+), 25 deletions(-)

diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 2ed089145..66e5d8985 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -26,10 +26,13 @@ use std::{
     time::{Instant, SystemTime, UNIX_EPOCH},
 };
 
-use arrow_array::RecordBatch;
+use arrow_array::{Array, Float64Array, Int64Array, NullArray, StringArray};
+use arrow_array::{BooleanArray, RecordBatch, TimestampMillisecondArray};
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
+use datafusion::{datasource::MemTable, prelude::SessionContext};
 use derive_more::{Deref, DerefMut};
+use futures::stream::{FuturesUnordered, StreamExt};
 use itertools::Itertools;
 use parquet::{
     arrow::ArrowWriter,
@@ -39,6 +42,7 @@ use parquet::{
     schema::types::ColumnPath,
 };
 use relative_path::RelativePathBuf;
+use serde::Serialize;
 use tokio::task::JoinSet;
 use tracing::{error, info, trace, warn};
 
@@ -48,10 +52,15 @@ use crate::{
         format::{LogSource, LogSourceEntry},
         DEFAULT_TIMESTAMP_KEY,
     },
+    handlers::http::{
+        cluster::INTERNAL_STREAM_NAME, ingest::PostError,
+        modal::utils::ingest_utils::flatten_and_push_logs,
+    },
     handlers::http::modal::{ingest_server::INGESTOR_META, query_server::QUERIER_META},
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
     option::Mode,
+    parseable::PARSEABLE,
     storage::{object_storage::to_bytes, retention::Retention, StreamType},
     utils::time::{Minute, TimeRange},
     LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
@@ -66,6 +75,26 @@ use super::{
     LogStream, ARROW_FILE_EXTENSION,
 };
 
+#[derive(Serialize, Debug)]
+struct DistinctStat {
+    distinct_value: String,
+    count: i64,
+}
+
+#[derive(Serialize, Debug)]
+struct FieldStat {
+    field_name: String,
+    count: i64,
+    distinct_count: i64,
+    distinct_stats: Vec<DistinctStat>,
+}
+
+#[derive(Serialize, Debug)]
+struct DatasetStats {
+    dataset_name: String,
+    field_stats: Vec<FieldStat>,
+}
+
 const INPROCESS_DIR_PREFIX: &str = "processing_";
 
 /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
@@ -463,7 +492,7 @@ impl Stream {
     }
 
     /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
-    pub fn 
prepare_parquet( + pub async fn prepare_parquet( &self, init_signal: bool, shutdown_signal: bool, @@ -478,19 +507,24 @@ impl Stream { // read arrow files on disk // convert them to parquet - let schema = self - .convert_disk_files_to_parquet( - time_partition.as_ref(), - custom_partition.as_ref(), + let (schema, rbs) = self.convert_disk_files_to_parquet( + time_partition.as_ref(), + custom_partition.as_ref(), init_signal, - shutdown_signal, - ) - .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?; - + shutdown_signal, + )?; // check if there is already a schema file in staging pertaining to this stream // if yes, then merge them and save if let Some(mut schema) = schema { + if !&self.stream_name.contains(INTERNAL_STREAM_NAME) { + if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await { + warn!( + "Error calculating field stats for stream {}: {}", + self.stream_name, err + ); + } + } let static_schema_flag = self.get_static_schema_flag(); if !static_schema_flag { // schema is dynamic, read from staging and merge if present @@ -627,7 +661,7 @@ impl Stream { custom_partition: Option<&String>, init_signal: bool, shutdown_signal: bool, - ) -> Result, StagingError> { + ) -> Result<(Option, Vec), StagingError> { let mut schemas = Vec::new(); let now = SystemTime::now(); @@ -636,11 +670,12 @@ impl Stream { self.arrow_files_grouped_exclude_time(now, group_minute, init_signal, shutdown_signal); if staging_files.is_empty() { self.reset_staging_metrics(); - return Ok(None); + return Ok((None, Vec::new())); } self.update_staging_metrics(&staging_files); + let mut record_batches = Vec::new(); for (parquet_path, arrow_files) in staging_files { let record_reader = MergedReverseRecordReader::try_new(&arrow_files); if record_reader.readers.is_empty() { @@ -658,6 +693,7 @@ impl Stream { &schema, &props, time_partition, + &mut record_batches, )? 
{ continue; } @@ -670,10 +706,10 @@ impl Stream { } if schemas.is_empty() { - return Ok(None); + return Ok((None, Vec::new())); } - Ok(Some(Schema::try_merge(schemas).unwrap())) + Ok((Some(Schema::try_merge(schemas).unwrap()), record_batches)) } fn write_parquet_part_file( @@ -683,6 +719,7 @@ impl Stream { schema: &Arc, props: &WriterProperties, time_partition: Option<&String>, + record_batches: &mut Vec, ) -> Result { let mut part_file = OpenOptions::new() .create(true) @@ -692,6 +729,7 @@ impl Stream { let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?; for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) { writer.write(record)?; + record_batches.push(record.clone()); } writer.close()?; @@ -958,7 +996,7 @@ impl Stream { } /// First flushes arrows onto disk and then converts the arrow into parquet - pub fn flush_and_convert( + pub async fn flush_and_convert( &self, init_signal: bool, shutdown_signal: bool, @@ -972,7 +1010,8 @@ impl Stream { ); let start_convert = Instant::now(); - self.prepare_parquet(init_signal, shutdown_signal)?; + + self.prepare_parquet(init_signal, shutdown_signal).await?; trace!( "Converting arrows to parquet on stream ({}) took: {}s", self.stream_name, @@ -981,6 +1020,165 @@ impl Stream { Ok(()) } + + async fn calculate_field_stats( + &self, + record_batches: Vec, + schema: Arc, + ) -> Result<(), PostError> { + let dataset_meta = format!("{}_{INTERNAL_STREAM_NAME}", &self.stream_name); + let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new()); + PARSEABLE + .create_stream_if_not_exists( + &dataset_meta, + StreamType::Internal, + vec![log_source_entry], + ) + .await?; + let mem_table = MemTable::try_new(schema.clone(), vec![record_batches]) + .map_err(|e| PostError::Invalid(e.into()))?; + let ctx = SessionContext::new(); + ctx.register_table(&self.stream_name, Arc::new(mem_table)) + .map_err(|e| PostError::Invalid(e.into()))?; + + let field_stats = self.collect_all_field_stats(&ctx, &schema).await; + + let stats = DatasetStats { + dataset_name: self.stream_name.clone(), + field_stats, + }; + if stats.field_stats.is_empty() { + return Ok(()); + } + let stats_value = serde_json::to_value(&stats).map_err(|e| PostError::Invalid(e.into()))?; + + flatten_and_push_logs( + stats_value, + &dataset_meta, + &LogSource::Json, + &HashMap::new(), + ) + .await?; + Ok(()) + } + + async fn collect_all_field_stats( + &self, + ctx: &SessionContext, + schema: &Arc, + ) -> Vec { + let field_futures = schema.fields().iter().map(|field| { + let ctx = ctx.clone(); + let stream_name = self.stream_name.clone(); + let field_name = field.name().clone(); + async move { Self::calculate_single_field_stats(ctx, stream_name, field_name).await } + }); + + FuturesUnordered::from_iter(field_futures) + .filter_map(|x| async { x }) + .collect::>() + .await + } + + async fn calculate_single_field_stats( + ctx: SessionContext, + stream_name: String, + field_name: String, + ) -> Option { + let count = Self::query_single_i64( + &ctx, + &format!( + "select count(\"{field_name}\") as count from \"{stream_name}\" where \"{field_name}\" is not null" + ), + ) + .await?; + if count == 0 { + return None; + } + + let distinct_count = Self::query_single_i64( + &ctx, + &format!( + "select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\"" + ), + ) + .await?; + + let distinct_stats = Self::query_distinct_stats(&ctx, &stream_name, &field_name).await; + + Some(FieldStat { + field_name, + count, + 
distinct_count, + distinct_stats, + }) + } + + async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option { + let df = ctx.sql(sql).await.ok()?; + let batches = df.collect().await.ok()?; + let array = batches + .first()? + .column(0) + .as_any() + .downcast_ref::()?; + Some(array.value(0)) + } + + fn format_arrow_value(array: &dyn Array, idx: usize) -> String { + if array.is_null(idx) { + return "NULL".to_string(); + } + if let Some(arr) = array.as_any().downcast_ref::() { + arr.value(idx).to_string() + } else if let Some(arr) = array.as_any().downcast_ref::() { + arr.value(idx).to_string() + } else if let Some(arr) = array.as_any().downcast_ref::() { + arr.value(idx).to_string() + } else if let Some(arr) = array.as_any().downcast_ref::() { + let timestamp = arr.value(idx); + DateTime::from_timestamp_millis(timestamp) + .map(|dt| dt.to_string()) + .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string()) + } else if let Some(arr) = array.as_any().downcast_ref::() { + arr.value(idx).to_string() + } else if array.as_any().downcast_ref::().is_some() { + "NULL".to_string() + } else { + "UNSUPPORTED".to_string() + } + } + + async fn query_distinct_stats( + ctx: &SessionContext, + stream_name: &str, + field_name: &str, + ) -> Vec { + let sql = format!( + "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit 50" + ); + let mut distinct_stats = Vec::new(); + if let Ok(df) = ctx.sql(&sql).await { + if let Ok(batches) = df.collect().await { + for rb in batches { + let counts = rb + .column(0) + .as_any() + .downcast_ref::() + .expect("Counts should be Int64Array"); + let values = rb.column(1).as_ref(); + for i in 0..rb.num_rows() { + let value = Self::format_arrow_value(values, i); + distinct_stats.push(DistinctStat { + distinct_value: value, + count: counts.value(i), + }); + } + } + } + } + distinct_stats + } } #[derive(Deref, DerefMut, Default)] @@ -1067,7 +1265,7 @@ impl Streams { .map(Arc::clone) .collect(); for stream in streams { - joinset.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal) }); + joinset.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal).await }); } } } @@ -1257,7 +1455,7 @@ mod tests { None, ) .convert_disk_files_to_parquet(None, None, false, false)?; - assert!(result.is_none()); + assert!(result.0.is_none()); // Verify metrics were set to 0 let staging_files = metrics::STAGING_FILES.with_label_values(&[&stream]).get(); assert_eq!(staging_files, 0); @@ -1338,8 +1536,8 @@ mod tests { .convert_disk_files_to_parquet(None, None, false, true) .unwrap(); - assert!(result.is_some()); - let result_schema = result.unwrap(); + assert!(result.0.is_some()); + let result_schema = result.0.unwrap(); assert_eq!(result_schema.fields().len(), 3); // Verify parquet files were created and the arrow files deleted @@ -1387,8 +1585,8 @@ mod tests { .convert_disk_files_to_parquet(None, None, false, true) .unwrap(); - assert!(result.is_some()); - let result_schema = result.unwrap(); + assert!(result.0.is_some()); + let result_schema = result.0.unwrap(); assert_eq!(result_schema.fields().len(), 3); // Verify parquet files were created and the arrow files deleted @@ -1441,8 +1639,8 @@ mod tests { .convert_disk_files_to_parquet(None, None, false, false) .unwrap(); - assert!(result.is_some()); - let result_schema = result.unwrap(); + assert!(result.0.is_some()); + let result_schema = result.0.unwrap(); 
assert_eq!(result_schema.fields().len(), 3); // Verify parquet files were created and the arrow file left From e548a1192468eeadb4a82abbc03e322aca51a09e Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 9 Jun 2025 02:42:23 -0400 Subject: [PATCH 02/20] send stats to one dataset with name `dataset_pmeta` --- src/parseable/streams.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 66e5d8985..a35b5cac5 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -1026,11 +1026,11 @@ impl Stream { record_batches: Vec, schema: Arc, ) -> Result<(), PostError> { - let dataset_meta = format!("{}_{INTERNAL_STREAM_NAME}", &self.stream_name); + let stats_dataset_name = format!("dataset_{INTERNAL_STREAM_NAME}"); let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new()); PARSEABLE .create_stream_if_not_exists( - &dataset_meta, + &stats_dataset_name, StreamType::Internal, vec![log_source_entry], ) @@ -1054,7 +1054,7 @@ impl Stream { flatten_and_push_logs( stats_value, - &dataset_meta, + &stats_dataset_name, &LogSource::Json, &HashMap::new(), ) From c7e0e515f0dadc6d19617269ad0445515e794b34 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 9 Jun 2025 08:35:04 -0400 Subject: [PATCH 03/20] refactor --- src/cli.rs | 9 +++++++++ src/parseable/streams.rs | 42 +++++++++++++++++++++++++++------------- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 26cab2e95..68df00dc2 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -388,6 +388,15 @@ pub struct Options { help = "Maximum level of flattening allowed for events" )] pub event_flatten_level: usize, + + // maximum limit to store the statistics for a field + #[arg( + long, + env = "P_MAX_FIELD_STATISTICS", + default_value = "50", + help = "Maximum number of field statistics to store" + )] + pub max_field_statistics: usize, } #[derive(Parser, Debug)] diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index a35b5cac5..3a9bcd43e 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -26,13 +26,13 @@ use std::{ time::{Instant, SystemTime, UNIX_EPOCH}, }; -use arrow_array::{Array, Float64Array, Int64Array, NullArray, StringArray}; +use arrow_array::{Array, Date32Array, Float64Array, Int64Array, NullArray, StringArray}; use arrow_array::{BooleanArray, RecordBatch, TimestampMillisecondArray}; use arrow_schema::{Field, Fields, Schema}; use chrono::{DateTime, NaiveDateTime, Timelike, Utc}; use datafusion::{datasource::MemTable, prelude::SessionContext}; use derive_more::{Deref, DerefMut}; -use futures::stream::{FuturesUnordered, StreamExt}; +use futures_util::StreamExt; use itertools::Itertools; use parquet::{ arrow::ArrowWriter, @@ -75,6 +75,8 @@ use super::{ LogStream, ARROW_FILE_EXTENSION, }; +const MAX_CONCURRENT_FIELD_STATS: usize = 10; + #[derive(Serialize, Debug)] struct DistinctStat { distinct_value: String, @@ -517,7 +519,7 @@ impl Stream { // if yes, then merge them and save if let Some(mut schema) = schema { - if !&self.stream_name.contains(INTERNAL_STREAM_NAME) { + if self.get_stream_type() != StreamType::Internal { if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await { warn!( "Error calculating field stats for stream {}: {}", @@ -1067,19 +1069,25 @@ impl Stream { ctx: &SessionContext, schema: &Arc, ) -> Vec { - let field_futures = schema.fields().iter().map(|field| { + // Collect field names into an owned Vec to avoid lifetime issues + let 
field_names: Vec = schema + .fields() + .iter() + .map(|field| field.name().clone()) + .collect(); + + let field_futures = field_names.into_iter().map(|field_name| { let ctx = ctx.clone(); let stream_name = self.stream_name.clone(); - let field_name = field.name().clone(); async move { Self::calculate_single_field_stats(ctx, stream_name, field_name).await } }); - FuturesUnordered::from_iter(field_futures) + futures::stream::iter(field_futures) + .buffer_unordered(MAX_CONCURRENT_FIELD_STATS) .filter_map(|x| async { x }) .collect::>() .await } - async fn calculate_single_field_stats( ctx: SessionContext, stream_name: String, @@ -1117,11 +1125,12 @@ impl Stream { async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option { let df = ctx.sql(sql).await.ok()?; let batches = df.collect().await.ok()?; - let array = batches - .first()? - .column(0) - .as_any() - .downcast_ref::()?; + let batch = batches.first()?; + if batch.num_rows() == 0 { + return None; + } + let array = batch.column(0).as_any().downcast_ref::()?; + Some(array.value(0)) } @@ -1140,11 +1149,17 @@ impl Stream { DateTime::from_timestamp_millis(timestamp) .map(|dt| dt.to_string()) .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string()) + } else if let Some(arr) = array.as_any().downcast_ref::() { + return arr.value(idx).to_string(); } else if let Some(arr) = array.as_any().downcast_ref::() { arr.value(idx).to_string() } else if array.as_any().downcast_ref::().is_some() { "NULL".to_string() } else { + warn!( + "Unsupported array type for statistics: {:?}", + array.data_type() + ); "UNSUPPORTED".to_string() } } @@ -1155,7 +1170,8 @@ impl Stream { field_name: &str, ) -> Vec { let sql = format!( - "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit 50" + "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit {}", + PARSEABLE.options.max_field_statistics ); let mut distinct_stats = Vec::new(); if let Ok(df) = ctx.sql(&sql).await { From 4002cfa38b645ad2eb6c0a2b41dc3b823df65f7d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 9 Jun 2025 10:43:55 -0400 Subject: [PATCH 04/20] deepsource analysis fix --- src/parseable/streams.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 3a9bcd43e..b0f95dba2 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -519,6 +519,7 @@ impl Stream { // if yes, then merge them and save if let Some(mut schema) = schema { + // calculate field stats for all user defined streams if self.get_stream_type() != StreamType::Internal { if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await { warn!( @@ -731,6 +732,7 @@ impl Stream { let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?; for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) { writer.write(record)?; + // Collect record batches for finding statistics later record_batches.push(record.clone()); } writer.close()?; @@ -1023,6 +1025,9 @@ impl Stream { Ok(()) } + /// Calculates field statistics for the stream and pushes them to the internal stats dataset. + /// This function creates a new internal stream for stats if it doesn't exist. 
+ /// It collects statistics for each field in the stream async fn calculate_field_stats( &self, record_batches: Vec, @@ -1064,6 +1069,9 @@ impl Stream { Ok(()) } + /// Collects statistics for all fields in the stream. + /// Returns a vector of `FieldStat` for each field with non-zero count. + /// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently. async fn collect_all_field_stats( &self, ctx: &SessionContext, @@ -1084,10 +1092,13 @@ impl Stream { futures::stream::iter(field_futures) .buffer_unordered(MAX_CONCURRENT_FIELD_STATS) - .filter_map(|x| async { x }) + .filter_map(std::future::ready) .collect::>() .await } + + /// Calculates statistics for a single field in the stream. + /// Returns `None` if the count query returns 0. async fn calculate_single_field_stats( ctx: SessionContext, stream_name: String, @@ -1122,6 +1133,9 @@ impl Stream { }) } + /// Queries a single integer value from the DataFusion context. + /// Returns `None` if the query fails or returns no rows. + /// This is used for fetching record count for a field and distinct count. async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option { let df = ctx.sql(sql).await.ok()?; let batches = df.collect().await.ok()?; @@ -1134,6 +1148,8 @@ impl Stream { Some(array.value(0)) } + /// Helper function to format an Arrow value at a given index into a string. + /// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean. fn format_arrow_value(array: &dyn Array, idx: usize) -> String { if array.is_null(idx) { return "NULL".to_string(); @@ -1164,6 +1180,9 @@ impl Stream { } } + /// This function is used to fetch distinct values and their counts for a field in the stream. + /// Returns a vector of `DistinctStat` containing distinct values and their counts. + /// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`. 
async fn query_distinct_stats( ctx: &SessionContext, stream_name: &str, From 282534546e42b55e889bb9cf818a4f7a07188086 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 02:48:28 -0400 Subject: [PATCH 05/20] include null, add query param in prism home api --- src/handlers/http/prism_home.rs | 15 +++++++++++---- src/parseable/streams.rs | 23 ++++++++++------------- src/prism/home/mod.rs | 15 +++++++++++++-- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/handlers/http/prism_home.rs b/src/handlers/http/prism_home.rs index 10ff0f04c..7754d3afc 100644 --- a/src/handlers/http/prism_home.rs +++ b/src/handlers/http/prism_home.rs @@ -26,7 +26,7 @@ use crate::{ }; const HOME_SEARCH_QUERY_PARAM: &str = "key"; - +const HOME_QUERY_PARAM: &str = "includeInternal"; /// Fetches the data to populate Prism's home /// /// @@ -36,8 +36,14 @@ const HOME_SEARCH_QUERY_PARAM: &str = "key"; pub async fn home_api(req: HttpRequest) -> Result { let key = extract_session_key_from_req(&req) .map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?; + let query_map = web::Query::>::from_query(req.query_string()) + .map_err(|_| PrismHomeError::InvalidQueryParameter(HOME_QUERY_PARAM.to_string()))?; - let res = generate_home_response(&key).await?; + let include_internal = query_map + .get(HOME_QUERY_PARAM) + .map_or(false, |v| v == "true"); + + let res = generate_home_response(&key, include_internal).await?; Ok(web::Json(res)) } @@ -52,11 +58,12 @@ pub async fn home_search(req: HttpRequest) -> Result, schema: Arc, ) -> Result<(), PostError> { - let stats_dataset_name = format!("dataset_{INTERNAL_STREAM_NAME}"); let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new()); PARSEABLE .create_stream_if_not_exists( - &stats_dataset_name, + DATASET_STATS_STREAM_NAME, StreamType::Internal, vec![log_source_entry], ) @@ -1061,7 +1057,7 @@ impl Stream { flatten_and_push_logs( stats_value, - &stats_dataset_name, + DATASET_STATS_STREAM_NAME, &LogSource::Json, &HashMap::new(), ) @@ -1189,7 +1185,7 @@ impl Stream { field_name: &str, ) -> Vec { let sql = format!( - "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit {}", + "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" group by \"{field_name}\" order by distinct_count desc limit {}", PARSEABLE.options.max_field_statistics ); let mut distinct_stats = Vec::new(); @@ -1300,7 +1296,8 @@ impl Streams { .map(Arc::clone) .collect(); for stream in streams { - joinset.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal).await }); + joinset + .spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal).await }); } } } diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 656e50cae..6313e79d6 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -33,7 +33,7 @@ use crate::{ handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError}, parseable::PARSEABLE, rbac::{map::SessionKey, role::Action, Users}, - storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, + storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY}, users::{dashboards::DASHBOARDS, filters::FILTERS}, }; @@ -88,7 +88,10 @@ pub struct HomeSearchResponse { resources: Vec, } -pub async fn generate_home_response(key: &SessionKey) -> Result { +pub async fn generate_home_response( + key: &SessionKey, + 
include_internal: bool, +) -> Result { // Execute these operations concurrently let (stream_titles_result, alerts_info_result) = tokio::join!(get_stream_titles(key), get_alerts_info()); @@ -120,6 +123,14 @@ pub async fn generate_home_response(key: &SessionKey) -> Result { + // Skip internal streams if the flag is false + if !include_internal + && metadata + .iter() + .any(|m| m.stream_type == StreamType::Internal) + { + continue; + } stream_wise_stream_json.insert(stream.clone(), metadata); datasets.push(DataSet { title: stream, From e2df03caa7dbd20bc917e0fc5ec797312319c341 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 03:07:26 -0400 Subject: [PATCH 06/20] move pstats name to utils --- src/parseable/streams.rs | 3 ++- src/utils/mod.rs | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index e8ff39fd0..71ac9a2cf 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -46,6 +46,7 @@ use serde::Serialize; use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; +use crate::utils::DATASET_STATS_STREAM_NAME; use crate::{ cli::Options, event::{ @@ -71,7 +72,7 @@ use super::{ }, LogStream, ARROW_FILE_EXTENSION, }; -const DATASET_STATS_STREAM_NAME: &str = "pstats"; + const MAX_CONCURRENT_FIELD_STATS: usize = 10; #[derive(Serialize, Debug)] diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 010e9f594..4cd5cac3a 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -37,6 +37,8 @@ use datafusion::common::tree_node::TreeNode; use regex::Regex; use sha2::{Digest, Sha256}; +pub const DATASET_STATS_STREAM_NAME: &str = "pstats"; + pub fn get_node_id() -> String { let now = Utc::now().to_rfc3339(); let id = get_hash(&now).to_string().split_at(15).0.to_string(); From 27566b7a40e949e489b7d419e23fd3b1a54092c0 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 03:09:41 -0400 Subject: [PATCH 07/20] home api query param public --- src/handlers/http/prism_home.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/http/prism_home.rs b/src/handlers/http/prism_home.rs index 7754d3afc..1b2575529 100644 --- a/src/handlers/http/prism_home.rs +++ b/src/handlers/http/prism_home.rs @@ -26,7 +26,7 @@ use crate::{ }; const HOME_SEARCH_QUERY_PARAM: &str = "key"; -const HOME_QUERY_PARAM: &str = "includeInternal"; +pub const HOME_QUERY_PARAM: &str = "includeInternal"; /// Fetches the data to populate Prism's home /// /// From b76a1ace3bedece2c56d3507d0cf39e6d9ad21fa Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 10:47:02 -0400 Subject: [PATCH 08/20] drop session context --- src/parseable/streams.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 71ac9a2cf..eae3e7f2e 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -1046,6 +1046,7 @@ impl Stream { .map_err(|e| PostError::Invalid(e.into()))?; let field_stats = self.collect_all_field_stats(&ctx, &schema).await; + drop(ctx); let stats = DatasetStats { dataset_name: self.stream_name.clone(), From 089d3c1bd7f10dddabbbf6a0acacc82236e995b4 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 22:36:53 -0400 Subject: [PATCH 09/20] stats calculation in non blocking task --- src/parseable/streams.rs | 344 ++++++++++++++++++++------------------- 1 file changed, 175 insertions(+), 169 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index eae3e7f2e..5b397bf12 100644 --- 
a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -519,12 +519,19 @@ impl Stream { if let Some(mut schema) = schema { // calculate field stats for all user defined streams if self.get_stream_type() != StreamType::Internal { - if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await { - warn!( - "Error calculating field stats for stream {}: {}", - self.stream_name, err - ); - } + let stats_schema = schema.clone(); + let stream_name = self.stream_name.clone(); + let stats_rbs = rbs.clone(); + tokio::spawn(async move { + if let Err(err) = + calculate_field_stats(&stream_name, stats_rbs, stats_schema.into()).await + { + warn!( + "Error calculating field stats for stream {}: {}", + &stream_name, err + ); + } + }); } let static_schema_flag = self.get_static_schema_flag(); if !static_schema_flag { @@ -1022,196 +1029,195 @@ impl Stream { Ok(()) } +} - /// Calculates field statistics for the stream and pushes them to the internal stats dataset. - /// This function creates a new internal stream for stats if it doesn't exist. - /// It collects statistics for each field in the stream - async fn calculate_field_stats( - &self, - record_batches: Vec, - schema: Arc, - ) -> Result<(), PostError> { - let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new()); - PARSEABLE - .create_stream_if_not_exists( - DATASET_STATS_STREAM_NAME, - StreamType::Internal, - vec![log_source_entry], - ) - .await?; - let mem_table = MemTable::try_new(schema.clone(), vec![record_batches]) - .map_err(|e| PostError::Invalid(e.into()))?; - let ctx = SessionContext::new(); - ctx.register_table(&self.stream_name, Arc::new(mem_table)) - .map_err(|e| PostError::Invalid(e.into()))?; - - let field_stats = self.collect_all_field_stats(&ctx, &schema).await; - drop(ctx); - - let stats = DatasetStats { - dataset_name: self.stream_name.clone(), - field_stats, - }; - if stats.field_stats.is_empty() { - return Ok(()); - } - let stats_value = serde_json::to_value(&stats).map_err(|e| PostError::Invalid(e.into()))?; - - flatten_and_push_logs( - stats_value, +/// Calculates field statistics for the stream and pushes them to the internal stats dataset. +/// This function creates a new internal stream for stats if it doesn't exist. +/// It collects statistics for each field in the stream +async fn calculate_field_stats( + stream_name: &str, + record_batches: Vec, + schema: Arc, +) -> Result<(), PostError> { + let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new()); + PARSEABLE + .create_stream_if_not_exists( DATASET_STATS_STREAM_NAME, - &LogSource::Json, - &HashMap::new(), + StreamType::Internal, + vec![log_source_entry], ) .await?; - Ok(()) - } - - /// Collects statistics for all fields in the stream. - /// Returns a vector of `FieldStat` for each field with non-zero count. - /// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently. 
- async fn collect_all_field_stats( - &self, - ctx: &SessionContext, - schema: &Arc, - ) -> Vec { - // Collect field names into an owned Vec to avoid lifetime issues - let field_names: Vec = schema - .fields() - .iter() - .map(|field| field.name().clone()) - .collect(); - - let field_futures = field_names.into_iter().map(|field_name| { - let ctx = ctx.clone(); - let stream_name = self.stream_name.clone(); - async move { Self::calculate_single_field_stats(ctx, stream_name, field_name).await } - }); + let mem_table = MemTable::try_new(schema.clone(), vec![record_batches]) + .map_err(|e| PostError::Invalid(e.into()))?; + let ctx = SessionContext::new(); + ctx.register_table(stream_name, Arc::new(mem_table)) + .map_err(|e| PostError::Invalid(e.into()))?; + + let field_stats = collect_all_field_stats(stream_name, &ctx, &schema).await; + drop(ctx); + + let stats = DatasetStats { + dataset_name: stream_name.to_string(), + field_stats, + }; + if stats.field_stats.is_empty() { + return Ok(()); + } + let stats_value = serde_json::to_value(&stats).map_err(|e| PostError::Invalid(e.into()))?; + + flatten_and_push_logs( + stats_value, + DATASET_STATS_STREAM_NAME, + &LogSource::Json, + &HashMap::new(), + ) + .await?; + Ok(()) +} - futures::stream::iter(field_futures) - .buffer_unordered(MAX_CONCURRENT_FIELD_STATS) - .filter_map(std::future::ready) - .collect::>() - .await - } +/// Collects statistics for all fields in the stream. +/// Returns a vector of `FieldStat` for each field with non-zero count. +/// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently. +async fn collect_all_field_stats( + stream_name: &str, + ctx: &SessionContext, + schema: &Arc, +) -> Vec { + // Collect field names into an owned Vec to avoid lifetime issues + let field_names: Vec = schema + .fields() + .iter() + .map(|field| field.name().clone()) + .collect(); + + let field_futures = field_names.into_iter().map(|field_name| { + let ctx = ctx.clone(); + async move { calculate_single_field_stats(ctx, stream_name, &field_name).await } + }); + + futures::stream::iter(field_futures) + .buffer_unordered(MAX_CONCURRENT_FIELD_STATS) + .filter_map(std::future::ready) + .collect::>() + .await +} - /// Calculates statistics for a single field in the stream. - /// Returns `None` if the count query returns 0. - async fn calculate_single_field_stats( - ctx: SessionContext, - stream_name: String, - field_name: String, - ) -> Option { - let count = Self::query_single_i64( +/// Calculates statistics for a single field in the stream. +/// Returns `None` if the count query returns 0. 
+async fn calculate_single_field_stats( + ctx: SessionContext, + stream_name: &str, + field_name: &str, +) -> Option { + let count = query_single_i64( &ctx, &format!( "select count(\"{field_name}\") as count from \"{stream_name}\" where \"{field_name}\" is not null" ), ) .await?; - if count == 0 { - return None; - } - - let distinct_count = Self::query_single_i64( - &ctx, - &format!( - "select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\"" - ), - ) - .await?; + if count == 0 { + return None; + } - let distinct_stats = Self::query_distinct_stats(&ctx, &stream_name, &field_name).await; + let distinct_count = query_single_i64( + &ctx, + &format!( + "select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\"" + ), + ) + .await?; - Some(FieldStat { - field_name, - count, - distinct_count, - distinct_stats, - }) - } + let distinct_stats = query_distinct_stats(&ctx, stream_name, field_name).await; - /// Queries a single integer value from the DataFusion context. - /// Returns `None` if the query fails or returns no rows. - /// This is used for fetching record count for a field and distinct count. - async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option { - let df = ctx.sql(sql).await.ok()?; - let batches = df.collect().await.ok()?; - let batch = batches.first()?; - if batch.num_rows() == 0 { - return None; - } - let array = batch.column(0).as_any().downcast_ref::()?; + Some(FieldStat { + field_name: field_name.to_string(), + count, + distinct_count, + distinct_stats, + }) +} - Some(array.value(0)) +/// Queries a single integer value from the DataFusion context. +/// Returns `None` if the query fails or returns no rows. +/// This is used for fetching record count for a field and distinct count. +async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option { + let df = ctx.sql(sql).await.ok()?; + let batches = df.collect().await.ok()?; + let batch = batches.first()?; + if batch.num_rows() == 0 { + return None; } + let array = batch.column(0).as_any().downcast_ref::()?; - /// Helper function to format an Arrow value at a given index into a string. - /// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean. - fn format_arrow_value(array: &dyn Array, idx: usize) -> String { - if array.is_null(idx) { - return "NULL".to_string(); - } - if let Some(arr) = array.as_any().downcast_ref::() { - arr.value(idx).to_string() - } else if let Some(arr) = array.as_any().downcast_ref::() { - arr.value(idx).to_string() - } else if let Some(arr) = array.as_any().downcast_ref::() { - arr.value(idx).to_string() - } else if let Some(arr) = array.as_any().downcast_ref::() { - let timestamp = arr.value(idx); - DateTime::from_timestamp_millis(timestamp) - .map(|dt| dt.to_string()) - .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string()) - } else if let Some(arr) = array.as_any().downcast_ref::() { - return arr.value(idx).to_string(); - } else if let Some(arr) = array.as_any().downcast_ref::() { - arr.value(idx).to_string() - } else if array.as_any().downcast_ref::().is_some() { - "NULL".to_string() - } else { - warn!( - "Unsupported array type for statistics: {:?}", - array.data_type() - ); - "UNSUPPORTED".to_string() - } + Some(array.value(0)) +} + +/// Helper function to format an Arrow value at a given index into a string. +/// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean. 
+fn format_arrow_value(array: &dyn Array, idx: usize) -> String { + if array.is_null(idx) { + return "NULL".to_string(); + } + if let Some(arr) = array.as_any().downcast_ref::() { + arr.value(idx).to_string() + } else if let Some(arr) = array.as_any().downcast_ref::() { + arr.value(idx).to_string() + } else if let Some(arr) = array.as_any().downcast_ref::() { + arr.value(idx).to_string() + } else if let Some(arr) = array.as_any().downcast_ref::() { + let timestamp = arr.value(idx); + DateTime::from_timestamp_millis(timestamp) + .map(|dt| dt.to_string()) + .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string()) + } else if let Some(arr) = array.as_any().downcast_ref::() { + return arr.value(idx).to_string(); + } else if let Some(arr) = array.as_any().downcast_ref::() { + arr.value(idx).to_string() + } else if array.as_any().downcast_ref::().is_some() { + "NULL".to_string() + } else { + warn!( + "Unsupported array type for statistics: {:?}", + array.data_type() + ); + "UNSUPPORTED".to_string() } +} - /// This function is used to fetch distinct values and their counts for a field in the stream. - /// Returns a vector of `DistinctStat` containing distinct values and their counts. - /// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`. - async fn query_distinct_stats( - ctx: &SessionContext, - stream_name: &str, - field_name: &str, - ) -> Vec { - let sql = format!( +/// This function is used to fetch distinct values and their counts for a field in the stream. +/// Returns a vector of `DistinctStat` containing distinct values and their counts. +/// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`. 
+async fn query_distinct_stats( + ctx: &SessionContext, + stream_name: &str, + field_name: &str, +) -> Vec { + let sql = format!( "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" group by \"{field_name}\" order by distinct_count desc limit {}", PARSEABLE.options.max_field_statistics ); - let mut distinct_stats = Vec::new(); - if let Ok(df) = ctx.sql(&sql).await { - if let Ok(batches) = df.collect().await { - for rb in batches { - let counts = rb - .column(0) - .as_any() - .downcast_ref::() - .expect("Counts should be Int64Array"); - let values = rb.column(1).as_ref(); - for i in 0..rb.num_rows() { - let value = Self::format_arrow_value(values, i); - distinct_stats.push(DistinctStat { - distinct_value: value, - count: counts.value(i), - }); - } + let mut distinct_stats = Vec::new(); + if let Ok(df) = ctx.sql(&sql).await { + if let Ok(batches) = df.collect().await { + for rb in batches { + let counts = rb + .column(0) + .as_any() + .downcast_ref::() + .expect("Counts should be Int64Array"); + let values = rb.column(1).as_ref(); + for i in 0..rb.num_rows() { + let value = format_arrow_value(values, i); + distinct_stats.push(DistinctStat { + distinct_value: value, + count: counts.value(i), + }); } } } - distinct_stats } + distinct_stats } #[derive(Deref, DerefMut, Default)] From d78169f89b7eaf6a943f2eadbaa64cd81f193fd1 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 22:45:54 -0400 Subject: [PATCH 10/20] update query --- src/parseable/streams.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 5b397bf12..602582872 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -1110,12 +1110,10 @@ async fn calculate_single_field_stats( field_name: &str, ) -> Option { let count = query_single_i64( - &ctx, - &format!( - "select count(\"{field_name}\") as count from \"{stream_name}\" where \"{field_name}\" is not null" - ), - ) - .await?; + &ctx, + &format!("select count(\"{field_name}\") as count from \"{stream_name}\""), + ) + .await?; if count == 0 { return None; } From 938d3e22c2d2b22618b19771bc060294946eb0a0 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 17 Jun 2025 07:28:42 -0400 Subject: [PATCH 11/20] env to collect stats, refactor --- src/cli.rs | 9 +++++++++ src/handlers/http/prism_home.rs | 4 +--- src/parseable/streams.rs | 13 +++++++------ src/prism/home/mod.rs | 2 +- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 68df00dc2..3eef23fc1 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -397,6 +397,15 @@ pub struct Options { help = "Maximum number of field statistics to store" )] pub max_field_statistics: usize, + + // collect dataset stats + #[arg( + long, + env = "P_COLLECT_DATASET_STATS", + default_value = "false", + help = "Enable/Disable collecting dataset stats" + )] + pub collect_dataset_stats: bool, } #[derive(Parser, Debug)] diff --git a/src/handlers/http/prism_home.rs b/src/handlers/http/prism_home.rs index 1b2575529..e83063685 100644 --- a/src/handlers/http/prism_home.rs +++ b/src/handlers/http/prism_home.rs @@ -39,9 +39,7 @@ pub async fn home_api(req: HttpRequest) -> Result>::from_query(req.query_string()) .map_err(|_| PrismHomeError::InvalidQueryParameter(HOME_QUERY_PARAM.to_string()))?; - let include_internal = query_map - .get(HOME_QUERY_PARAM) - .map_or(false, |v| v == "true"); + let include_internal = query_map.get(HOME_QUERY_PARAM).is_some_and(|v| v == "true"); let res = 
generate_home_response(&key, include_internal).await?; diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 602582872..bc8139cfc 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -518,7 +518,9 @@ impl Stream { if let Some(mut schema) = schema { // calculate field stats for all user defined streams - if self.get_stream_type() != StreamType::Internal { + if self.get_stream_type() != StreamType::Internal + && PARSEABLE.options.collect_dataset_stats + { let stats_schema = schema.clone(); let stream_name = self.stream_name.clone(); let stats_rbs = rbs.clone(); @@ -1199,11 +1201,10 @@ async fn query_distinct_stats( if let Ok(df) = ctx.sql(&sql).await { if let Ok(batches) = df.collect().await { for rb in batches { - let counts = rb - .column(0) - .as_any() - .downcast_ref::() - .expect("Counts should be Int64Array"); + let Some(counts) = rb.column(0).as_any().downcast_ref::() else { + warn!("Unexpected type for count column in stats query"); + continue; + }; let values = rb.column(1).as_ref(); for i in 0..rb.num_rows() { let value = format_arrow_value(values, i); diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 6313e79d6..397fed2c1 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -127,7 +127,7 @@ pub async fn generate_home_response( if !include_internal && metadata .iter() - .any(|m| m.stream_type == StreamType::Internal) + .all(|m| m.stream_type == StreamType::Internal) { continue; } From ef085642d7e45364d3c2ee8fa46e4f5d1e35872d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 18 Jun 2025 02:08:48 -0400 Subject: [PATCH 12/20] moved stats calculation to object store sync --- src/handlers/http/ingest.rs | 4 + src/lib.rs | 6 + src/parseable/mod.rs | 4 +- src/parseable/streams.rs | 263 ++----------------------------- src/query/mod.rs | 44 +++--- src/storage/object_storage.rs | 287 ++++++++++++++++++++++++++++++++++ src/sync.rs | 12 +- 7 files changed, 347 insertions(+), 273 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index c23eb6659..34b832f6a 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -100,6 +100,7 @@ pub async fn ingest( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, + None, vec![log_source_entry.clone()], ) .await?; @@ -183,6 +184,7 @@ pub async fn handle_otel_logs_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, + None, vec![log_source_entry.clone()], ) .await?; @@ -248,6 +250,7 @@ pub async fn handle_otel_metrics_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, + None, vec![log_source_entry.clone()], ) .await?; @@ -313,6 +316,7 @@ pub async fn handle_otel_traces_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, + None, vec![log_source_entry.clone()], ) .await?; diff --git a/src/lib.rs b/src/lib.rs index 8372a2e3b..52ef6cc6f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,6 +72,12 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as /// Describes the duration at the end of which parquets are pushed into objectstore. 
pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30); +/// Describes the duration during which local sync should be completed +pub const LOCAL_SYNC_THRESHOLD: Duration = Duration::from_secs(30); // 30 secs + +/// Describes the duration during which object store sync should be completed +pub const OBJECT_STORE_SYNC_THRESHOLD: Duration = Duration::from_secs(15); // 15 secs + // A single HTTP client for all outgoing HTTP requests from the parseable server pub static HTTP_CLIENT: Lazy = Lazy::new(|| { ClientBuilder::new() diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 83197e42e..4b1c896cc 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -330,6 +330,7 @@ impl Parseable { .create_stream_if_not_exists( INTERNAL_STREAM_NAME, StreamType::Internal, + None, vec![log_source_entry], ) .await @@ -354,6 +355,7 @@ impl Parseable { &self, stream_name: &str, stream_type: StreamType, + custom_partition: Option<&String>, log_source: Vec, ) -> Result { if self.streams.contains(stream_name) { @@ -375,7 +377,7 @@ impl Parseable { stream_name.to_string(), "", None, - None, + custom_partition, false, Arc::new(Schema::empty()), stream_type, diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index bc8139cfc..e9b9fbef9 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -26,13 +26,10 @@ use std::{ time::{Instant, SystemTime, UNIX_EPOCH}, }; -use arrow_array::{Array, Date32Array, Float64Array, Int64Array, NullArray, StringArray}; -use arrow_array::{BooleanArray, RecordBatch, TimestampMillisecondArray}; +use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; -use chrono::{DateTime, NaiveDateTime, Timelike, Utc}; -use datafusion::{datasource::MemTable, prelude::SessionContext}; -use derive_more::{Deref, DerefMut}; -use futures_util::StreamExt; +use chrono::{NaiveDateTime, Timelike, Utc}; +use derive_more::derive::{Deref, DerefMut}; use itertools::Itertools; use parquet::{ arrow::ArrowWriter, @@ -42,11 +39,9 @@ use parquet::{ schema::types::ColumnPath, }; use relative_path::RelativePathBuf; -use serde::Serialize; use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; -use crate::utils::DATASET_STATS_STREAM_NAME; use crate::{ cli::Options, event::{ @@ -54,11 +49,9 @@ use crate::{ DEFAULT_TIMESTAMP_KEY, }, handlers::http::modal::{ingest_server::INGESTOR_META, query_server::QUERIER_META}, - handlers::http::{ingest::PostError, modal::utils::ingest_utils::flatten_and_push_logs}, metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, - parseable::PARSEABLE, storage::{object_storage::to_bytes, retention::Retention, StreamType}, utils::time::{Minute, TimeRange}, LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY, @@ -73,28 +66,6 @@ use super::{ LogStream, ARROW_FILE_EXTENSION, }; -const MAX_CONCURRENT_FIELD_STATS: usize = 10; - -#[derive(Serialize, Debug)] -struct DistinctStat { - distinct_value: String, - count: i64, -} - -#[derive(Serialize, Debug)] -struct FieldStat { - field_name: String, - count: i64, - distinct_count: i64, - distinct_stats: Vec, -} - -#[derive(Serialize, Debug)] -struct DatasetStats { - dataset_name: String, - field_stats: Vec, -} - const INPROCESS_DIR_PREFIX: &str = "processing_"; /// Returns the filename for parquet if provided arrows file path is valid as per our expectation @@ -507,7 +478,7 @@ impl Stream { // read arrow files on disk // convert them to parquet - let (schema, rbs) = self.convert_disk_files_to_parquet( + let schema = self.convert_disk_files_to_parquet( 
time_partition.as_ref(), custom_partition.as_ref(), init_signal, @@ -517,24 +488,6 @@ impl Stream { // if yes, then merge them and save if let Some(mut schema) = schema { - // calculate field stats for all user defined streams - if self.get_stream_type() != StreamType::Internal - && PARSEABLE.options.collect_dataset_stats - { - let stats_schema = schema.clone(); - let stream_name = self.stream_name.clone(); - let stats_rbs = rbs.clone(); - tokio::spawn(async move { - if let Err(err) = - calculate_field_stats(&stream_name, stats_rbs, stats_schema.into()).await - { - warn!( - "Error calculating field stats for stream {}: {}", - &stream_name, err - ); - } - }); - } let static_schema_flag = self.get_static_schema_flag(); if !static_schema_flag { // schema is dynamic, read from staging and merge if present @@ -671,7 +624,7 @@ impl Stream { custom_partition: Option<&String>, init_signal: bool, shutdown_signal: bool, - ) -> Result<(Option, Vec), StagingError> { + ) -> Result, StagingError> { let mut schemas = Vec::new(); let now = SystemTime::now(); @@ -680,7 +633,7 @@ impl Stream { self.arrow_files_grouped_exclude_time(now, group_minute, init_signal, shutdown_signal); if staging_files.is_empty() { self.reset_staging_metrics(); - return Ok((None, Vec::new())); + return Ok(None); } self.update_staging_metrics(&staging_files); @@ -716,10 +669,10 @@ impl Stream { } if schemas.is_empty() { - return Ok((None, Vec::new())); + return Ok(None); } - Ok((Some(Schema::try_merge(schemas).unwrap()), record_batches)) + Ok(Some(Schema::try_merge(schemas).unwrap())) } fn write_parquet_part_file( @@ -1033,192 +986,6 @@ impl Stream { } } -/// Calculates field statistics for the stream and pushes them to the internal stats dataset. -/// This function creates a new internal stream for stats if it doesn't exist. -/// It collects statistics for each field in the stream -async fn calculate_field_stats( - stream_name: &str, - record_batches: Vec, - schema: Arc, -) -> Result<(), PostError> { - let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new()); - PARSEABLE - .create_stream_if_not_exists( - DATASET_STATS_STREAM_NAME, - StreamType::Internal, - vec![log_source_entry], - ) - .await?; - let mem_table = MemTable::try_new(schema.clone(), vec![record_batches]) - .map_err(|e| PostError::Invalid(e.into()))?; - let ctx = SessionContext::new(); - ctx.register_table(stream_name, Arc::new(mem_table)) - .map_err(|e| PostError::Invalid(e.into()))?; - - let field_stats = collect_all_field_stats(stream_name, &ctx, &schema).await; - drop(ctx); - - let stats = DatasetStats { - dataset_name: stream_name.to_string(), - field_stats, - }; - if stats.field_stats.is_empty() { - return Ok(()); - } - let stats_value = serde_json::to_value(&stats).map_err(|e| PostError::Invalid(e.into()))?; - - flatten_and_push_logs( - stats_value, - DATASET_STATS_STREAM_NAME, - &LogSource::Json, - &HashMap::new(), - ) - .await?; - Ok(()) -} - -/// Collects statistics for all fields in the stream. -/// Returns a vector of `FieldStat` for each field with non-zero count. -/// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently. 
-async fn collect_all_field_stats( - stream_name: &str, - ctx: &SessionContext, - schema: &Arc, -) -> Vec { - // Collect field names into an owned Vec to avoid lifetime issues - let field_names: Vec = schema - .fields() - .iter() - .map(|field| field.name().clone()) - .collect(); - - let field_futures = field_names.into_iter().map(|field_name| { - let ctx = ctx.clone(); - async move { calculate_single_field_stats(ctx, stream_name, &field_name).await } - }); - - futures::stream::iter(field_futures) - .buffer_unordered(MAX_CONCURRENT_FIELD_STATS) - .filter_map(std::future::ready) - .collect::>() - .await -} - -/// Calculates statistics for a single field in the stream. -/// Returns `None` if the count query returns 0. -async fn calculate_single_field_stats( - ctx: SessionContext, - stream_name: &str, - field_name: &str, -) -> Option { - let count = query_single_i64( - &ctx, - &format!("select count(\"{field_name}\") as count from \"{stream_name}\""), - ) - .await?; - if count == 0 { - return None; - } - - let distinct_count = query_single_i64( - &ctx, - &format!( - "select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\"" - ), - ) - .await?; - - let distinct_stats = query_distinct_stats(&ctx, stream_name, field_name).await; - - Some(FieldStat { - field_name: field_name.to_string(), - count, - distinct_count, - distinct_stats, - }) -} - -/// Queries a single integer value from the DataFusion context. -/// Returns `None` if the query fails or returns no rows. -/// This is used for fetching record count for a field and distinct count. -async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option { - let df = ctx.sql(sql).await.ok()?; - let batches = df.collect().await.ok()?; - let batch = batches.first()?; - if batch.num_rows() == 0 { - return None; - } - let array = batch.column(0).as_any().downcast_ref::()?; - - Some(array.value(0)) -} - -/// Helper function to format an Arrow value at a given index into a string. -/// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean. -fn format_arrow_value(array: &dyn Array, idx: usize) -> String { - if array.is_null(idx) { - return "NULL".to_string(); - } - if let Some(arr) = array.as_any().downcast_ref::() { - arr.value(idx).to_string() - } else if let Some(arr) = array.as_any().downcast_ref::() { - arr.value(idx).to_string() - } else if let Some(arr) = array.as_any().downcast_ref::() { - arr.value(idx).to_string() - } else if let Some(arr) = array.as_any().downcast_ref::() { - let timestamp = arr.value(idx); - DateTime::from_timestamp_millis(timestamp) - .map(|dt| dt.to_string()) - .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string()) - } else if let Some(arr) = array.as_any().downcast_ref::() { - return arr.value(idx).to_string(); - } else if let Some(arr) = array.as_any().downcast_ref::() { - arr.value(idx).to_string() - } else if array.as_any().downcast_ref::().is_some() { - "NULL".to_string() - } else { - warn!( - "Unsupported array type for statistics: {:?}", - array.data_type() - ); - "UNSUPPORTED".to_string() - } -} - -/// This function is used to fetch distinct values and their counts for a field in the stream. -/// Returns a vector of `DistinctStat` containing distinct values and their counts. -/// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`. 
-async fn query_distinct_stats( - ctx: &SessionContext, - stream_name: &str, - field_name: &str, -) -> Vec { - let sql = format!( - "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" group by \"{field_name}\" order by distinct_count desc limit {}", - PARSEABLE.options.max_field_statistics - ); - let mut distinct_stats = Vec::new(); - if let Ok(df) = ctx.sql(&sql).await { - if let Ok(batches) = df.collect().await { - for rb in batches { - let Some(counts) = rb.column(0).as_any().downcast_ref::() else { - warn!("Unexpected type for count column in stats query"); - continue; - }; - let values = rb.column(1).as_ref(); - for i in 0..rb.num_rows() { - let value = format_arrow_value(values, i); - distinct_stats.push(DistinctStat { - distinct_value: value, - count: counts.value(i), - }); - } - } - } - } - distinct_stats -} - #[derive(Deref, DerefMut, Default)] pub struct Streams(RwLock>); @@ -1494,7 +1261,7 @@ mod tests { None, ) .convert_disk_files_to_parquet(None, None, false, false)?; - assert!(result.0.is_none()); + assert!(result.is_none()); // Verify metrics were set to 0 let staging_files = metrics::STAGING_FILES.with_label_values(&[&stream]).get(); assert_eq!(staging_files, 0); @@ -1575,8 +1342,8 @@ mod tests { .convert_disk_files_to_parquet(None, None, false, true) .unwrap(); - assert!(result.0.is_some()); - let result_schema = result.0.unwrap(); + assert!(result.is_some()); + let result_schema = result.unwrap(); assert_eq!(result_schema.fields().len(), 3); // Verify parquet files were created and the arrow files deleted @@ -1624,8 +1391,8 @@ mod tests { .convert_disk_files_to_parquet(None, None, false, true) .unwrap(); - assert!(result.0.is_some()); - let result_schema = result.0.unwrap(); + assert!(result.is_some()); + let result_schema = result.unwrap(); assert_eq!(result_schema.fields().len(), 3); // Verify parquet files were created and the arrow files deleted @@ -1678,8 +1445,8 @@ mod tests { .convert_disk_files_to_parquet(None, None, false, false) .unwrap(); - assert!(result.0.is_some()); - let result_schema = result.0.unwrap(); + assert!(result.is_some()); + let result_schema = result.unwrap(); assert_eq!(result_schema.fields().len(), 3); // Verify parquet files were created and the arrow file left diff --git a/src/query/mod.rs b/src/query/mod.rs index e9c5632e7..c9410eb2a 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -27,7 +27,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion::error::DataFusionError; use datafusion::execution::disk_manager::DiskManagerConfig; -use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder}; +use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder}; use datafusion::logical_expr::expr::Alias; use datafusion::logical_expr::{ Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan, @@ -61,6 +61,9 @@ use crate::utils::time::TimeRange; pub static QUERY_SESSION: Lazy = Lazy::new(|| Query::create_session_context(PARSEABLE.storage())); +pub static QUERY_SESSION_STATE: Lazy = + Lazy::new(|| Query::create_session_state(PARSEABLE.storage())); + /// Dedicated multi-threaded runtime to run all queries on pub static QUERY_RUNTIME: Lazy = Lazy::new(|| Runtime::new().expect("Runtime should be constructible")); @@ -96,10 +99,28 @@ pub struct Query { impl Query { // create session context for this query pub fn create_session_context(storage: Arc) 
-> SessionContext { + let state = Self::create_session_state(storage.clone()); + + let schema_provider = Arc::new(GlobalSchemaProvider { + storage: storage.get_object_store(), + }); + state + .catalog_list() + .catalog(&state.config_options().catalog.default_catalog) + .expect("default catalog is provided by datafusion") + .register_schema( + &state.config_options().catalog.default_schema, + schema_provider, + ) + .unwrap(); + + SessionContext::new_with_state(state) + } + + fn create_session_state(storage: Arc) -> SessionState { let runtime_config = storage .get_datafusion_runtime() .with_disk_manager(DiskManagerConfig::NewOs); - let (pool_size, fraction) = match PARSEABLE.options.query_memory_pool_size { Some(size) => (size, 1.), None => { @@ -142,26 +163,11 @@ impl Query { .parquet .schema_force_view_types = true; - let state = SessionStateBuilder::new() + SessionStateBuilder::new() .with_default_features() .with_config(config) .with_runtime_env(runtime) - .build(); - - let schema_provider = Arc::new(GlobalSchemaProvider { - storage: storage.get_object_store(), - }); - state - .catalog_list() - .catalog(&state.config_options().catalog.default_catalog) - .expect("default catalog is provided by datafusion") - .register_schema( - &state.config_options().catalog.default_schema, - schema_provider, - ) - .unwrap(); - - SessionContext::new_with_state(state) + .build() } /// this function returns the result of the query diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 00e262631..1832d5433 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -28,16 +28,32 @@ use std::time::Duration; use std::time::Instant; use actix_web_prometheus::PrometheusMetrics; +use arrow_array::Array; +use arrow_array::BinaryArray; +use arrow_array::BinaryViewArray; +use arrow_array::BooleanArray; +use arrow_array::Date32Array; +use arrow_array::Float64Array; +use arrow_array::Int64Array; +use arrow_array::StringArray; +use arrow_array::StringViewArray; +use arrow_array::TimestampMillisecondArray; +use arrow_schema::DataType; use arrow_schema::Schema; +use arrow_schema::TimeUnit; use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; +use datafusion::prelude::ParquetReadOptions; +use datafusion::prelude::SessionContext; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; +use futures::StreamExt; use object_store::buffered::BufReader; use object_store::ObjectMeta; use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; +use serde::Serialize; use tokio::task::JoinSet; use tracing::info; use tracing::{error, warn}; @@ -48,8 +64,10 @@ use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot}; use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::event::format::LogSource; use crate::event::format::LogSourceEntry; +use crate::handlers::http::ingest::PostError; use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; use crate::handlers::http::modal::ingest_server::INGESTOR_META; +use crate::handlers::http::modal::utils::ingest_utils::flatten_and_push_logs; use crate::handlers::http::users::CORRELATION_DIR; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metrics::storage::StorageMetrics; @@ -57,7 +75,10 @@ use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STO use crate::option::Mode; use crate::parseable::LogStream; use crate::parseable::PARSEABLE; +use 
crate::query::QUERY_SESSION_STATE; use crate::stats::FullStats; +use crate::storage::StreamType; +use crate::utils::DATASET_STATS_STREAM_NAME; use super::{ retention::Retention, ObjectStorageError, ObjectStoreFormat, StorageMetadata, @@ -807,7 +828,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let stream = PARSEABLE.get_or_create_stream(stream_name); let custom_partition = stream.get_custom_partition(); + let schema = stream.get_schema(); for path in stream.parquet_files() { + if stream.get_stream_type() != StreamType::Internal + && PARSEABLE.options.collect_dataset_stats + { + if let Err(err) = calculate_field_stats(stream_name, &path, &schema).await { + warn!( + "Error calculating field stats for stream {}: {}", + stream_name, err + ); + } + } let filename = path .file_name() .expect("only parquet files are returned by iterator") @@ -895,6 +927,261 @@ pub fn sync_all_streams(joinset: &mut JoinSet>) { }); } } + +const MAX_CONCURRENT_FIELD_STATS: usize = 10; + +#[derive(Serialize, Debug)] +struct DistinctStat { + distinct_value: String, + count: i64, +} + +#[derive(Serialize, Debug)] +struct FieldStat { + field_name: String, + count: i64, + distinct_count: i64, + distinct_stats: Vec, +} + +#[derive(Serialize, Debug)] +struct DatasetStats { + dataset_name: String, + field_stats: Vec, +} + +/// Calculates field statistics for the stream and pushes them to the internal stats dataset. +/// This function creates a new internal stream for stats if it doesn't exist. +/// It collects statistics for each field in the stream +async fn calculate_field_stats( + stream_name: &str, + parquet_path: &Path, + schema: &Schema, +) -> Result<(), PostError> { + let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new()); + PARSEABLE + .create_stream_if_not_exists( + DATASET_STATS_STREAM_NAME, + StreamType::Internal, + Some(&"dataset_name".to_string()), + vec![log_source_entry], + ) + .await?; + let ctx = SessionContext::new_with_state(QUERY_SESSION_STATE.clone()); + let ctx_table_name = format!("{}_{}", stream_name, parquet_path.display()); + ctx.register_parquet( + &ctx_table_name, + parquet_path.to_str().expect("valid path"), + ParquetReadOptions::default(), + ) + .await + .map_err(|e| PostError::Invalid(e.into()))?; + let field_stats = collect_all_field_stats(&ctx_table_name, &ctx, schema).await; + drop(ctx); + + let stats = DatasetStats { + dataset_name: stream_name.to_string(), + field_stats, + }; + if stats.field_stats.is_empty() { + return Ok(()); + } + let stats_value = + serde_json::to_value(&stats).map_err(|e| ObjectStorageError::Invalid(e.into()))?; + + flatten_and_push_logs( + stats_value, + DATASET_STATS_STREAM_NAME, + &LogSource::Json, + &HashMap::new(), + ) + .await?; + Ok(()) +} + +/// Collects statistics for all fields in the stream. +/// Returns a vector of `FieldStat` for each field with non-zero count. +/// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently. 
+async fn collect_all_field_stats( + stream_name: &str, + ctx: &SessionContext, + schema: &Schema, +) -> Vec { + // Collect field names into an owned Vec to avoid lifetime issues + let field_names: Vec = schema + .fields() + .iter() + .map(|field| field.name().clone()) + .collect(); + let field_futures = field_names.into_iter().map(|field_name| { + let ctx = ctx.clone(); + async move { calculate_single_field_stats(ctx, stream_name, &field_name).await } + }); + + futures::stream::iter(field_futures) + .buffer_unordered(MAX_CONCURRENT_FIELD_STATS) + .filter_map(std::future::ready) + .collect::>() + .await +} + +/// Calculates statistics for a single field in the stream. +/// Returns `None` if the count query returns 0. +async fn calculate_single_field_stats( + ctx: SessionContext, + stream_name: &str, + field_name: &str, +) -> Option { + let count = query_single_i64( + &ctx, + &format!("select count(\"{field_name}\") as count from \"{stream_name}\""), + ) + .await?; + if count == 0 { + return None; + } + + let distinct_count = query_single_i64( + &ctx, + &format!( + "select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\"" + ), + ) + .await?; + let distinct_stats = query_distinct_stats(&ctx, stream_name, field_name).await; + Some(FieldStat { + field_name: field_name.to_string(), + count, + distinct_count, + distinct_stats, + }) +} + +/// Queries a single integer value from the DataFusion context. +/// Returns `None` if the query fails or returns no rows. +/// This is used for fetching record count for a field and distinct count. +async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option { + let df = ctx.sql(sql).await.ok()?; + let mut stream = df.execute_stream().await.ok()?; + let mut count = 0; + while let Some(batch_result) = stream.next().await { + let batch = batch_result.ok()?; + if batch.num_rows() == 0 { + return None; + } + let array = batch.column(0).as_any().downcast_ref::()?; + count += array.value(0); + } + Some(count) +} + +/// Helper function to format an Arrow value at a given index into a string. +/// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean. 
+fn format_arrow_value(array: &dyn Array, idx: usize) -> String { + if array.is_null(idx) { + return "NULL".to_string(); + } + + match array.data_type() { + DataType::Utf8 => array + .as_any() + .downcast_ref::() + .unwrap() + .value(idx) + .to_string(), + DataType::Utf8View => array + .as_any() + .downcast_ref::() + .unwrap() + .value(idx) + .to_string(), + DataType::Binary => { + let arr = array.as_any().downcast_ref::().unwrap(); + String::from_utf8_lossy(arr.value(idx)).to_string() + } + DataType::BinaryView => { + let arr = array.as_any().downcast_ref::().unwrap(); + String::from_utf8_lossy(arr.value(idx)).to_string() + } + DataType::Int64 => array + .as_any() + .downcast_ref::() + .unwrap() + .value(idx) + .to_string(), + DataType::Float64 => array + .as_any() + .downcast_ref::() + .unwrap() + .value(idx) + .to_string(), + DataType::Timestamp(TimeUnit::Millisecond, _) => { + let arr = array + .as_any() + .downcast_ref::() + .unwrap(); + let timestamp = arr.value(idx); + DateTime::from_timestamp_millis(timestamp) + .map(|dt| dt.to_string()) + .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string()) + } + DataType::Date32 => array + .as_any() + .downcast_ref::() + .unwrap() + .value(idx) + .to_string(), + DataType::Boolean => array + .as_any() + .downcast_ref::() + .unwrap() + .value(idx) + .to_string(), + DataType::Null => "NULL".to_string(), + _ => { + warn!( + "Unsupported array type for statistics: {:?}", + array.data_type() + ); + "UNSUPPORTED".to_string() + } + } +} + +/// This function is used to fetch distinct values and their counts for a field in the stream. +/// Returns a vector of `DistinctStat` containing distinct values and their counts. +/// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`. 
+async fn query_distinct_stats( + ctx: &SessionContext, + stream_name: &str, + field_name: &str, +) -> Vec { + let sql = format!( + "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" group by \"{field_name}\" order by distinct_count desc limit {}", + PARSEABLE.options.max_field_statistics + ); + let mut distinct_stats = Vec::new(); + if let Ok(df) = ctx.sql(&sql).await { + if let Ok(batches) = df.collect().await { + for rb in batches { + let Some(counts) = rb.column(0).as_any().downcast_ref::() else { + warn!("Unexpected type for count column in stats query"); + continue; + }; + let values = rb.column(1).as_ref(); + for i in 0..rb.num_rows() { + let value = format_arrow_value(values, i); + distinct_stats.push(DistinctStat { + distinct_value: value, + count: counts.value(i), + }); + } + } + } + } + distinct_stats +} + pub async fn commit_schema_to_storage( stream_name: &str, schema: Schema, diff --git a/src/sync.rs b/src/sync.rs index 3f6fd5922..bad79abcf 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -30,7 +30,9 @@ use tracing::{error, info, trace, warn}; use crate::alerts::{alerts_utils, AlertTask}; use crate::parseable::PARSEABLE; use crate::storage::object_storage::sync_all_streams; -use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL}; +use crate::{ + LOCAL_SYNC_INTERVAL, LOCAL_SYNC_THRESHOLD, OBJECT_STORE_SYNC_THRESHOLD, STORAGE_UPLOAD_INTERVAL, +}; // Calculates the instant that is the start of the next minute fn next_minute() -> Instant { @@ -131,7 +133,7 @@ pub fn object_store_sync() -> ( // Monitor the duration of sync_all_streams execution monitor_task_duration( "object_store_sync_all_streams", - Duration::from_secs(15), + OBJECT_STORE_SYNC_THRESHOLD, || async { let mut joinset = JoinSet::new(); sync_all_streams(&mut joinset); @@ -196,7 +198,7 @@ pub fn local_sync() -> ( // Monitor the duration of flush_and_convert execution monitor_task_duration( "local_sync_flush_and_convert", - Duration::from_secs(15), + LOCAL_SYNC_THRESHOLD, || async { let mut joinset = JoinSet::new(); PARSEABLE.streams.flush_and_convert(&mut joinset, false, false); @@ -242,7 +244,7 @@ pub fn local_sync() -> ( // local and object store sync at the start of the server pub async fn sync_start() -> anyhow::Result<()> { // Monitor local sync duration at startup - monitor_task_duration("startup_local_sync", Duration::from_secs(15), || async { + monitor_task_duration("startup_local_sync", LOCAL_SYNC_THRESHOLD, || async { let mut local_sync_joinset = JoinSet::new(); PARSEABLE .streams @@ -256,7 +258,7 @@ pub async fn sync_start() -> anyhow::Result<()> { // Monitor object store sync duration at startup monitor_task_duration( "startup_object_store_sync", - Duration::from_secs(15), + OBJECT_STORE_SYNC_THRESHOLD, || async { let mut object_store_joinset = JoinSet::new(); sync_all_streams(&mut object_store_joinset); From 5ae6366f749616499dfddcf70a6e715ae87174b4 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 18 Jun 2025 02:13:12 -0400 Subject: [PATCH 13/20] update caller for create stream --- src/connectors/kafka/processor.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 09d826b46..5b01725ed 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -55,6 +55,7 @@ impl ParseableSinkProcessor { .create_stream_if_not_exists( stream_name, StreamType::UserDefined, + None, vec![log_source_entry], ) .await?; From 43926be8e54fbd7f91c2cd6d75cc0451f3d57ee3 Mon Sep 17 00:00:00 2001 From: 
Nikhil Sinha Date: Wed, 18 Jun 2025 06:45:48 -0400 Subject: [PATCH 14/20] sync thresholds in env, stats sync on shutdown, streaming response --- src/cli.rs | 17 +++ src/handlers/http/health_check.rs | 31 ++++- src/lib.rs | 6 - src/parseable/mod.rs | 5 + src/storage/object_storage.rs | 211 ++++++++++++++++++------------ src/sync.rs | 32 ++--- 6 files changed, 195 insertions(+), 107 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 3eef23fc1..8cbd6c6f7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -406,6 +406,23 @@ pub struct Options { help = "Enable/Disable collecting dataset stats" )] pub collect_dataset_stats: bool, + + // the duration during which local sync should be completed + #[arg( + long, + env = "P_LOCAL_SYNC_THRESHOLD", + default_value = "30", + help = "Local sync threshold in seconds" + )] + pub local_sync_threshold: u64, + // the duration during which object store sync should be completed + #[arg( + long, + env = "P_OBJECT_STORE_SYNC_THRESHOLD", + default_value = "60", + help = "Object store sync threshold in seconds" + )] + pub object_store_sync_threshold: u64, } #[derive(Parser, Debug)] diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs index b4803e124..82825b415 100644 --- a/src/handlers/http/health_check.rs +++ b/src/handlers/http/health_check.rs @@ -56,12 +56,35 @@ pub async fn check_shutdown_middleware( // This function is called when the server is shutting down pub async fn shutdown() { - // Set the shutdown flag to true - let mut shutdown_flag = SIGNAL_RECEIVED.lock().await; - *shutdown_flag = true; + // Set shutdown flag to true + set_shutdown_flag().await; //sleep for 5 secs to allow any ongoing requests to finish tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + // Perform sync operations + perform_sync_operations().await; + + // If collect_dataset_stats is enabled, perform sync operations + // This is to ensure that all stats data is synced before the server shuts down + if PARSEABLE.options.collect_dataset_stats { + perform_sync_operations().await; + } +} + +async fn set_shutdown_flag() { + let mut shutdown_flag = SIGNAL_RECEIVED.lock().await; + *shutdown_flag = true; +} + +async fn perform_sync_operations() { + // Perform local sync + perform_local_sync().await; + // Perform object store sync + perform_object_store_sync().await; +} + +async fn perform_local_sync() { let mut local_sync_joinset = JoinSet::new(); // Sync staging @@ -76,7 +99,9 @@ pub async fn shutdown() { Err(err) => error!("Failed to join async task: {err}"), } } +} +async fn perform_object_store_sync() { // Sync object store let mut object_store_joinset = JoinSet::new(); sync_all_streams(&mut object_store_joinset); diff --git a/src/lib.rs b/src/lib.rs index 52ef6cc6f..8372a2e3b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,12 +72,6 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as /// Describes the duration at the end of which parquets are pushed into objectstore. 
pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30); -/// Describes the duration during which local sync should be completed -pub const LOCAL_SYNC_THRESHOLD: Duration = Duration::from_secs(30); // 30 secs - -/// Describes the duration during which object store sync should be completed -pub const OBJECT_STORE_SYNC_THRESHOLD: Duration = Duration::from_secs(15); // 15 secs - // A single HTTP client for all outgoing HTTP requests from the parseable server pub static HTTP_CLIENT: Lazy = Lazy::new(|| { ClientBuilder::new() diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 4b1c896cc..f2d70c1ed 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -362,6 +362,11 @@ impl Parseable { return Ok(true); } + // validate custom partition if provided + if let Some(partition) = custom_partition { + validate_custom_partition(partition)?; + } + // For distributed deployments, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 1832d5433..51e9fa43d 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -830,16 +830,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let custom_partition = stream.get_custom_partition(); let schema = stream.get_schema(); for path in stream.parquet_files() { - if stream.get_stream_type() != StreamType::Internal - && PARSEABLE.options.collect_dataset_stats - { - if let Err(err) = calculate_field_stats(stream_name, &path, &schema).await { - warn!( - "Error calculating field stats for stream {}: {}", - stream_name, err - ); - } - } let filename = path .file_name() .expect("only parquet files are returned by iterator") @@ -889,6 +879,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &path)?; catalog::update_snapshot(store, stream_name, manifest).await?; + // If the stream is not internal and stats collection is enabled, calculate field stats + // before removing the parquet file + if stream.get_stream_type() != StreamType::Internal + && PARSEABLE.options.collect_dataset_stats + { + if let Err(err) = calculate_field_stats(stream_name, &path, &schema).await { + warn!( + "Error calculating field stats for stream {}: {}", + stream_name, err + ); + } + } if let Err(e) = remove_file(path) { warn!("Failed to remove staged file: {e}"); } @@ -968,7 +970,13 @@ async fn calculate_field_stats( ) .await?; let ctx = SessionContext::new_with_state(QUERY_SESSION_STATE.clone()); - let ctx_table_name = format!("{}_{}", stream_name, parquet_path.display()); + let parquet_file_name = parquet_path + .file_name() + .expect("only parquet files are returned by iterator") + .to_str() + .expect("filename is valid string"); + let parquet_file_name = str::replace(parquet_file_name, ".", "_"); + let ctx_table_name = format!("{}_{}", stream_name, parquet_file_name); ctx.register_parquet( &ctx_table_name, parquet_path.to_str().expect("valid path"), @@ -1062,81 +1070,118 @@ async fn calculate_single_field_stats( /// This is used for fetching record count for a field and distinct count. 
async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option { let df = ctx.sql(sql).await.ok()?; - let mut stream = df.execute_stream().await.ok()?; - let mut count = 0; - while let Some(batch_result) = stream.next().await { - let batch = batch_result.ok()?; - if batch.num_rows() == 0 { - return None; - } - let array = batch.column(0).as_any().downcast_ref::()?; - count += array.value(0); + let batches = df.collect().await.ok()?; + let batch = batches.first()?; + if batch.num_rows() == 0 { + return None; } - Some(count) + let array = batch.column(0).as_any().downcast_ref::()?; + + Some(array.value(0)) +} + +macro_rules! try_downcast { + ($ty:ty, $arr:expr, $body:expr) => { + if let Some(arr) = $arr.as_any().downcast_ref::<$ty>() { + $body(arr) + } else { + warn!( + "Expected {} for {:?}, but found {:?}", + stringify!($ty), + $arr.data_type(), + $arr.data_type() + ); + "UNSUPPORTED".to_string() + } + }; } -/// Helper function to format an Arrow value at a given index into a string. -/// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean. +/// Function to format an Arrow value at a given index into a string. +/// Handles null values and different data types by downcasting the array to the appropriate type. fn format_arrow_value(array: &dyn Array, idx: usize) -> String { if array.is_null(idx) { return "NULL".to_string(); } match array.data_type() { - DataType::Utf8 => array - .as_any() - .downcast_ref::() - .unwrap() + DataType::Utf8 => try_downcast!(StringArray, array, |arr: &StringArray| arr .value(idx) - .to_string(), - DataType::Utf8View => array - .as_any() - .downcast_ref::() - .unwrap() + .to_string()), + DataType::Utf8View => try_downcast!(StringViewArray, array, |arr: &StringViewArray| arr .value(idx) - .to_string(), - DataType::Binary => { - let arr = array.as_any().downcast_ref::().unwrap(); + .to_string()), + DataType::Binary => try_downcast!(BinaryArray, array, |arr: &BinaryArray| { String::from_utf8_lossy(arr.value(idx)).to_string() - } - DataType::BinaryView => { - let arr = array.as_any().downcast_ref::().unwrap(); + }), + DataType::BinaryView => try_downcast!(BinaryViewArray, array, |arr: &BinaryViewArray| { String::from_utf8_lossy(arr.value(idx)).to_string() - } - DataType::Int64 => array - .as_any() - .downcast_ref::() - .unwrap() + }), + DataType::Int64 => try_downcast!(Int64Array, array, |arr: &Int64Array| arr .value(idx) - .to_string(), - DataType::Float64 => array - .as_any() - .downcast_ref::() - .unwrap() + .to_string()), + DataType::Int32 => try_downcast!( + arrow_array::Int32Array, + array, + |arr: &arrow_array::Int32Array| arr.value(idx).to_string() + ), + DataType::Int16 => try_downcast!( + arrow_array::Int16Array, + array, + |arr: &arrow_array::Int16Array| arr.value(idx).to_string() + ), + DataType::Int8 => try_downcast!( + arrow_array::Int8Array, + array, + |arr: &arrow_array::Int8Array| arr.value(idx).to_string() + ), + DataType::UInt64 => try_downcast!( + arrow_array::UInt64Array, + array, + |arr: &arrow_array::UInt64Array| arr.value(idx).to_string() + ), + DataType::UInt32 => try_downcast!( + arrow_array::UInt32Array, + array, + |arr: &arrow_array::UInt32Array| arr.value(idx).to_string() + ), + DataType::UInt16 => try_downcast!( + arrow_array::UInt16Array, + array, + |arr: &arrow_array::UInt16Array| arr.value(idx).to_string() + ), + DataType::UInt8 => try_downcast!( + arrow_array::UInt8Array, + array, + |arr: &arrow_array::UInt8Array| arr.value(idx).to_string() + ), + DataType::Float64 => 
try_downcast!(Float64Array, array, |arr: &Float64Array| arr .value(idx) - .to_string(), - DataType::Timestamp(TimeUnit::Millisecond, _) => { - let arr = array - .as_any() - .downcast_ref::() - .unwrap(); - let timestamp = arr.value(idx); - DateTime::from_timestamp_millis(timestamp) - .map(|dt| dt.to_string()) - .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string()) - } - DataType::Date32 => array - .as_any() - .downcast_ref::() - .unwrap() + .to_string()), + DataType::Float32 => try_downcast!( + arrow_array::Float32Array, + array, + |arr: &arrow_array::Float32Array| arr.value(idx).to_string() + ), + DataType::Timestamp(TimeUnit::Millisecond, _) => try_downcast!( + TimestampMillisecondArray, + array, + |arr: &TimestampMillisecondArray| { + let timestamp = arr.value(idx); + chrono::DateTime::from_timestamp_millis(timestamp) + .map(|dt| dt.to_string()) + .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string()) + } + ), + DataType::Date32 => try_downcast!(Date32Array, array, |arr: &Date32Array| arr .value(idx) - .to_string(), - DataType::Boolean => array - .as_any() - .downcast_ref::() - .unwrap() + .to_string()), + DataType::Boolean => try_downcast!(BooleanArray, array, |arr: &BooleanArray| if arr .value(idx) - .to_string(), + { + "true".to_string() + } else { + "false".to_string() + }), DataType::Null => "NULL".to_string(), _ => { warn!( @@ -1162,20 +1207,20 @@ async fn query_distinct_stats( ); let mut distinct_stats = Vec::new(); if let Ok(df) = ctx.sql(&sql).await { - if let Ok(batches) = df.collect().await { - for rb in batches { - let Some(counts) = rb.column(0).as_any().downcast_ref::() else { - warn!("Unexpected type for count column in stats query"); - continue; - }; - let values = rb.column(1).as_ref(); - for i in 0..rb.num_rows() { - let value = format_arrow_value(values, i); - distinct_stats.push(DistinctStat { - distinct_value: value, - count: counts.value(i), - }); - } + let mut stream = df.execute_stream().await.expect("Failed to execute stream"); + while let Some(batch_result) = stream.next().await { + let rb = batch_result.expect("Failed to execute stream"); + let Some(counts) = rb.column(0).as_any().downcast_ref::() else { + warn!("Unexpected type for count column in stats query"); + continue; + }; + let values = rb.column(1).as_ref(); + for i in 0..rb.num_rows() { + let value = format_arrow_value(values, i); + distinct_stats.push(DistinctStat { + distinct_value: value, + count: counts.value(i), + }); } } } diff --git a/src/sync.rs b/src/sync.rs index bad79abcf..c6ae55c9e 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -30,9 +30,7 @@ use tracing::{error, info, trace, warn}; use crate::alerts::{alerts_utils, AlertTask}; use crate::parseable::PARSEABLE; use crate::storage::object_storage::sync_all_streams; -use crate::{ - LOCAL_SYNC_INTERVAL, LOCAL_SYNC_THRESHOLD, OBJECT_STORE_SYNC_THRESHOLD, STORAGE_UPLOAD_INTERVAL, -}; +use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL}; // Calculates the instant that is the start of the next minute fn next_minute() -> Instant { @@ -133,7 +131,7 @@ pub fn object_store_sync() -> ( // Monitor the duration of sync_all_streams execution monitor_task_duration( "object_store_sync_all_streams", - OBJECT_STORE_SYNC_THRESHOLD, + Duration::from_secs(PARSEABLE.options.object_store_sync_threshold), || async { let mut joinset = JoinSet::new(); sync_all_streams(&mut joinset); @@ -198,7 +196,7 @@ pub fn local_sync() -> ( // Monitor the duration of flush_and_convert execution monitor_task_duration( "local_sync_flush_and_convert", - LOCAL_SYNC_THRESHOLD, + 
Duration::from_secs(PARSEABLE.options.local_sync_threshold), || async { let mut joinset = JoinSet::new(); PARSEABLE.streams.flush_and_convert(&mut joinset, false, false); @@ -244,21 +242,25 @@ pub fn local_sync() -> ( // local and object store sync at the start of the server pub async fn sync_start() -> anyhow::Result<()> { // Monitor local sync duration at startup - monitor_task_duration("startup_local_sync", LOCAL_SYNC_THRESHOLD, || async { - let mut local_sync_joinset = JoinSet::new(); - PARSEABLE - .streams - .flush_and_convert(&mut local_sync_joinset, true, false); - while let Some(res) = local_sync_joinset.join_next().await { - log_join_result(res, "flush and convert"); - } - }) + monitor_task_duration( + "startup_local_sync", + Duration::from_secs(PARSEABLE.options.local_sync_threshold), + || async { + let mut local_sync_joinset = JoinSet::new(); + PARSEABLE + .streams + .flush_and_convert(&mut local_sync_joinset, true, false); + while let Some(res) = local_sync_joinset.join_next().await { + log_join_result(res, "flush and convert"); + } + }, + ) .await; // Monitor object store sync duration at startup monitor_task_duration( "startup_object_store_sync", - OBJECT_STORE_SYNC_THRESHOLD, + Duration::from_secs(PARSEABLE.options.object_store_sync_threshold), || async { let mut object_store_joinset = JoinSet::new(); sync_all_streams(&mut object_store_joinset); From de65af8da20e633e7786ea46ee480b5dc892a60e Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 18 Jun 2025 07:17:10 -0400 Subject: [PATCH 15/20] add random suffix to table name --- src/storage/object_storage.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 51e9fa43d..2837b4ba2 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -975,8 +975,9 @@ async fn calculate_field_stats( .expect("only parquet files are returned by iterator") .to_str() .expect("filename is valid string"); + let random_suffix = Ulid::new().to_string(); let parquet_file_name = str::replace(parquet_file_name, ".", "_"); - let ctx_table_name = format!("{}_{}", stream_name, parquet_file_name); + let ctx_table_name = format!("{}_{}_{}", stream_name, parquet_file_name, random_suffix); ctx.register_parquet( &ctx_table_name, parquet_path.to_str().expect("valid path"), From 4fedec547afbd7d60f15e17aec9829c4ec499fd7 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 18 Jun 2025 08:33:37 -0400 Subject: [PATCH 16/20] refactor --- src/storage/object_storage.rs | 66 +++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 2837b4ba2..291fc298a 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -1041,22 +1041,26 @@ async fn calculate_single_field_stats( stream_name: &str, field_name: &str, ) -> Option { - let count = query_single_i64( - &ctx, - &format!("select count(\"{field_name}\") as count from \"{stream_name}\""), - ) - .await?; - if count == 0 { + // Use the escaped field name in the SQL query to avoid issues with special characters + let escaped_field_name = field_name.replace('"', "\"\""); + let escaped_stream_name = stream_name.replace('"', "\"\""); + let count_distinct_sql = format!( + "select COUNT(\"{escaped_field_name}\") as count, COUNT(DISTINCT \"{escaped_field_name}\") as distinct_count from \"{escaped_stream_name}\" "); + + let df = ctx.sql(&count_distinct_sql).await.ok()?; + let batches = 
df.collect().await.ok()?; + let batch = batches.first()?; + if batch.num_rows() == 0 { return None; } + let count_array = batch.column(0).as_any().downcast_ref::()?; + let distinct_count_array = batch.column(1).as_any().downcast_ref::()?; - let distinct_count = query_single_i64( - &ctx, - &format!( - "select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\"" - ), - ) - .await?; + let count = count_array.value(0); + let distinct_count = distinct_count_array.value(0); + if distinct_count == 0 { + return None; + } let distinct_stats = query_distinct_stats(&ctx, stream_name, field_name).await; Some(FieldStat { field_name: field_name.to_string(), @@ -1066,21 +1070,6 @@ async fn calculate_single_field_stats( }) } -/// Queries a single integer value from the DataFusion context. -/// Returns `None` if the query fails or returns no rows. -/// This is used for fetching record count for a field and distinct count. -async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option { - let df = ctx.sql(sql).await.ok()?; - let batches = df.collect().await.ok()?; - let batch = batches.first()?; - if batch.num_rows() == 0 { - return None; - } - let array = batch.column(0).as_any().downcast_ref::()?; - - Some(array.value(0)) -} - macro_rules! try_downcast { ($ty:ty, $arr:expr, $body:expr) => { if let Some(arr) = $arr.as_any().downcast_ref::<$ty>() { @@ -1202,15 +1191,30 @@ async fn query_distinct_stats( stream_name: &str, field_name: &str, ) -> Vec { + let escaped_field_name = field_name.replace('"', "\"\""); + let escaped_stream_name = stream_name.replace('"', "\"\""); + let sql = format!( - "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" group by \"{field_name}\" order by distinct_count desc limit {}", + "select count(*) as distinct_count, \"{escaped_field_name}\" from \"{escaped_stream_name}\" group by \"{escaped_field_name}\" order by distinct_count desc limit {}", PARSEABLE.options.max_field_statistics ); let mut distinct_stats = Vec::new(); if let Ok(df) = ctx.sql(&sql).await { - let mut stream = df.execute_stream().await.expect("Failed to execute stream"); + let mut stream = match df.execute_stream().await { + Ok(stream) => stream, + Err(e) => { + warn!("Failed to execute distinct stats query: {e}"); + return distinct_stats; // Return empty if query fails + } + }; while let Some(batch_result) = stream.next().await { - let rb = batch_result.expect("Failed to execute stream"); + let rb = match batch_result { + Ok(batch) => batch, + Err(e) => { + warn!("Failed to fetch batch in distinct stats query: {e}"); + continue; // Skip this batch if there's an error + } + }; let Some(counts) = rb.column(0).as_any().downcast_ref::() else { warn!("Unexpected type for count column in stats query"); continue; From bb3a67037e866df273ec9e59d8c4b240ac0124b8 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 18 Jun 2025 15:11:27 -0400 Subject: [PATCH 17/20] optimised single query, stats conversion --- src/storage/object_storage.rs | 165 +++++++++++++++++++--------------- 1 file changed, 93 insertions(+), 72 deletions(-) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 291fc298a..45525e19a 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -889,6 +889,16 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { "Error calculating field stats for stream {}: {}", stream_name, err ); + } else { + let stats_stream = PARSEABLE + .get_stream(DATASET_STATS_STREAM_NAME) + .expect("Dataset stats stream 
should exist"); + if let Err(err) = stats_stream.flush_and_convert(false, false).await { + warn!( + "Error flushing dataset stats stream {}: {}", + DATASET_STATS_STREAM_NAME, err + ); + } } } if let Err(e) = remove_file(path) { @@ -965,7 +975,7 @@ async fn calculate_field_stats( .create_stream_if_not_exists( DATASET_STATS_STREAM_NAME, StreamType::Internal, - Some(&"dataset_name".to_string()), + Some(&"dataset_name".into()), vec![log_source_entry], ) .await?; @@ -1034,42 +1044,102 @@ async fn collect_all_field_stats( .await } -/// Calculates statistics for a single field in the stream. -/// Returns `None` if the count query returns 0. +/// This function is used to fetch distinct values and their counts for a field in the stream. +/// Returns a vector of `DistinctStat` containing distinct values and their counts. +/// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`. async fn calculate_single_field_stats( ctx: SessionContext, stream_name: &str, field_name: &str, ) -> Option { - // Use the escaped field name in the SQL query to avoid issues with special characters - let escaped_field_name = field_name.replace('"', "\"\""); - let escaped_stream_name = stream_name.replace('"', "\"\""); - let count_distinct_sql = format!( - "select COUNT(\"{escaped_field_name}\") as count, COUNT(DISTINCT \"{escaped_field_name}\") as distinct_count from \"{escaped_stream_name}\" "); - - let df = ctx.sql(&count_distinct_sql).await.ok()?; - let batches = df.collect().await.ok()?; - let batch = batches.first()?; - if batch.num_rows() == 0 { - return None; - } - let count_array = batch.column(0).as_any().downcast_ref::()?; - let distinct_count_array = batch.column(1).as_any().downcast_ref::()?; + let mut total_count = 0; + let mut distinct_count = 0; + let mut distinct_stats = Vec::new(); - let count = count_array.value(0); - let distinct_count = distinct_count_array.value(0); - if distinct_count == 0 { - return None; + let combined_sql = get_stats_sql(stream_name, field_name); + if let Ok(df) = ctx.sql(&combined_sql).await { + let mut stream = match df.execute_stream().await { + Ok(stream) => stream, + Err(e) => { + warn!("Failed to execute distinct stats query: {e}"); + return None; // Return empty if query fails + } + }; + while let Some(batch_result) = stream.next().await { + let rb = match batch_result { + Ok(batch) => batch, + Err(e) => { + warn!("Failed to fetch batch in distinct stats query: {e}"); + continue; // Skip this batch if there's an error + } + }; + let total_count_array = rb.column(0).as_any().downcast_ref::()?; + let distinct_count_array = rb.column(1).as_any().downcast_ref::()?; + + total_count = total_count_array.value(0); + distinct_count = distinct_count_array.value(0); + + if distinct_count == 0 { + return None; + } + + let field_value_array = rb.column(2).as_ref(); + let value_count_array = rb.column(3).as_any().downcast_ref::()?; + + for i in 0..rb.num_rows() { + let value = format_arrow_value(field_value_array, i); + let count = value_count_array.value(i); + + distinct_stats.push(DistinctStat { + distinct_value: value, + count, + }); + } + } } - let distinct_stats = query_distinct_stats(&ctx, stream_name, field_name).await; Some(FieldStat { field_name: field_name.to_string(), - count, + count: total_count, distinct_count, distinct_stats, }) } +fn get_stats_sql(stream_name: &str, field_name: &str) -> String { + let escaped_field_name = field_name.replace('"', "\"\""); + let escaped_stream_name = 
stream_name.replace('"', "\"\""); + + format!( + r#" + WITH field_groups AS ( + SELECT + "{escaped_field_name}" as field_value, + COUNT(*) as value_count + FROM "{escaped_stream_name}" + GROUP BY "{escaped_field_name}" + ), + field_summary AS ( + SELECT + field_value, + value_count, + SUM(value_count) OVER () as total_count, + COUNT(*) OVER () as distinct_count, + ROW_NUMBER() OVER (ORDER BY value_count DESC) as rn + FROM field_groups + ) + SELECT + total_count, + distinct_count, + field_value, + value_count + FROM field_summary + WHERE rn <= {} + ORDER BY value_count DESC + "#, + PARSEABLE.options.max_field_statistics + ) +} + macro_rules! try_downcast { ($ty:ty, $arr:expr, $body:expr) => { if let Some(arr) = $arr.as_any().downcast_ref::<$ty>() { @@ -1183,55 +1253,6 @@ fn format_arrow_value(array: &dyn Array, idx: usize) -> String { } } -/// This function is used to fetch distinct values and their counts for a field in the stream. -/// Returns a vector of `DistinctStat` containing distinct values and their counts. -/// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`. -async fn query_distinct_stats( - ctx: &SessionContext, - stream_name: &str, - field_name: &str, -) -> Vec { - let escaped_field_name = field_name.replace('"', "\"\""); - let escaped_stream_name = stream_name.replace('"', "\"\""); - - let sql = format!( - "select count(*) as distinct_count, \"{escaped_field_name}\" from \"{escaped_stream_name}\" group by \"{escaped_field_name}\" order by distinct_count desc limit {}", - PARSEABLE.options.max_field_statistics - ); - let mut distinct_stats = Vec::new(); - if let Ok(df) = ctx.sql(&sql).await { - let mut stream = match df.execute_stream().await { - Ok(stream) => stream, - Err(e) => { - warn!("Failed to execute distinct stats query: {e}"); - return distinct_stats; // Return empty if query fails - } - }; - while let Some(batch_result) = stream.next().await { - let rb = match batch_result { - Ok(batch) => batch, - Err(e) => { - warn!("Failed to fetch batch in distinct stats query: {e}"); - continue; // Skip this batch if there's an error - } - }; - let Some(counts) = rb.column(0).as_any().downcast_ref::() else { - warn!("Unexpected type for count column in stats query"); - continue; - }; - let values = rb.column(1).as_ref(); - for i in 0..rb.num_rows() { - let value = format_arrow_value(values, i); - distinct_stats.push(DistinctStat { - distinct_value: value, - count: counts.value(i), - }); - } - } - } - distinct_stats -} - pub async fn commit_schema_to_storage( stream_name: &str, schema: Schema, From 1ff2c56826104cee99355787dd2bd97f3f2eeb08 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 19 Jun 2025 03:42:25 -0400 Subject: [PATCH 18/20] add tests, non blocking task for stats stream local sync --- src/cli.rs | 2 +- src/storage/object_storage.rs | 712 +++++++++++++++++++++++++++++----- 2 files changed, 624 insertions(+), 90 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 8cbd6c6f7..52b6a05ca 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -419,7 +419,7 @@ pub struct Options { #[arg( long, env = "P_OBJECT_STORE_SYNC_THRESHOLD", - default_value = "60", + default_value = "15", help = "Object store sync threshold in seconds" )] pub object_store_sync_threshold: u64, diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 45525e19a..a22322c85 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -16,17 +16,6 @@ * */ -use 
std::collections::BTreeMap; -use std::collections::HashMap; -use std::collections::HashSet; -use std::fmt::Debug; -use std::fs::{remove_file, File}; -use std::num::NonZeroU32; -use std::path::Path; -use std::sync::Arc; -use std::time::Duration; -use std::time::Instant; - use actix_web_prometheus::PrometheusMetrics; use arrow_array::Array; use arrow_array::BinaryArray; @@ -54,6 +43,17 @@ use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; use serde::Serialize; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::collections::HashSet; +use std::fmt::Debug; +use std::fs::{remove_file, File}; +use std::num::NonZeroU32; +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; +use tokio::task; use tokio::task::JoinSet; use tracing::info; use tracing::{error, warn}; @@ -825,7 +825,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { return Ok(()); } info!("Starting object_store_sync for stream- {stream_name}"); - + let mut stats_calculated = false; let stream = PARSEABLE.get_or_create_stream(stream_name); let custom_partition = stream.get_custom_partition(); let schema = stream.get_schema(); @@ -849,7 +849,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let stream_relative_path = format!("{stream_name}/{file_suffix}"); // Try uploading the file, handle potential errors without breaking the loop - // if let Err(e) = self.upload_multipart(key, path) if let Err(e) = self .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) .await @@ -879,26 +878,17 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &path)?; catalog::update_snapshot(store, stream_name, manifest).await?; - // If the stream is not internal and stats collection is enabled, calculate field stats - // before removing the parquet file - if stream.get_stream_type() != StreamType::Internal - && PARSEABLE.options.collect_dataset_stats - { - if let Err(err) = calculate_field_stats(stream_name, &path, &schema).await { - warn!( + // If stats collection is enabled, calculate field stats + if stream_name != DATASET_STATS_STREAM_NAME && PARSEABLE.options.collect_dataset_stats { + let max_field_statistics = PARSEABLE.options.max_field_statistics; + match calculate_field_stats(stream_name, &path, &schema, max_field_statistics).await + { + Ok(stats) if stats => stats_calculated = true, + Err(err) => warn!( "Error calculating field stats for stream {}: {}", stream_name, err - ); - } else { - let stats_stream = PARSEABLE - .get_stream(DATASET_STATS_STREAM_NAME) - .expect("Dataset stats stream should exist"); - if let Err(err) = stats_stream.flush_and_convert(false, false).await { - warn!( - "Error flushing dataset stats stream {}: {}", - DATASET_STATS_STREAM_NAME, err - ); - } + ), + _ => {} } } if let Err(e) = remove_file(path) { @@ -915,6 +905,17 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } } + if stats_calculated { + // perform local sync for the `pstats` dataset + task::spawn(async move { + if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) { + if let Err(err) = stats_stream.flush_and_convert(false, false).await { + error!("Failed in local sync for dataset stats stream: {err}"); + } + } + }); + } + Ok(()) } } @@ -969,45 +970,41 @@ async fn calculate_field_stats( stream_name: &str, parquet_path: &Path, schema: &Schema, -) -> Result<(), PostError> { - let log_source_entry = 
LogSourceEntry::new(LogSource::Json, HashSet::new()); - PARSEABLE - .create_stream_if_not_exists( - DATASET_STATS_STREAM_NAME, - StreamType::Internal, - Some(&"dataset_name".into()), - vec![log_source_entry], - ) - .await?; + max_field_statistics: usize, +) -> Result { let ctx = SessionContext::new_with_state(QUERY_SESSION_STATE.clone()); - let parquet_file_name = parquet_path - .file_name() - .expect("only parquet files are returned by iterator") - .to_str() - .expect("filename is valid string"); - let random_suffix = Ulid::new().to_string(); - let parquet_file_name = str::replace(parquet_file_name, ".", "_"); - let ctx_table_name = format!("{}_{}_{}", stream_name, parquet_file_name, random_suffix); + + let table_name = Ulid::new().to_string(); ctx.register_parquet( - &ctx_table_name, + &table_name, parquet_path.to_str().expect("valid path"), ParquetReadOptions::default(), ) .await .map_err(|e| PostError::Invalid(e.into()))?; - let field_stats = collect_all_field_stats(&ctx_table_name, &ctx, schema).await; + let field_stats = + collect_all_field_stats(&table_name, &ctx, schema, max_field_statistics).await; drop(ctx); - + let mut stats_calculated = false; let stats = DatasetStats { dataset_name: stream_name.to_string(), field_stats, }; if stats.field_stats.is_empty() { - return Ok(()); + return Ok(stats_calculated); } + stats_calculated = true; let stats_value = serde_json::to_value(&stats).map_err(|e| ObjectStorageError::Invalid(e.into()))?; - + let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new()); + PARSEABLE + .create_stream_if_not_exists( + DATASET_STATS_STREAM_NAME, + StreamType::Internal, + Some(&"dataset_name".into()), + vec![log_source_entry], + ) + .await?; flatten_and_push_logs( stats_value, DATASET_STATS_STREAM_NAME, @@ -1015,7 +1012,7 @@ async fn calculate_field_stats( &HashMap::new(), ) .await?; - Ok(()) + Ok(stats_calculated) } /// Collects statistics for all fields in the stream. 
@@ -1025,6 +1022,7 @@ async fn collect_all_field_stats( stream_name: &str, ctx: &SessionContext, schema: &Schema, + max_field_statistics: usize, ) -> Vec { // Collect field names into an owned Vec to avoid lifetime issues let field_names: Vec = schema @@ -1034,7 +1032,9 @@ async fn collect_all_field_stats( .collect(); let field_futures = field_names.into_iter().map(|field_name| { let ctx = ctx.clone(); - async move { calculate_single_field_stats(ctx, stream_name, &field_name).await } + async move { + calculate_single_field_stats(ctx, stream_name, &field_name, max_field_statistics).await + } }); futures::stream::iter(field_futures) @@ -1051,51 +1051,57 @@ async fn calculate_single_field_stats( ctx: SessionContext, stream_name: &str, field_name: &str, + max_field_statistics: usize, ) -> Option { let mut total_count = 0; let mut distinct_count = 0; let mut distinct_stats = Vec::new(); - let combined_sql = get_stats_sql(stream_name, field_name); - if let Ok(df) = ctx.sql(&combined_sql).await { - let mut stream = match df.execute_stream().await { - Ok(stream) => stream, - Err(e) => { - warn!("Failed to execute distinct stats query: {e}"); - return None; // Return empty if query fails - } - }; - while let Some(batch_result) = stream.next().await { - let rb = match batch_result { - Ok(batch) => batch, + let combined_sql = get_stats_sql(stream_name, field_name, max_field_statistics); + match ctx.sql(&combined_sql).await { + Ok(df) => { + let mut stream = match df.execute_stream().await { + Ok(stream) => stream, Err(e) => { - warn!("Failed to fetch batch in distinct stats query: {e}"); - continue; // Skip this batch if there's an error + warn!("Failed to execute distinct stats query: {e}"); + return None; // Return empty if query fails } }; - let total_count_array = rb.column(0).as_any().downcast_ref::()?; - let distinct_count_array = rb.column(1).as_any().downcast_ref::()?; - - total_count = total_count_array.value(0); - distinct_count = distinct_count_array.value(0); + while let Some(batch_result) = stream.next().await { + let rb = match batch_result { + Ok(batch) => batch, + Err(e) => { + warn!("Failed to fetch batch in distinct stats query: {e}"); + continue; // Skip this batch if there's an error + } + }; + let total_count_array = rb.column(0).as_any().downcast_ref::()?; + let distinct_count_array = rb.column(1).as_any().downcast_ref::()?; - if distinct_count == 0 { - return None; - } + total_count = total_count_array.value(0); + distinct_count = distinct_count_array.value(0); + if distinct_count == 0 { + return None; + } - let field_value_array = rb.column(2).as_ref(); - let value_count_array = rb.column(3).as_any().downcast_ref::()?; + let field_value_array = rb.column(2).as_ref(); + let value_count_array = rb.column(3).as_any().downcast_ref::()?; - for i in 0..rb.num_rows() { - let value = format_arrow_value(field_value_array, i); - let count = value_count_array.value(i); + for i in 0..rb.num_rows() { + let value = format_arrow_value(field_value_array, i); + let count = value_count_array.value(i); - distinct_stats.push(DistinctStat { - distinct_value: value, - count, - }); + distinct_stats.push(DistinctStat { + distinct_value: value, + count, + }); + } } } + Err(e) => { + warn!("Failed to execute distinct stats query for field: {field_name}, error: {e}"); + return None; + } } Some(FieldStat { field_name: field_name.to_string(), @@ -1105,7 +1111,7 @@ async fn calculate_single_field_stats( }) } -fn get_stats_sql(stream_name: &str, field_name: &str) -> String { +fn get_stats_sql(stream_name: 
&str, field_name: &str, max_field_statistics: usize) -> String { let escaped_field_name = field_name.replace('"', "\"\""); let escaped_stream_name = stream_name.replace('"', "\"\""); @@ -1136,7 +1142,7 @@ fn get_stats_sql(stream_name: &str, field_name: &str) -> String { WHERE rn <= {} ORDER BY value_count DESC "#, - PARSEABLE.options.max_field_statistics + max_field_statistics ) } @@ -1346,3 +1352,531 @@ pub fn manifest_path(prefix: &str) -> RelativePathBuf { _ => RelativePathBuf::from_iter([prefix, MANIFEST_FILE]), } } +#[cfg(test)] +mod tests { + use std::{fs::OpenOptions, sync::Arc}; + + use arrow_array::{ + BooleanArray, Float64Array, Int64Array, RecordBatch, StringArray, TimestampMillisecondArray, + }; + use arrow_schema::{DataType, Field, Schema, TimeUnit}; + use datafusion::prelude::{ParquetReadOptions, SessionContext}; + use parquet::{arrow::ArrowWriter, file::properties::WriterProperties}; + use temp_dir::TempDir; + use ulid::Ulid; + + use crate::storage::object_storage::calculate_single_field_stats; + + async fn create_test_parquet_with_data() -> (TempDir, std::path::PathBuf) { + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("test_data.parquet"); + let schema = Arc::new(create_test_schema()); + + // Create test data with various patterns + let id_array = Int64Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let name_array = StringArray::from(vec![ + Some("Alice"), + Some("Bob"), + Some("Alice"), + Some("Charlie"), + Some("Alice"), + Some("Bob"), + Some("David"), + None, + Some("Eve"), + Some("Frank"), + ]); + let score_array = Float64Array::from(vec![ + Some(95.5), + Some(87.2), + Some(95.5), + Some(78.9), + Some(92.1), + Some(88.8), + Some(91.0), + None, + Some(89.5), + Some(94.2), + ]); + let active_array = BooleanArray::from(vec![ + Some(true), + Some(false), + Some(true), + Some(true), + Some(true), + Some(false), + Some(true), + None, // null value + Some(false), + Some(true), + ]); + let timestamp_array = TimestampMillisecondArray::from(vec![ + Some(1640995200000), + Some(1640995260000), + Some(1640995200000), + Some(1640995320000), + Some(1640995380000), + Some(1640995440000), + Some(1640995500000), + None, + Some(1640995560000), + Some(1640995620000), + ]); + // Field with single value (all same) + let single_value_array = StringArray::from(vec![ + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + Some("constant"), + ]); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(id_array), + Arc::new(name_array), + Arc::new(score_array), + Arc::new(active_array), + Arc::new(timestamp_array), + Arc::new(single_value_array), + ], + ) + .unwrap(); + let props = WriterProperties::new(); + let mut parquet_file = OpenOptions::new() + .create(true) + .append(true) + .open(file_path.clone()) + .unwrap(); + let mut writer = + ArrowWriter::try_new(&mut parquet_file, schema.clone(), Some(props.clone())).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + (temp_dir, file_path) + } + + fn create_test_schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("score", DataType::Float64, true), + Field::new("active", DataType::Boolean, true), + Field::new( + "created_at", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + Field::new("single_value", DataType::Utf8, true), + ]) + } + + 
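+    // The tests below write small parquet files, register them with an in-memory
+    // DataFusion `SessionContext`, and assert the total, distinct, and per-value
+    // counts returned by `calculate_single_field_stats`, including error paths
+    // such as a nonexistent table or field.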
#[tokio::test] + async fn test_calculate_single_field_stats_with_multiple_values() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let random_suffix = Ulid::new().to_string(); + let ctx = SessionContext::new(); + ctx.register_parquet( + &random_suffix, + parquet_path.to_str().expect("valid path"), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test name field with multiple distinct values and different frequencies + let result = calculate_single_field_stats(ctx.clone(), &random_suffix, "name", 50).await; + assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.field_name, "name"); + assert_eq!(stats.count, 10); + assert_eq!(stats.distinct_count, 7); + assert_eq!(stats.distinct_stats.len(), 7); + + // Verify ordering by count (descending) + assert!(stats.distinct_stats[0].count >= stats.distinct_stats[1].count); + assert!(stats.distinct_stats[1].count >= stats.distinct_stats[2].count); + + // Verify specific counts + let alice_stat = stats + .distinct_stats + .iter() + .find(|s| s.distinct_value == "Alice"); + assert!(alice_stat.is_some()); + assert_eq!(alice_stat.unwrap().count, 3); + + let bob_stat = stats + .distinct_stats + .iter() + .find(|s| s.distinct_value == "Bob"); + assert!(bob_stat.is_some()); + assert_eq!(bob_stat.unwrap().count, 2); + + let charlie_stat = stats + .distinct_stats + .iter() + .find(|s| s.distinct_value == "Charlie"); + assert!(charlie_stat.is_some()); + assert_eq!(charlie_stat.unwrap().count, 1); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_with_numeric_field() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test score field (Float64) + let result = calculate_single_field_stats(ctx.clone(), &table_name, "score", 50).await; + + assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.field_name, "score"); + assert_eq!(stats.count, 10); + assert_eq!(stats.distinct_count, 9); + + // Verify that 95.5 appears twice (should be first due to highest count) + let highest_count_stat = &stats.distinct_stats[0]; + assert_eq!(highest_count_stat.distinct_value, "95.5"); + assert_eq!(highest_count_stat.count, 2); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_with_boolean_field() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test active field (Boolean) + let result = calculate_single_field_stats(ctx.clone(), &table_name, "active", 50).await; + + assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.field_name, "active"); + assert_eq!(stats.count, 10); + assert_eq!(stats.distinct_count, 3); + assert_eq!(stats.distinct_stats.len(), 3); + + assert_eq!(stats.distinct_stats[0].distinct_value, "true"); + assert_eq!(stats.distinct_stats[0].count, 6); + assert_eq!(stats.distinct_stats[1].distinct_value, "false"); + assert_eq!(stats.distinct_stats[1].count, 3); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_with_timestamp_field() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; 
+ + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test created_at field (Timestamp) + let result = calculate_single_field_stats(ctx.clone(), &table_name, "created_at", 50).await; + + assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.field_name, "created_at"); + assert_eq!(stats.count, 10); + assert_eq!(stats.distinct_count, 9); + + // Verify that the duplicate timestamp appears twice + let duplicate_timestamp = &stats.distinct_stats[0]; + assert_eq!(duplicate_timestamp.count, 2); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_single_value_field() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test field with single distinct value + let result = + calculate_single_field_stats(ctx.clone(), &table_name, "single_value", 50).await; + + assert!(result.is_some()); + let stats = result.unwrap(); + + assert_eq!(stats.field_name, "single_value"); + assert_eq!(stats.count, 10); + assert_eq!(stats.distinct_count, 1); + assert_eq!(stats.distinct_stats.len(), 1); + assert_eq!(stats.distinct_stats[0].distinct_value, "constant"); + assert_eq!(stats.distinct_stats[0].count, 10); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_nonexistent_table() { + let ctx = SessionContext::new(); + + // Test with non-existent table + let result = + calculate_single_field_stats(ctx.clone(), "non_existent_table", "field", 50).await; + + // Should return None due to SQL execution failure + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_nonexistent_field() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test with non-existent field + let result = + calculate_single_field_stats(ctx.clone(), &table_name, "non_existent_field", 50).await; + + // Should return None due to SQL execution failure + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_with_special_characters() { + // Create a schema with field names containing special characters + let schema = Arc::new(Schema::new(vec![ + Field::new("field with spaces", DataType::Utf8, true), + Field::new("field\"with\"quotes", DataType::Utf8, true), + Field::new("field'with'apostrophes", DataType::Utf8, true), + ])); + + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("special_chars.parquet"); + + use parquet::arrow::AsyncArrowWriter; + use tokio::fs::File; + + let space_array = StringArray::from(vec![Some("value1"), Some("value2"), Some("value1")]); + let quote_array = StringArray::from(vec![Some("quote1"), Some("quote2"), Some("quote1")]); + let apostrophe_array = StringArray::from(vec![Some("apos1"), Some("apos2"), Some("apos1")]); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(space_array), + Arc::new(quote_array), + Arc::new(apostrophe_array), + ], + ) + .unwrap(); + + let file = 
File::create(&file_path).await.unwrap(); + let mut writer = AsyncArrowWriter::try_new(file, schema, None).unwrap(); + writer.write(&batch).await.unwrap(); + writer.close().await.unwrap(); + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test field with spaces + let result = + calculate_single_field_stats(ctx.clone(), &table_name, "field with spaces", 50).await; + assert!(result.is_some()); + let stats = result.unwrap(); + assert_eq!(stats.field_name, "field with spaces"); + assert_eq!(stats.count, 3); + assert_eq!(stats.distinct_count, 2); + + // Test field with quotes + let result = + calculate_single_field_stats(ctx.clone(), &table_name, "field\"with\"quotes", 50).await; + assert!(result.is_some()); + let stats = result.unwrap(); + assert_eq!(stats.field_name, "field\"with\"quotes"); + assert_eq!(stats.count, 3); + assert_eq!(stats.distinct_count, 2); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_empty_table() { + // Create empty table + let schema = Arc::new(create_test_schema()); + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("empty_data.parquet"); + + use parquet::arrow::AsyncArrowWriter; + use tokio::fs::File; + + let file = File::create(&file_path).await.unwrap(); + let mut writer = AsyncArrowWriter::try_new(file, schema.clone(), None).unwrap(); + + // Create empty batch + let empty_batch = RecordBatch::new_empty(schema.clone()); + writer.write(&empty_batch).await.unwrap(); + writer.close().await.unwrap(); + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + let result = calculate_single_field_stats(ctx.clone(), &table_name, "name", 50).await; + assert!(result.unwrap().distinct_stats.is_empty()); + } + + #[tokio::test] + async fn test_calculate_single_field_stats_streaming_behavior() { + let (_temp_dir, parquet_path) = create_test_parquet_with_data().await; + + let ctx = SessionContext::new(); + let table_name = ulid::Ulid::new().to_string(); + ctx.register_parquet( + &table_name, + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Test that the function handles streaming properly by checking + // that all data is collected correctly across multiple batches + let result = calculate_single_field_stats(ctx.clone(), &table_name, "name", 50).await; + + assert!(result.is_some()); + let stats = result.unwrap(); + + // Verify that the streaming collected all the data + let total_distinct_count: i64 = stats.distinct_stats.iter().map(|s| s.count).sum(); + assert_eq!(total_distinct_count, stats.count); + + // Verify that distinct_stats are properly ordered by count + for i in 1..stats.distinct_stats.len() { + assert!(stats.distinct_stats[i - 1].count >= stats.distinct_stats[i].count); + } + } + + #[tokio::test] + async fn test_calculate_single_field_stats_large_dataset() { + // Create a larger dataset to test streaming behavior + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("category", DataType::Utf8, true), + ])); + + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("large_data.parquet"); + + use parquet::arrow::AsyncArrowWriter; + use tokio::fs::File; + + // Create 1000 
rows with 10 distinct categories
+        let ids: Vec<i64> = (0..1000).collect();
+        let categories: Vec<Option<&str>> = (0..1000)
+            .map(|i| {
+                Some(match i % 10 {
+                    0 => "cat_0",
+                    1 => "cat_1",
+                    2 => "cat_2",
+                    3 => "cat_3",
+                    4 => "cat_4",
+                    5 => "cat_5",
+                    6 => "cat_6",
+                    7 => "cat_7",
+                    8 => "cat_8",
+                    _ => "cat_9",
+                })
+            })
+            .collect();
+
+        let id_array = Int64Array::from(ids);
+        let category_array = StringArray::from(categories);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(id_array), Arc::new(category_array)],
+        )
+        .unwrap();
+
+        let file = File::create(&file_path).await.unwrap();
+        let mut writer = AsyncArrowWriter::try_new(file, schema, None).unwrap();
+        writer.write(&batch).await.unwrap();
+        writer.close().await.unwrap();
+
+        let ctx = SessionContext::new();
+        let table_name = ulid::Ulid::new().to_string();
+        ctx.register_parquet(
+            &table_name,
+            file_path.to_str().unwrap(),
+            ParquetReadOptions::default(),
+        )
+        .await
+        .unwrap();
+
+        let result = calculate_single_field_stats(ctx.clone(), &table_name, "category", 50).await;
+
+        assert!(result.is_some());
+        let stats = result.unwrap();
+
+        assert_eq!(stats.count, 1000);
+        assert_eq!(stats.distinct_count, 10);
+        assert_eq!(stats.distinct_stats.len(), 10);
+
+        // Each category should appear 100 times
+        for distinct_stat in &stats.distinct_stats {
+            assert_eq!(distinct_stat.count, 100);
+        }
+    }
+}

From 829c97deab4378bd0ae8934ae1941d7dffff75cf Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Thu, 19 Jun 2025 04:42:50 -0400
Subject: [PATCH 19/20] refactor

---
 src/storage/object_storage.rs | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index a22322c85..df6bf77f0 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -972,19 +972,19 @@ async fn calculate_field_stats(
     schema: &Schema,
     max_field_statistics: usize,
 ) -> Result<bool, PostError> {
-    let ctx = SessionContext::new_with_state(QUERY_SESSION_STATE.clone());
+    let field_stats = {
+        let ctx = SessionContext::new_with_state(QUERY_SESSION_STATE.clone());
+        let table_name = Ulid::new().to_string();
+        ctx.register_parquet(
+            &table_name,
+            parquet_path.to_str().expect("valid path"),
+            ParquetReadOptions::default(),
+        )
+        .await
+        .map_err(|e| PostError::Invalid(e.into()))?;
 
-    let table_name = Ulid::new().to_string();
-    ctx.register_parquet(
-        &table_name,
-        parquet_path.to_str().expect("valid path"),
-        ParquetReadOptions::default(),
-    )
-    .await
-    .map_err(|e| PostError::Invalid(e.into()))?;
-    let field_stats =
-        collect_all_field_stats(&table_name, &ctx, schema, max_field_statistics).await;
-    drop(ctx);
+        collect_all_field_stats(&table_name, &ctx, schema, max_field_statistics).await
+    };
     let mut stats_calculated = false;
     let stats = DatasetStats {
         dataset_name: stream_name.to_string(),

From 478b282661499f7bafbb3b55b9026146c6897883 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Thu, 19 Jun 2025 05:04:54 -0400
Subject: [PATCH 20/20] remove stats calculation from conversion

---
 src/parseable/streams.rs      | 15 ++++-----------
 src/storage/object_storage.rs |  2 +-
 2 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index e9b9fbef9..28cceefcd 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -463,7 +463,7 @@ impl Stream {
     }
 
     /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
-    pub async fn prepare_parquet(
+    pub 
fn prepare_parquet(
         &self,
         init_signal: bool,
         shutdown_signal: bool,
@@ -637,8 +637,6 @@ impl Stream {
         }
 
         self.update_staging_metrics(&staging_files);
-
-        let mut record_batches = Vec::new();
         for (parquet_path, arrow_files) in staging_files {
             let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
@@ -656,7 +654,6 @@ impl Stream {
                 &schema,
                 &props,
                 time_partition,
-                &mut record_batches,
             )? {
                 continue;
             }
@@ -682,7 +679,6 @@ impl Stream {
         schema: &Arc<Schema>,
         props: &WriterProperties,
         time_partition: Option<&String>,
-        record_batches: &mut Vec<RecordBatch>,
     ) -> Result<bool, StagingError> {
         let mut part_file = OpenOptions::new()
             .create(true)
@@ -692,8 +688,6 @@ impl Stream {
         let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;
         for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
             writer.write(record)?;
-            // Collect record batches for finding statistics later
-            record_batches.push(record.clone());
         }
         writer.close()?;
@@ -960,7 +954,7 @@ impl Stream {
     }
 
     /// First flushes arrows onto disk and then converts the arrow into parquet
-    pub async fn flush_and_convert(
+    pub fn flush_and_convert(
         &self,
         init_signal: bool,
         shutdown_signal: bool,
@@ -975,7 +969,7 @@ impl Stream {
 
         let start_convert = Instant::now();
 
-        self.prepare_parquet(init_signal, shutdown_signal).await?;
+        self.prepare_parquet(init_signal, shutdown_signal)?;
         trace!(
             "Converting arrows to parquet on stream ({}) took: {}s",
             self.stream_name,
@@ -1070,8 +1064,7 @@ impl Streams {
             .map(Arc::clone)
             .collect();
         for stream in streams {
-            joinset
-                .spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal).await });
+            joinset.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal) });
         }
     }
 }
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index df6bf77f0..c4a2ab5d2 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -909,7 +909,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         // perform local sync for the `pstats` dataset
         task::spawn(async move {
             if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) {
-                if let Err(err) = stats_stream.flush_and_convert(false, false).await {
+                if let Err(err) = stats_stream.flush_and_convert(false, false) {
                     error!("Failed in local sync for dataset stats stream: {err}");
                 }
             }
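
For context on PATCH 20/20: after it, `flush_and_convert` and `prepare_parquet` are plain synchronous functions, so callers return their `Result` from the spawned async block instead of awaiting it. The following is a minimal, self-contained sketch of that calling pattern only; `MockStream`, its `flush_and_convert` body, and the stream names are illustrative stand-ins, not Parseable code.

```
use std::sync::Arc;
use tokio::task::JoinSet;

struct MockStream {
    name: String,
}

impl MockStream {
    // Synchronous entry point, mirroring the shape of `Stream::flush_and_convert`
    // after PATCH 20/20 (no `.await` anywhere inside the conversion path).
    fn flush_and_convert(&self, init_signal: bool, shutdown_signal: bool) -> Result<(), String> {
        println!(
            "converting staged arrows for {} (init={init_signal}, shutdown={shutdown_signal})",
            self.name
        );
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let streams = vec![
        Arc::new(MockStream { name: "app_logs".into() }),
        Arc::new(MockStream { name: "pstats".into() }),
    ];

    let mut joinset = JoinSet::new();
    for stream in streams {
        // The async block no longer awaits the call; it just returns the Result
        // of the synchronous conversion, like the `joinset.spawn` line in the diff.
        joinset.spawn(async move { stream.flush_and_convert(false, false) });
    }
    while let Some(res) = joinset.join_next().await {
        match res {
            Ok(Ok(())) => {}
            Ok(Err(err)) => eprintln!("Failed in local sync: {err}"),
            Err(join_err) => eprintln!("conversion task panicked: {join_err}"),
        }
    }
}
```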