feat: add stats for each field #1340

Merged: 20 commits, Jun 19, 2025
Changes from 13 commits
18 changes: 18 additions & 0 deletions src/cli.rs
@@ -388,6 +388,24 @@ pub struct Options {
help = "Maximum level of flattening allowed for events"
)]
pub event_flatten_level: usize,

// Maximum number of statistics to store for a field
#[arg(
long,
env = "P_MAX_FIELD_STATISTICS",
default_value = "50",
help = "Maximum number of field statistics to store"
)]
pub max_field_statistics: usize,

// collect dataset stats
#[arg(
long,
env = "P_COLLECT_DATASET_STATS",
default_value = "false",
help = "Enable/Disable collecting dataset stats"
)]
pub collect_dataset_stats: bool,
}

#[derive(Parser, Debug)]
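Note (not part of the diff): the two new options follow the same clap derive conventions as the rest of Options. A minimal, self-contained sketch of how they parse, with a hypothetical struct name, assuming clap's derive and env features (both already in use by the crate):

use clap::Parser;

// Hypothetical stand-in for the stats-related subset of `Options`.
#[derive(Parser, Debug)]
struct StatsOptions {
    // Maximum number of field statistics to store
    #[arg(long, env = "P_MAX_FIELD_STATISTICS", default_value = "50")]
    max_field_statistics: usize,

    // Enable/Disable collecting dataset stats
    #[arg(long, env = "P_COLLECT_DATASET_STATS", default_value = "false")]
    collect_dataset_stats: bool,
}

fn main() {
    // e.g. `prog --max-field-statistics 100 --collect-dataset-stats`
    // or `P_MAX_FIELD_STATISTICS=100 P_COLLECT_DATASET_STATS=true prog`
    let opts = StatsOptions::parse();
    println!("{opts:?}");
}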
1 change: 1 addition & 0 deletions src/connectors/kafka/processor.rs
@@ -55,6 +55,7 @@ impl ParseableSinkProcessor {
.create_stream_if_not_exists(
stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry],
)
.await?;
4 changes: 4 additions & 0 deletions src/handlers/http/ingest.rs
@@ -100,6 +100,7 @@ pub async fn ingest(
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
)
.await?;
@@ -183,6 +184,7 @@ pub async fn handle_otel_logs_ingestion(
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
)
.await?;
@@ -248,6 +250,7 @@ pub async fn handle_otel_metrics_ingestion(
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
)
.await?;
@@ -313,6 +316,7 @@ pub async fn handle_otel_traces_ingestion(
.create_stream_if_not_exists(
&stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
)
.await?;
13 changes: 9 additions & 4 deletions src/handlers/http/prism_home.rs
@@ -26,7 +26,7 @@ use crate::{
};

const HOME_SEARCH_QUERY_PARAM: &str = "key";

pub const HOME_QUERY_PARAM: &str = "includeInternal";
/// Fetches the data to populate Prism's home
///
///
@@ -36,8 +36,12 @@ const HOME_SEARCH_QUERY_PARAM: &str = "key";
pub async fn home_api(req: HttpRequest) -> Result<impl Responder, PrismHomeError> {
let key = extract_session_key_from_req(&req)
.map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
.map_err(|_| PrismHomeError::InvalidQueryParameter(HOME_QUERY_PARAM.to_string()))?;

let res = generate_home_response(&key).await?;
let include_internal = query_map.get(HOME_QUERY_PARAM).is_some_and(|v| v == "true");

let res = generate_home_response(&key, include_internal).await?;

Ok(web::Json(res))
}
@@ -52,11 +56,12 @@ pub async fn home_search(req: HttpRequest) -> Result<impl Responder, PrismHomeEr
return Ok(web::Json(serde_json::json!({})));
}

let query_value = query_map
let query_key = query_map
.get(HOME_SEARCH_QUERY_PARAM)
.ok_or_else(|| PrismHomeError::InvalidQueryParameter(HOME_SEARCH_QUERY_PARAM.to_string()))?
.to_lowercase();
let res = generate_home_search_response(&key, &query_value).await?;

let res = generate_home_search_response(&key, &query_key).await?;
let json_res = serde_json::to_value(res)
.map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;

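Note (not part of the diff): the home endpoint now accepts an optional includeInternal query parameter and includes internal streams only when its value is exactly "true". A hedged client-side sketch using reqwest; the endpoint path, port, and credentials are placeholders for illustration, not taken from this PR:

use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();
    // Ask Prism home to also list internal streams; omit the parameter
    // (or send any value other than "true") to keep them filtered out.
    let body = client
        .get("http://localhost:8000/api/v1/prism/home") // hypothetical path
        .query(&[("includeInternal", "true")])
        .basic_auth("admin", Some("admin")) // placeholder credentials
        .send()
        .await?
        .text()
        .await?;
    println!("{body}");
    Ok(())
}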
6 changes: 6 additions & 0 deletions src/lib.rs
@@ -72,6 +72,12 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as
/// Describes the duration at the end of which parquets are pushed into objectstore.
pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);

/// Describes the duration during which local sync should be completed
pub const LOCAL_SYNC_THRESHOLD: Duration = Duration::from_secs(30); // 30 secs

/// Describes the duration during which object store sync should be completed
pub const OBJECT_STORE_SYNC_THRESHOLD: Duration = Duration::from_secs(15); // 15 secs

// A single HTTP client for all outgoing HTTP requests from the parseable server
pub static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
ClientBuilder::new()
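Note (not part of the diff): the two thresholds are plain Duration constants; the code that consumes them is outside this section, but the implied use is to compare elapsed sync time against them. A rough sketch under that assumption:

use std::time::{Duration, Instant};

const LOCAL_SYNC_THRESHOLD: Duration = Duration::from_secs(30);

fn run_local_sync() {
    // placeholder for flushing in-memory arrow data to staging on disk
}

fn main() {
    let start = Instant::now();
    run_local_sync();
    let elapsed = start.elapsed();
    // Flag sync cycles that overrun their expected completion window.
    if elapsed > LOCAL_SYNC_THRESHOLD {
        eprintln!("local sync took {elapsed:?}, exceeding {LOCAL_SYNC_THRESHOLD:?}");
    }
}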
4 changes: 3 additions & 1 deletion src/parseable/mod.rs
@@ -330,6 +330,7 @@ impl Parseable {
.create_stream_if_not_exists(
INTERNAL_STREAM_NAME,
StreamType::Internal,
None,
vec![log_source_entry],
)
.await
@@ -354,6 +355,7 @@
&self,
stream_name: &str,
stream_type: StreamType,
custom_partition: Option<&String>,
log_source: Vec<LogSourceEntry>,
) -> Result<bool, PostError> {
if self.streams.contains(stream_name) {
@@ -375,7 +377,7 @@
stream_name.to_string(),
"",
None,
None,
custom_partition,
false,
Arc::new(Schema::empty()),
stream_type,
32 changes: 18 additions & 14 deletions src/parseable/streams.rs
@@ -29,7 +29,7 @@ use std::{
use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike, Utc};
use derive_more::{Deref, DerefMut};
use derive_more::derive::{Deref, DerefMut};
use itertools::Itertools;
use parquet::{
arrow::ArrowWriter,
@@ -463,7 +463,7 @@ impl Stream {
}

/// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
pub fn prepare_parquet(
pub async fn prepare_parquet(
&self,
init_signal: bool,
shutdown_signal: bool,
@@ -478,15 +478,12 @@

// read arrow files on disk
// convert them to parquet
let schema = self
.convert_disk_files_to_parquet(
time_partition.as_ref(),
custom_partition.as_ref(),
init_signal,
shutdown_signal,
)
.inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?;

let schema = self.convert_disk_files_to_parquet(
time_partition.as_ref(),
custom_partition.as_ref(),
init_signal,
shutdown_signal,
)?;
// check if there is already a schema file in staging pertaining to this stream
// if yes, then merge them and save

@@ -641,6 +638,7 @@

self.update_staging_metrics(&staging_files);

let mut record_batches = Vec::new();
for (parquet_path, arrow_files) in staging_files {
let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
if record_reader.readers.is_empty() {
@@ -658,6 +656,7 @@
&schema,
&props,
time_partition,
&mut record_batches,
)? {
continue;
}
@@ -683,6 +682,7 @@
schema: &Arc<Schema>,
props: &WriterProperties,
time_partition: Option<&String>,
record_batches: &mut Vec<RecordBatch>,
) -> Result<bool, StagingError> {
let mut part_file = OpenOptions::new()
.create(true)
@@ -692,6 +692,8 @@
let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;
for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
writer.write(record)?;
// Collect record batches for finding statistics later
record_batches.push(record.clone());
}
writer.close()?;

@@ -958,7 +960,7 @@
}

/// First flushes arrows onto disk and then converts the arrow into parquet
pub fn flush_and_convert(
pub async fn flush_and_convert(
&self,
init_signal: bool,
shutdown_signal: bool,
@@ -972,7 +974,8 @@
);

let start_convert = Instant::now();
self.prepare_parquet(init_signal, shutdown_signal)?;

self.prepare_parquet(init_signal, shutdown_signal).await?;
trace!(
"Converting arrows to parquet on stream ({}) took: {}s",
self.stream_name,
@@ -1067,7 +1070,8 @@ impl Streams {
.map(Arc::clone)
.collect();
for stream in streams {
joinset.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal) });
joinset
.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal).await });
}
}
}
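Note (not part of the diff): the record batches collected during parquet conversion feed the new field-statistics step, whose actual implementation is not shown in this section. The hypothetical helper below only illustrates the kind of per-field summary those batches make possible (row and null counts) using the arrow_array API:

use std::collections::HashMap;

use arrow_array::{Array, RecordBatch};

// Hypothetical helper: per-field (total values, null values) across the
// batches collected during parquet conversion.
fn summarize(batches: &[RecordBatch]) -> HashMap<String, (usize, usize)> {
    let mut stats: HashMap<String, (usize, usize)> = HashMap::new();
    for batch in batches {
        let schema = batch.schema();
        for (field, column) in schema.fields().iter().zip(batch.columns()) {
            let entry = stats.entry(field.name().clone()).or_insert((0, 0));
            entry.0 += column.len(); // values seen in this column
            entry.1 += column.null_count(); // nulls seen in this column
        }
    }
    stats
}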
15 changes: 13 additions & 2 deletions src/prism/home/mod.rs
@@ -33,7 +33,7 @@ use crate::{
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
parseable::PARSEABLE,
rbac::{map::SessionKey, role::Action, Users},
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
users::{dashboards::DASHBOARDS, filters::FILTERS},
};

@@ -88,7 +88,10 @@ pub struct HomeSearchResponse {
resources: Vec<Resource>,
}

pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
pub async fn generate_home_response(
key: &SessionKey,
include_internal: bool,
) -> Result<HomeResponse, PrismHomeError> {
// Execute these operations concurrently
let (stream_titles_result, alerts_info_result) =
tokio::join!(get_stream_titles(key), get_alerts_info());
@@ -120,6 +123,14 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
for result in stream_metadata_results {
match result {
Ok((stream, metadata, dataset_type)) => {
// Skip internal streams if the flag is false
if !include_internal
&& metadata
.iter()
.all(|m| m.stream_type == StreamType::Internal)
{
continue;
}
stream_wise_stream_json.insert(stream.clone(), metadata);
datasets.push(DataSet {
title: stream,
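Note (not part of the diff): the skip condition above hides a stream only when every one of its metadata entries is marked internal, so streams with mixed entries always stay visible. A standalone illustration of that predicate with simplified stand-in types (not the crate's real ones):

#[derive(PartialEq)]
enum StreamType {
    UserDefined,
    Internal,
}

struct Meta {
    stream_type: StreamType,
}

// Returns true when the stream should be omitted from the home response.
fn skip_stream(include_internal: bool, metadata: &[Meta]) -> bool {
    !include_internal && metadata.iter().all(|m| m.stream_type == StreamType::Internal)
}

fn main() {
    let internal_only = vec![Meta { stream_type: StreamType::Internal }];
    let mixed = vec![
        Meta { stream_type: StreamType::Internal },
        Meta { stream_type: StreamType::UserDefined },
    ];
    assert!(skip_stream(false, &internal_only)); // hidden by default
    assert!(!skip_stream(true, &internal_only)); // shown when includeInternal=true
    assert!(!skip_stream(false, &mixed)); // mixed streams are never skipped
}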
44 changes: 25 additions & 19 deletions src/query/mod.rs
@@ -27,7 +27,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::error::DataFusionError;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder};
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::{
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
@@ -61,6 +61,9 @@ use crate::utils::time::TimeRange;
pub static QUERY_SESSION: Lazy<SessionContext> =
Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));

pub static QUERY_SESSION_STATE: Lazy<SessionState> =
Lazy::new(|| Query::create_session_state(PARSEABLE.storage()));

/// Dedicated multi-threaded runtime to run all queries on
pub static QUERY_RUNTIME: Lazy<Runtime> =
Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));
@@ -96,10 +99,28 @@ pub struct Query {
impl Query {
// create session context for this query
pub fn create_session_context(storage: Arc<dyn ObjectStorageProvider>) -> SessionContext {
let state = Self::create_session_state(storage.clone());

let schema_provider = Arc::new(GlobalSchemaProvider {
storage: storage.get_object_store(),
});
state
.catalog_list()
.catalog(&state.config_options().catalog.default_catalog)
.expect("default catalog is provided by datafusion")
.register_schema(
&state.config_options().catalog.default_schema,
schema_provider,
)
.unwrap();

SessionContext::new_with_state(state)
}

fn create_session_state(storage: Arc<dyn ObjectStorageProvider>) -> SessionState {
let runtime_config = storage
.get_datafusion_runtime()
.with_disk_manager(DiskManagerConfig::NewOs);

let (pool_size, fraction) = match PARSEABLE.options.query_memory_pool_size {
Some(size) => (size, 1.),
None => {
@@ -142,26 +163,11 @@ impl Query {
.parquet
.schema_force_view_types = true;

let state = SessionStateBuilder::new()
SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.with_runtime_env(runtime)
.build();

let schema_provider = Arc::new(GlobalSchemaProvider {
storage: storage.get_object_store(),
});
state
.catalog_list()
.catalog(&state.config_options().catalog.default_catalog)
.expect("default catalog is provided by datafusion")
.register_schema(
&state.config_options().catalog.default_schema,
schema_provider,
)
.unwrap();

SessionContext::new_with_state(state)
.build()
}

/// this function returns the result of the query
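Note (not part of the diff): create_session_context is now split so that the session state construction (runtime env, memory pool, parquet options) also backs the new QUERY_SESSION_STATE static; what consumes that static lies outside this section, presumably the field-statistics queries. A hypothetical consumer, sketched under the assumption that it builds its own context from an already prepared state (SessionState is Clone):

use datafusion::execution::context::SessionContext;
use datafusion::execution::SessionState;

// Hypothetical consumer: build a secondary SessionContext from a prepared
// SessionState so the runtime configuration is reused rather than rebuilt.
fn stats_query_context(state: &SessionState) -> SessionContext {
    SessionContext::new_with_state(state.clone())
}

In the crate, such a helper would presumably be fed a clone of QUERY_SESSION_STATE.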