diff --git a/Cargo.lock b/Cargo.lock
index 609935200..02a373ca0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3495,6 +3495,7 @@ dependencies = [
  "humantime",
  "humantime-serde",
  "itertools 0.14.0",
+ "lazy_static",
  "num_cpus",
  "object_store",
  "once_cell",
diff --git a/Cargo.toml b/Cargo.toml
index f26e2aeb2..2b44af323 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -132,6 +132,7 @@ ulid = { version = "1.0", features = ["serde"] }
 xxhash-rust = { version = "0.8", features = ["xxh3"] }
 futures-core = "0.3.31"
 tempfile = "3.20.0"
+lazy_static = "1.4.0"
 
 [build-dependencies]
 cargo_toml = "0.21"
diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh
index 8611cdf63..a126fd5a0 100755
--- a/resources/ingest_demo_data.sh
+++ b/resources/ingest_demo_data.sh
@@ -415,8 +415,8 @@ create_alerts() {
     echo "Creating alerts with target ID: $target_id"
 
     # Alert 1: Error Count (severity_number = 18)
-    alert1_json="{\"severity\":\"high\",\"title\":\"error count\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"severity_number\",\"operator\":\"=\",\"value\":\"18\"}]},\"column\":\"severity_number\",\"operator\":\">\",\"value\":1000}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}"
-
+    alert1_json="{\"severity\":\"high\",\"title\":\"error count\",\"alertType\":\"threshold\",\"query\": \"select count(severity_number) as count_severity_number from demodata where severity_number=18\",\"thresholdConfig\":{\"operator\":\">\",\"value\":1000},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}"
+
     response1=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert1_json" "application/json" 3)
     if [[ $? -eq 0 ]]; then
         echo "Alert 1 (Error Count) created successfully"
@@ -426,8 +426,8 @@ create_alerts() {
     fi
 
     # Alert 2: 400 Errors
-    alert2_json="{\"severity\":\"critical\",\"title\":\"400 Errors\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"body\",\"operator\":\"contains\",\"value\":\"400\"}]},\"column\":\"body\",\"operator\":\">\",\"value\":10}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}"
-
+    alert2_json="{\"severity\":\"critical\",\"title\":\"400 Errors\",\"alertType\":\"threshold\",\"query\": \"select count(body) as count_body from demodata where body like '%400%'\",\"thresholdConfig\":{\"operator\":\">\",\"value\":10},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}"
+
     response2=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert2_json" "application/json" 3)
     if [[ $? -eq 0 ]]; then
         echo "Alert 2 (400 Errors) created successfully"
@@ -437,7 +437,7 @@ create_alerts() {
     fi
 
     # Alert 3: Trace ID null
-    alert3_json="{\"severity\":\"high\",\"title\":\"Trace ID null\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"trace_id\",\"operator\":\"is null\",\"value\":null}]},\"column\":\"trace_id\",\"operator\":\">\",\"value\":0}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}"
+    alert3_json="{\"severity\":\"high\",\"title\":\"Trace ID null\",\"alertType\":\"threshold\",\"query\": \"select count(trace_id) as count_trace_id from demodata where trace_id is null\",\"thresholdConfig\":{\"operator\":\">\",\"value\":0},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}"
 
     response3=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert3_json" "application/json" 3)
     if [[ $? -eq 0 ]]; then
         echo "Alert 3 (Trace ID null) created successfully"
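The demo payloads map one-to-one onto the new `AlertRequest` shape: the `stream` + `aggregates` pair is gone, replaced by a self-contained SQL `query` plus a flat `thresholdConfig`. A minimal sketch of creating one of these alerts from Rust, assuming a local server URL and an existing target ID (both hypothetical here) and the `reqwest`, `serde_json`, and `tokio` crates; authentication headers are omitted:

```rust
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Hypothetical values standing in for $P_URL and the created target ID.
    let p_url = "http://localhost:8000";
    let target_id = "01K0000000000000000000000";

    // A v2 alert is just a SQL query that yields one scalar, plus a threshold.
    let alert = json!({
        "severity": "high",
        "title": "error count",
        "alertType": "threshold",
        "query": "select count(severity_number) as count_severity_number \
                  from demodata where severity_number=18",
        "thresholdConfig": { "operator": ">", "value": 1000 },
        "evalConfig": {
            "rollingWindow": { "evalStart": "5h", "evalEnd": "now", "evalFrequency": 1 }
        },
        "targets": [target_id]
    });

    let resp = reqwest::Client::new()
        .post(format!("{p_url}/api/v1/alerts"))
        .json(&alert)
        .send()
        .await?;
    println!("create alert -> {}", resp.status());
    Ok(())
}
```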
diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs
index 3909e0a68..d60b0fb09 100644
--- a/src/alerts/alerts_utils.rs
+++ b/src/alerts/alerts_utils.rs
@@ -18,39 +18,35 @@
 use std::fmt::Display;
 
+use actix_web::Either;
 use arrow_array::{Float64Array, Int64Array, RecordBatch};
 use datafusion::{
-    functions_aggregate::{
-        count::{count, count_distinct},
-        expr_fn::avg,
-        min_max::{max, min},
-        sum::sum,
-    },
-    logical_expr::{BinaryExpr, Literal, Operator},
-    prelude::{DataFrame, Expr, col, lit},
+    logical_expr::Literal,
+    prelude::{Expr, lit},
 };
 use tracing::trace;
 
 use crate::{
-    alerts::LogicalOperator,
-    handlers::http::query::{create_streams_for_distributed, update_schema_when_distributed},
+    alerts::{Conditions, LogicalOperator, WhereConfigOperator},
+    handlers::http::{
+        cluster::send_query_request,
+        query::{Query, create_streams_for_distributed},
+    },
+    option::Mode,
     parseable::PARSEABLE,
-    query::{QUERY_SESSION, resolve_stream_names},
+    query::{QUERY_SESSION, execute, resolve_stream_names},
     utils::time::TimeRange,
 };
 
-use super::{
-    ALERTS, AggregateConfig, AggregateFunction, AggregateResult, Aggregates, AlertConfig,
-    AlertError, AlertOperator, AlertState, ConditionConfig, Conditions, WhereConfigOperator,
-};
+use super::{ALERTS, AlertConfig, AlertError, AlertOperator, AlertState};
 
 /// accept the alert
 ///
-/// alert contains aggregate_config
+/// alert contains query and the threshold_config
 ///
-/// aggregate_config contains the filters which need to be applied
+/// execute the query and compute the output of the query
 ///
-/// iterate over each agg config, apply filters, the evaluate for that config
+/// compare the output with the threshold_config
 ///
 /// collect the results in the end
 ///
@@ -58,116 +54,113 @@
 pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
     trace!("RUNNING EVAL TASK FOR- {alert:?}");
 
-    let query = prepare_query(alert).await?;
-    let select_query = alert.get_base_query();
-    let base_df = execute_base_query(&query, &select_query).await?;
-    let agg_results = evaluate_aggregates(&alert.aggregates, &base_df).await?;
-    let final_res = calculate_final_result(&alert.aggregates, &agg_results);
+    let time_range = extract_time_range(&alert.eval_config)?;
+    let final_value = execute_alert_query(alert, &time_range).await?;
+    let result = evaluate_condition(
+        &alert.threshold_config.operator,
+        final_value,
+        alert.threshold_config.value,
+    );
-    update_alert_state(alert, final_res, &agg_results).await?;
-    Ok(())
+    update_alert_state(alert, result, final_value).await
 }
 
-async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, AlertError> {
-    let (start_time, end_time) = match &alert.eval_config {
+/// Extract time range from alert evaluation configuration
+fn extract_time_range(eval_config: &super::EvalConfig) -> Result<TimeRange, AlertError> {
+    let (start_time, end_time) = match eval_config {
         super::EvalConfig::RollingWindow(rolling_window) => (&rolling_window.eval_start, "now"),
     };
+    TimeRange::parse_human_time(start_time, end_time)
+        .map_err(|err| AlertError::CustomError(err.to_string()))
+}
+
+/// Execute the alert query based on the current mode and return the final value
+async fn execute_alert_query(
+    alert: &AlertConfig,
+    time_range: &TimeRange,
+) -> Result<f64, AlertError> {
+    match PARSEABLE.options.mode {
+        Mode::All | Mode::Query => execute_local_query(alert, time_range).await,
+        Mode::Prism => execute_remote_query(alert, time_range).await,
+        _ => Err(AlertError::CustomError(format!(
+            "Unsupported mode '{:?}' for alert evaluation",
+            PARSEABLE.options.mode
+        ))),
+    }
+}
+
+/// Execute alert query locally (Query/All mode)
+async fn execute_local_query(
+    alert: &AlertConfig,
+    time_range: &TimeRange,
+) -> Result<f64, AlertError> {
     let session_state = QUERY_SESSION.state();
-    let select_query = alert.get_base_query();
-    let time_range = TimeRange::parse_human_time(start_time, end_time)
-        .map_err(|err| AlertError::CustomError(err.to_string()))?;
+    let query = &alert.query;
 
-    let tables = resolve_stream_names(&select_query)?;
-    //check or load streams in memory
+    let tables = resolve_stream_names(query)?;
     create_streams_for_distributed(tables.clone())
         .await
         .map_err(|err| AlertError::CustomError(format!("Failed to create streams: {err}")))?;
-    let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;
-    Ok(crate::query::Query {
+
+    let raw_logical_plan = session_state.create_logical_plan(query).await?;
+    let query = crate::query::Query {
         raw_logical_plan,
-        time_range,
+        time_range: time_range.clone(),
         filter_tag: None,
-    })
-}
+    };
 
-async fn execute_base_query(
-    query: &crate::query::Query,
-    original_query: &str,
-) -> Result<DataFrame, AlertError> {
-    let streams = resolve_stream_names(original_query)?;
-    let stream_name = streams.first().ok_or_else(|| {
-        AlertError::CustomError(format!("Table name not found in query- {original_query}"))
-    })?;
-    update_schema_when_distributed(&streams)
+    let (records, _) = execute(query, &tables[0], false)
         .await
-        .map_err(|err| {
-            AlertError::CustomError(format!(
-                "Failed to update schema for distributed streams: {err}"
-            ))
-        })?;
-    let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
-    query
-        .get_dataframe(time_partition.as_ref())
-        .await
-        .map_err(|err| AlertError::CustomError(err.to_string()))
-}
-
-async fn evaluate_aggregates(
-    agg_config: &Aggregates,
-    base_df: &DataFrame,
-) -> Result<Vec<AggregateResult>, AlertError> {
-    let agg_filter_exprs = get_exprs(agg_config);
-    let mut results = Vec::new();
-
-    let conditions = match &agg_config.operator {
-        Some(_) => &agg_config.aggregate_config[0..2],
-        None => &agg_config.aggregate_config[0..1],
+        .map_err(|err| AlertError::CustomError(format!("Failed to execute query: {err}")))?;
+
+    let records = match records {
+        Either::Left(rbs) => rbs,
+        Either::Right(_) => {
+            return Err(AlertError::CustomError(
+                "Query returned no results".to_string(),
+            ));
+        }
     };
 
-    for ((agg_expr, filter), agg) in agg_filter_exprs.into_iter().zip(conditions) {
-        let result = evaluate_single_aggregate(base_df, filter, agg_expr, agg).await?;
-        results.push(result);
-    }
-
-    Ok(results)
+    Ok(get_final_value(records))
 }
 
-async fn evaluate_single_aggregate(
-    base_df: &DataFrame,
-    filter: Option<Expr>,
-    agg_expr: Expr,
-    agg: &AggregateConfig,
-) -> Result<AggregateResult, AlertError> {
-    let filtered_df = if let Some(filter) = filter {
-        base_df.clone().filter(filter)?
-    } else {
-        base_df.clone()
+/// Execute alert query remotely (Prism mode)
+async fn execute_remote_query(
+    alert: &AlertConfig,
+    time_range: &TimeRange,
+) -> Result<f64, AlertError> {
+    let query_request = Query {
+        query: alert.query.clone(),
+        start_time: time_range.start.to_rfc3339(),
+        end_time: time_range.end.to_rfc3339(),
+        streaming: false,
+        send_null: false,
+        fields: false,
+        filter_tags: None,
     };
-    let aggregated_rows = filtered_df
-        .aggregate(vec![], vec![agg_expr])?
-        .collect()
-        .await?;
+    let (result_value, _) = send_query_request(&query_request)
+        .await
+        .map_err(|err| AlertError::CustomError(format!("Failed to send query request: {err}")))?;
 
-    let final_value = get_final_value(aggregated_rows);
-    let result = evaluate_condition(&agg.operator, final_value, agg.value);
+    convert_result_to_f64(result_value)
+}
 
-    let message = if result {
-        agg.conditions
-            .as_ref()
-            .map(|config| config.generate_filter_message())
-            .or(None)
+/// Convert JSON result value to f64
+fn convert_result_to_f64(result_value: serde_json::Value) -> Result<f64, AlertError> {
+    if let Some(value) = result_value.as_f64() {
+        Ok(value)
+    } else if let Some(value) = result_value.as_i64() {
+        Ok(value as f64)
+    } else if let Some(value) = result_value.as_u64() {
+        Ok(value as f64)
     } else {
-        None
-    };
-
-    Ok(AggregateResult {
-        result,
-        message,
-        config: agg.clone(),
-        value: final_value,
-    })
+        Err(AlertError::CustomError(
+            "Query result is not a number".to_string(),
+        ))
+    }
 }
 
 fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> bool {
@@ -181,25 +174,21 @@ fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> b
     }
 }
 
-fn calculate_final_result(agg_config: &Aggregates, results: &[AggregateResult]) -> bool {
-    match &agg_config.operator {
-        Some(LogicalOperator::And) => results.iter().all(|r| r.result),
-        Some(LogicalOperator::Or) => results.iter().any(|r| r.result),
-        None => results.first().is_some_and(|r| r.result),
-    }
-}
-
 async fn update_alert_state(
     alert: &AlertConfig,
     final_res: bool,
-    agg_results: &[AggregateResult],
+    actual_value: f64,
 ) -> Result<(), AlertError> {
     if final_res {
-        let message = format_alert_message(agg_results);
         let message = format!(
-            "{message}\nEvaluation Window: {}\nEvaluation Frequency: {}m",
+            "Alert Triggered: {}\n\nThreshold: ({} {})\nCurrent Value: {}\nEvaluation Window: {} | Frequency: {}\n\nQuery:\n{}",
+            alert.id,
+            alert.threshold_config.operator,
+            alert.threshold_config.value,
+            actual_value,
             alert.get_eval_window(),
-            alert.get_eval_frequency()
+            alert.get_eval_frequency(),
+            alert.query
        );
         ALERTS
             .update_state(alert.id, AlertState::Triggered, Some(message))
@@ -215,37 +204,10 @@
     }
 }
 
-fn format_alert_message(agg_results: &[AggregateResult]) -> String {
-    let mut message = String::default();
-    for result in agg_results {
-        if let Some(msg) = &result.message {
-            message.extend([format!(
-                "\nCondition: {}({}) WHERE ({}) {} {}\nActualValue: {}\n",
-                result.config.aggregate_function,
-                result.config.column,
-                msg,
-                result.config.operator,
-                result.config.value,
-                result.value
-            )]);
-        } else {
-            message.extend([format!(
-                "\nCondition: {}({}) {} {}\nActualValue: {}\n",
-                result.config.aggregate_function,
-                result.config.column,
-                result.config.operator,
-                result.config.value,
-                result.value
-            )]);
-        }
-    }
-    message
-}
-
-fn get_final_value(aggregated_rows: Vec<RecordBatch>) -> f64 {
-    trace!("aggregated_rows-\n{aggregated_rows:?}");
+fn get_final_value(records: Vec<RecordBatch>) -> f64 {
+    trace!("records-\n{records:?}");
 
-    if let Some(f) = aggregated_rows
+    if let Some(f) = records
         .first()
         .and_then(|batch| {
             trace!("batch.column(0)-\n{:?}", batch.column(0));
@@ -258,7 +220,7 @@ fn get_final_value(records: Vec<RecordBatch>) -> f64 {
     {
         f
     } else {
-        aggregated_rows
+        records
             .first()
             .and_then(|batch| {
                 trace!("batch.column(0)-\n{:?}", batch.column(0));
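`get_final_value` expects the alert query to reduce to a single scalar: it reads row 0 of column 0 from the first record batch, trying `Float64Array` first and then `Int64Array`. A standalone mirror of that extraction, assuming the fallback paths elided between the hunks above default to `0.0`:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Float64Array, Int64Array, RecordBatch};

// The scalar lives in row 0, column 0 of the first batch; try f64 first,
// then fall back to i64 — mirroring get_final_value().
fn scalar_from(batches: &[RecordBatch]) -> f64 {
    let Some(col) = batches.first().map(|b| b.column(0)) else {
        return 0.0;
    };
    if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
        if !arr.is_empty() {
            return arr.value(0);
        }
    }
    if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
        if !arr.is_empty() {
            return arr.value(0) as f64;
        }
    }
    0.0
}

fn main() {
    // A COUNT(...) result arrives as a one-row Int64 column.
    let col: ArrayRef = Arc::new(Int64Array::from(vec![1042_i64]));
    let batch = RecordBatch::try_from_iter([("alert_value", col)]).unwrap();
    assert_eq!(scalar_from(&[batch]), 1042.0);
}
```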
@@ -272,94 +234,10 @@
     }
 }
 
-/// This function accepts aggregate_config and
-/// returns a tuple of (aggregate expressions, filter expressions)
-///
-/// It calls get_filter_expr() to get filter expressions
-fn get_exprs(aggregate_config: &Aggregates) -> Vec<(Expr, Option<Expr>)> {
-    let mut agg_expr = Vec::new();
-
-    match &aggregate_config.operator {
-        Some(op) => match op {
-            LogicalOperator::And | LogicalOperator::Or => {
-                let agg1 = &aggregate_config.aggregate_config[0];
-                let agg2 = &aggregate_config.aggregate_config[1];
-
-                for agg in [agg1, agg2] {
-                    let filter_expr = if let Some(where_clause) = &agg.conditions {
-                        let fe = get_filter_expr(where_clause);
-
-                        trace!("filter_expr-\n{fe:?}");
-
-                        Some(fe)
-                    } else {
-                        None
-                    };
-
-                    let e = match_aggregate_operation(agg);
-                    agg_expr.push((e, filter_expr));
-                }
-            }
-        },
-        None => {
-            let agg = &aggregate_config.aggregate_config[0];
-
-            let filter_expr = if let Some(where_clause) = &agg.conditions {
-                let fe = get_filter_expr(where_clause);
-
-                trace!("filter_expr-\n{fe:?}");
-
-                Some(fe)
-            } else {
-                None
-            };
-
-            let e = match_aggregate_operation(agg);
-            agg_expr.push((e, filter_expr));
-        }
-    }
-    agg_expr
-}
-
-fn get_filter_expr(where_clause: &Conditions) -> Expr {
-    match &where_clause.operator {
-        Some(op) => match op {
-            LogicalOperator::And => {
-                let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
-
-                let expr1 = &where_clause.condition_config[0];
-                let expr2 = &where_clause.condition_config[1];
-
-                for e in [expr1, expr2] {
-                    let ex = match_alert_operator(e);
-                    expr = expr.and(ex);
-                }
-                expr
-            }
-            LogicalOperator::Or => {
-                let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(false)));
-
-                let expr1 = &where_clause.condition_config[0];
-                let expr2 = &where_clause.condition_config[1];
-
-                for e in [expr1, expr2] {
-                    let ex = match_alert_operator(e);
-                    expr = expr.or(ex);
-                }
-                expr
-            }
-        },
-        None => {
-            let expr = &where_clause.condition_config[0];
-            match_alert_operator(expr)
-        }
-    }
-}
-
 pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
     match &where_clause.operator {
         Some(op) => match op {
-            LogicalOperator::And => {
+            &LogicalOperator::And => {
                 let mut exprs = vec![];
                 for condition in &where_clause.condition_config {
                     if condition.value.as_ref().is_some_and(|v| !v.is_empty()) {
@@ -450,82 +328,6 @@ pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
     }
 }
 
-fn match_alert_operator(expr: &ConditionConfig) -> Expr {
-    // the form accepts value as a string
-    // if it can be parsed as a number, then parse it
-    // else keep it as a string
-    if expr.value.as_ref().is_some_and(|v| !v.is_empty()) {
-        let string_value = expr
-            .value
-            .as_ref()
-            .unwrap()
-            .replace("'", "\\'")
-            .replace('%', "\\%")
-            .replace('_', "\\_");
-        let value = ValueType::from_string(string_value.clone());
-
-        // for maintaining column case
-        let column = format!(r#""{}""#, expr.column);
-        match expr.operator {
-            WhereConfigOperator::Equal => col(column).eq(lit(value)),
-            WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)),
-            WhereConfigOperator::LessThan => col(column).lt(lit(value)),
-            WhereConfigOperator::GreaterThan => col(column).gt(lit(value)),
-            WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)),
-            WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)),
-            WhereConfigOperator::ILike => col(column).ilike(lit(string_value)),
-            WhereConfigOperator::Contains => col(column).like(lit(format!("%{string_value}%"))),
-            WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
-                Box::new(col(column)),
-                Operator::RegexIMatch,
-                Box::new(lit(format!("^{string_value}"))),
-            )),
-            WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
-                Box::new(col(column)),
-                Operator::RegexIMatch,
-                Box::new(lit(format!("{string_value}$"))),
-            )),
-            WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(string_value)),
-            WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
-                Box::new(col(column)),
-                Operator::RegexNotIMatch,
-                Box::new(lit(format!("^{string_value}"))),
-            )),
-            WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
-                Box::new(col(column)),
-                Operator::RegexNotIMatch,
-                Box::new(lit(format!("{string_value}$"))),
-            )),
-            _ => unreachable!(
-                "value must not be null for operators other than `is null` and `is not null`. Should've been caught in validation"
-            ),
-        }
-    } else {
-        // for maintaining column case
-        let column = format!(r#""{}""#, expr.column);
-        match expr.operator {
-            WhereConfigOperator::IsNull => col(column).is_null(),
-            WhereConfigOperator::IsNotNull => col(column).is_not_null(),
-            _ => unreachable!(
-                "value must be null for `is null` and `is not null`. Should've been caught in validation"
-            ),
-        }
-    }
-}
-
-fn match_aggregate_operation(agg: &AggregateConfig) -> Expr {
-    // for maintaining column case
-    let column = format!(r#""{}""#, agg.column);
-    match agg.aggregate_function {
-        AggregateFunction::Avg => avg(col(column)),
-        AggregateFunction::CountDistinct => count_distinct(col(column)),
-        AggregateFunction::Count => count(col(column)),
-        AggregateFunction::Min => min(col(column)),
-        AggregateFunction::Max => max(col(column)),
-        AggregateFunction::Sum => sum(col(column)),
-    }
-}
-
 enum ValueType {
     Number(f64),
     String(String),
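With the DataFusion expression builders removed, the whole trigger decision is the plain comparison in `evaluate_condition`. A tiny illustration with a local stand-in enum, since the full `AlertOperator` variant list is elided in the hunk above:

```rust
// Local, illustrative copy of the threshold comparison; the real code
// matches on super::AlertOperator instead of this hypothetical Op.
enum Op {
    GreaterThan,
    LessThan,
    Equal,
    NotEqual,
    GreaterThanOrEqual,
    LessThanOrEqual,
}

fn evaluate(op: &Op, actual: f64, expected: f64) -> bool {
    match op {
        Op::GreaterThan => actual > expected,
        Op::LessThan => actual < expected,
        Op::Equal => actual == expected,
        Op::NotEqual => actual != expected,
        Op::GreaterThanOrEqual => actual >= expected,
        Op::LessThanOrEqual => actual <= expected,
    }
}

fn main() {
    // "error count > 1000" fires only once the query's scalar crosses 1000.
    assert!(!evaluate(&Op::GreaterThan, 850.0, 1000.0));
    assert!(evaluate(&Op::GreaterThan, 1042.0, 1000.0));
}
```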
diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs
index 56bb109e3..a80fd076e 100644
--- a/src/alerts/mod.rs
+++ b/src/alerts/mod.rs
@@ -17,16 +17,18 @@
  */
 
 use actix_web::http::header::ContentType;
+use arrow_schema::{DataType, Schema};
 use async_trait::async_trait;
 use chrono::Utc;
+use datafusion::logical_expr::{LogicalPlan, Projection};
 use datafusion::sql::sqlparser::parser::ParserError;
 use derive_more::FromStrError;
 use derive_more::derive::FromStr;
 use http::StatusCode;
 use once_cell::sync::Lazy;
 use serde::Serialize;
-use serde_json::Error as SerdeError;
-use std::collections::{HashMap, HashSet};
+use serde_json::{Error as SerdeError, Value as JsonValue};
+use std::collections::HashMap;
 use std::fmt::{self, Display};
 use std::thread;
 use std::time::Duration;
@@ -40,17 +42,27 @@ pub mod alerts_utils;
 pub mod target;
 
 use crate::alerts::target::TARGETS;
+use crate::handlers::http::fetch_schema;
+use crate::handlers::http::query::create_streams_for_distributed;
 use crate::parseable::{PARSEABLE, StreamNotFound};
+use crate::query::{QUERY_SESSION, resolve_stream_names};
 use crate::rbac::map::SessionKey;
 use crate::storage;
-use crate::storage::ObjectStorageError;
+use crate::storage::{ALERTS_ROOT_DIRECTORY, ObjectStorageError};
 use crate::sync::alert_runtime;
 use crate::utils::user_auth_for_query;
 
+/// Helper struct for basic alert fields during migration
+struct BasicAlertFields {
+    id: Ulid,
+    title: String,
+    severity: Severity,
+}
+
 // these types describe the scheduled task for an alert
 pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>);
 
-pub const CURRENT_ALERTS_VERSION: &str = "v1";
+pub const CURRENT_ALERTS_VERSION: &str = "v2";
 
 pub static ALERTS: Lazy<Alerts> = Lazy::new(|| {
     let (tx, rx) = mpsc::channel::<AlertTask>(10);
@@ -71,34 +83,28 @@ pub struct Alerts {
 }
 
 pub enum AlertTask {
-    Create(AlertConfig),
+    Create(Box<AlertConfig>),
     Delete(Ulid),
 }
 
 #[derive(Default, Debug, serde::Serialize, serde::Deserialize, Clone)]
 #[serde(rename_all = "lowercase")]
 pub enum AlertVerison {
-    #[default]
     V1,
+    #[default]
+    V2,
 }
 
 impl From<&str> for AlertVerison {
     fn from(value: &str) -> Self {
         match value {
             "v1" => Self::V1,
-            _ => unreachable!(),
+            "v2" => Self::V2,
+            _ => Self::V2, // default to v2
         }
     }
 }
 
-#[derive(Debug)]
-pub struct AggregateResult {
-    result: bool,
-    message: Option<String>,
-    config: AggregateConfig,
-    value: f64,
-}
-
 #[async_trait]
 pub trait CallableTarget {
     async fn call(&self, payload: &Context);
@@ -389,38 +395,11 @@ pub struct GroupBy {
 
 #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
-pub struct AggregateConfig {
-    pub aggregate_function: AggregateFunction,
-    pub conditions: Option<Conditions>,
-    pub group_by: Option<GroupBy>,
-    pub column: String,
+pub struct ThresholdConfig {
     pub operator: AlertOperator,
     pub value: f64,
 }
 
-#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
-#[serde(rename_all = "camelCase")]
-pub struct Aggregates {
-    pub operator: Option<LogicalOperator>,
-    pub aggregate_config: Vec<AggregateConfig>,
-}
-
-#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
-#[serde(rename_all = "camelCase")]
-pub enum LogicalOperator {
-    And,
-    Or,
-}
-
-impl Display for LogicalOperator {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match self {
-            LogicalOperator::And => write!(f, "AND"),
-            LogicalOperator::Or => write!(f, "OR"),
-        }
-    }
-}
-
 #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct RollingWindow {
@@ -522,15 +501,31 @@ impl Display for Severity {
     }
 }
 
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub enum LogicalOperator {
+    And,
+    Or,
+}
+
+impl Display for LogicalOperator {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            LogicalOperator::And => write!(f, "AND"),
+            LogicalOperator::Or => write!(f, "OR"),
+        }
+    }
+}
+
 #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct AlertRequest {
     #[serde(default = "Severity::default")]
     pub severity: Severity,
     pub title: String,
-    pub stream: String,
+    pub query: String,
     pub alert_type: AlertType,
-    pub aggregates: Aggregates,
+    pub threshold_config: ThresholdConfig,
     pub eval_config: EvalConfig,
     pub targets: Vec<Ulid>,
 }
@@ -546,9 +541,9 @@ impl AlertRequest {
             id: Ulid::new(),
             severity: self.severity,
             title: self.title,
-            stream: self.stream,
+            query: self.query,
             alert_type: self.alert_type,
-            aggregates: self.aggregates,
+            threshold_config: self.threshold_config,
             eval_config: self.eval_config,
             targets: self.targets,
             state: AlertState::default(),
@@ -565,9 +560,9 @@ pub struct AlertConfig {
     pub id: Ulid,
     pub severity: Severity,
     pub title: String,
-    pub stream: String,
+    pub query: String,
     pub alert_type: AlertType,
-    pub aggregates: Aggregates,
+    pub threshold_config: ThresholdConfig,
     pub eval_config: EvalConfig,
     pub targets: Vec<Ulid>,
     // for new alerts, state should be resolved
@@ -576,27 +571,482 @@ pub struct AlertConfig {
 }
 
 impl AlertConfig {
+    /// Migration function to convert v1 alerts to v2 structure
+    async fn migrate_from_v1(
+        alert_json: &JsonValue,
+        store: &dyn crate::storage::ObjectStorage,
+    ) -> Result<AlertConfig, AlertError> {
+        let basic_fields = Self::parse_basic_fields(alert_json)?;
+        let alert_info = format!("Alert '{}' (ID: {})", basic_fields.title, basic_fields.id);
+
+        let query = Self::build_query_from_v1(alert_json, &alert_info).await?;
+        let threshold_config = Self::extract_threshold_config(alert_json, &alert_info)?;
+        let eval_config = Self::extract_eval_config(alert_json, &alert_info)?;
+        let targets = Self::extract_targets(alert_json, &alert_info)?;
+        let state = Self::extract_state(alert_json);
+
+        // Create the migrated v2 alert
+        let migrated_alert = AlertConfig {
+            version: AlertVerison::V2,
+            id: basic_fields.id,
+            severity: basic_fields.severity,
+            title: basic_fields.title,
+            query,
+            alert_type: AlertType::Threshold,
+            threshold_config,
+            eval_config,
+            targets,
+            state,
+        };
+
+        // Save the migrated alert back to storage
+        store.put_alert(basic_fields.id, &migrated_alert).await?;
+
+        Ok(migrated_alert)
+    }
+
+    /// Parse basic fields common between v1 and v2 alerts
+    fn parse_basic_fields(alert_json: &JsonValue) -> Result<BasicAlertFields, AlertError> {
+        let id: Ulid = alert_json["id"]
+            .as_str()
+            .ok_or_else(|| AlertError::CustomError("Missing id in v1 alert".to_string()))?
+            .parse()
+            .map_err(|_| AlertError::CustomError("Invalid id format in v1 alert".to_string()))?;
+
+        let title = alert_json["title"]
+            .as_str()
+            .ok_or_else(|| {
+                AlertError::CustomError(format!("Missing title in v1 alert (ID: {id})"))
+            })?
+            .to_string();
+
+        let severity_str = alert_json["severity"].as_str().ok_or_else(|| {
+            AlertError::CustomError(format!("Missing severity in v1 alert '{title}' (ID: {id})"))
+        })?;
+
+        let severity = match severity_str.to_lowercase().as_str() {
+            "critical" => Severity::Critical,
+            "high" => Severity::High,
+            "medium" => Severity::Medium,
+            "low" => Severity::Low,
+            _ => Severity::Medium, // default
+        };
+
+        Ok(BasicAlertFields {
+            id,
+            title,
+            severity,
+        })
+    }
+
+    /// Build SQL query from v1 alert structure
+    async fn build_query_from_v1(
+        alert_json: &JsonValue,
+        alert_info: &str,
+    ) -> Result<String, AlertError> {
+        let stream = alert_json["stream"].as_str().ok_or_else(|| {
+            AlertError::CustomError(format!("Missing stream in v1 alert for {alert_info}"))
+        })?;
+
+        let aggregates = &alert_json["aggregates"];
+        let aggregate_config = &aggregates["aggregateConfig"][0];
+
+        let aggregate_function = Self::parse_aggregate_function(aggregate_config, alert_info)?;
+        let base_query =
+            Self::build_base_query(&aggregate_function, aggregate_config, stream, alert_info)?;
+        let final_query =
+            Self::add_where_conditions(base_query, aggregate_config, stream, alert_info).await?;
+
+        Ok(final_query)
+    }
+
+    /// Parse aggregate function from v1 config
+    fn parse_aggregate_function(
+        aggregate_config: &JsonValue,
+        alert_info: &str,
+    ) -> Result<AggregateFunction, AlertError> {
+        let aggregate_function_str =
+            aggregate_config["aggregateFunction"]
+                .as_str()
+                .ok_or_else(|| {
+                    AlertError::CustomError(format!(
+                        "Missing aggregateFunction in v1 alert for {alert_info}"
+                    ))
+                })?;
+
+        match aggregate_function_str.to_lowercase().as_str() {
+            "avg" => Ok(AggregateFunction::Avg),
+            "count" => Ok(AggregateFunction::Count),
+            "countdistinct" => Ok(AggregateFunction::CountDistinct),
+            "min" => Ok(AggregateFunction::Min),
+            "max" => Ok(AggregateFunction::Max),
+            "sum" => Ok(AggregateFunction::Sum),
+            _ => Err(AlertError::CustomError(format!(
+                "Unsupported aggregate function: {aggregate_function_str} for {alert_info}"
+            ))),
+        }
+    }
+
+    /// Build base SQL query without WHERE conditions
+    fn build_base_query(
+        aggregate_function: &AggregateFunction,
+        aggregate_config: &JsonValue,
+        stream: &str,
+        _alert_info: &str,
+    ) -> Result<String, AlertError> {
+        let column = aggregate_config["column"].as_str().unwrap_or("*");
+
+        let query = match aggregate_function {
+            AggregateFunction::CountDistinct => {
+                if column == "*" {
+                    format!("SELECT COUNT(*) as alert_value FROM \"{stream}\"")
+                } else {
+                    format!("SELECT COUNT(DISTINCT \"{column}\") as alert_value FROM \"{stream}\"")
+                }
+            }
+            _ => {
+                if column == "*" {
+                    format!(
+                        "SELECT {}(*) as alert_value FROM \"{stream}\"",
+                        aggregate_function.to_string().to_uppercase()
+                    )
+                } else if matches!(aggregate_function, AggregateFunction::Count) && column != "*" {
+                    // COUNT with specific column should handle NULLs differently
+                    format!("SELECT COUNT(\"{column}\") as alert_value FROM \"{stream}\"")
+                } else {
+                    format!(
+                        "SELECT {}(\"{column}\") as alert_value FROM \"{stream}\"",
+                        aggregate_function.to_string().to_uppercase()
+                    )
+                }
+            }
+        };
+        Ok(query)
+    }
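Concretely, for the first demo alert (a `count` over `severity_number` with a single `=` condition), the builder should emit a single scalar SELECT. A worked example of the expected output, using the column and stream names from the demo script:

```rust
fn main() {
    let stream = "demodata";
    let column = "severity_number";

    // build_base_query: COUNT over a concrete column, aliased to alert_value.
    let base = format!("SELECT COUNT(\"{column}\") as alert_value FROM \"{stream}\"");

    // add_where_conditions: one "=" condition against an Int64 column, so the
    // value is emitted as a raw numeric literal rather than a quoted string.
    let query = format!("{base} WHERE \"{column}\" = 18");

    assert_eq!(
        query,
        "SELECT COUNT(\"severity_number\") as alert_value FROM \"demodata\" WHERE \"severity_number\" = 18"
    );
}
```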
+    /// Add WHERE conditions to the base query with data type conversion
+    async fn add_where_conditions(
+        base_query: String,
+        aggregate_config: &JsonValue,
+        stream: &str,
+        alert_info: &str,
+    ) -> Result<String, AlertError> {
+        let Some(conditions) = aggregate_config["conditions"].as_object() else {
+            return Ok(base_query);
+        };
+
+        let Some(condition_config) = conditions["conditionConfig"].as_array() else {
+            return Ok(base_query);
+        };
+
+        if condition_config.is_empty() {
+            return Ok(base_query);
+        }
+
+        // Fetch the stream schema for data type conversion
+        let schema = match fetch_schema(stream).await {
+            Ok(schema) => schema,
+            Err(e) => {
+                return Err(AlertError::CustomError(format!(
+                    "Failed to fetch schema for stream '{stream}' during migration of {alert_info}: {e}. Migration cannot proceed without schema information."
+                )));
+            }
+        };
+
+        let mut where_clauses = Vec::new();
+        for condition in condition_config {
+            let column = condition["column"].as_str().unwrap_or("");
+            if column.is_empty() {
+                warn!("Skipping WHERE condition with empty column name for {alert_info}");
+                continue;
+            }
+            let operator_str = condition["operator"].as_str().unwrap_or("=");
+            let value = condition["value"].as_str().unwrap_or("");
+
+            let operator = Self::parse_where_operator(operator_str);
+            let where_clause = Self::format_where_clause_with_types(
+                column, &operator, value, &schema, alert_info,
+            )?;
+            where_clauses.push(where_clause);
+        }
+
+        let logical_op = conditions["operator"].as_str().unwrap_or("and");
+        let where_clause = where_clauses.join(&format!(" {} ", logical_op.to_uppercase()));
+
+        Ok(format!("{base_query} WHERE {where_clause}"))
+    }
+
+    /// Parse WHERE operator from string
+    fn parse_where_operator(operator_str: &str) -> WhereConfigOperator {
+        match operator_str {
+            "=" => WhereConfigOperator::Equal,
+            "!=" => WhereConfigOperator::NotEqual,
+            "<" => WhereConfigOperator::LessThan,
+            ">" => WhereConfigOperator::GreaterThan,
+            "<=" => WhereConfigOperator::LessThanOrEqual,
+            ">=" => WhereConfigOperator::GreaterThanOrEqual,
+            "is null" => WhereConfigOperator::IsNull,
+            "is not null" => WhereConfigOperator::IsNotNull,
+            "ilike" => WhereConfigOperator::ILike,
+            "contains" => WhereConfigOperator::Contains,
+            "begins with" => WhereConfigOperator::BeginsWith,
+            "ends with" => WhereConfigOperator::EndsWith,
+            "does not contain" => WhereConfigOperator::DoesNotContain,
+            "does not begin with" => WhereConfigOperator::DoesNotBeginWith,
+            "does not end with" => WhereConfigOperator::DoesNotEndWith,
+            _ => WhereConfigOperator::Equal, // default fallback
+        }
+    }
+
+    /// Format a single WHERE clause with proper data type conversion
+    fn format_where_clause_with_types(
+        column: &str,
+        operator: &WhereConfigOperator,
+        value: &str,
+        schema: &Schema,
+        alert_info: &str,
+    ) -> Result<String, AlertError> {
+        match operator {
+            WhereConfigOperator::IsNull | WhereConfigOperator::IsNotNull => {
+                Ok(format!("\"{column}\" {}", operator.as_str()))
+            }
+            WhereConfigOperator::Contains => Ok(format!(
+                "\"{column}\" LIKE '%{}%'",
+                value.replace('\'', "''")
+            )),
+            WhereConfigOperator::BeginsWith => Ok(format!(
+                "\"{column}\" LIKE '{}%'",
+                value.replace('\'', "''")
+            )),
+            WhereConfigOperator::EndsWith => Ok(format!(
+                "\"{column}\" LIKE '%{}'",
+                value.replace('\'', "''")
+            )),
+            WhereConfigOperator::DoesNotContain => Ok(format!(
+                "\"{column}\" NOT LIKE '%{}%'",
+                value.replace('\'', "''")
+            )),
+            WhereConfigOperator::DoesNotBeginWith => Ok(format!(
+                "\"{column}\" NOT LIKE '{}%'",
+                value.replace('\'', "''")
+            )),
+            WhereConfigOperator::DoesNotEndWith => Ok(format!(
+                "\"{column}\" NOT LIKE '%{}'",
+                value.replace('\'', "''")
+            )),
+            WhereConfigOperator::ILike => Ok(format!(
+                "\"{column}\" ILIKE '{}'",
+                value.replace('\'', "''")
+            )),
+            _ => {
+                // Standard operators: =, !=, <, >, <=, >=
+                let formatted_value =
+                    Self::convert_value_by_data_type(column, value, schema, alert_info)?;
+                Ok(format!(
+                    "\"{column}\" {} {formatted_value}",
+                    operator.as_str()
+                ))
+            }
+        }
+    }
+
+    /// Convert string value to appropriate data type based on schema
+    fn convert_value_by_data_type(
+        column: &str,
+        value: &str,
+        schema: &Schema,
+        alert_info: &str,
+    ) -> Result<String, AlertError> {
+        // Find the field in the schema
+        let field = schema.fields().iter().find(|f| f.name() == column);
+        let Some(field) = field else {
+            // Column not found in schema, fail migration
+            return Err(AlertError::CustomError(format!(
+                "Column '{column}' not found in stream schema during migration of {alert_info}. Available columns: [{}]",
+                schema
+                    .fields()
+                    .iter()
+                    .map(|f| f.name().clone())
+                    .collect::<Vec<String>>()
+                    .join(", ")
+            )));
+        };
+
+        match field.data_type() {
+            DataType::Float64 => {
+                match value.parse::<f64>() {
+                    Ok(float_val) => Ok(float_val.to_string()), // Raw number without quotes
+                    Err(_) => Err(AlertError::CustomError(format!(
+                        "Failed to parse value '{value}' as float64 for column '{column}' during migration of {alert_info}",
+                    ))),
+                }
+            }
+            DataType::Int64 => {
+                match value.parse::<i64>() {
+                    Ok(int_val) => Ok(int_val.to_string()), // Raw number without quotes
+                    Err(_) => Err(AlertError::CustomError(format!(
+                        "Failed to parse value '{value}' as int64 for column '{column}' during migration of {alert_info}",
+                    ))),
+                }
+            }
+            DataType::Boolean => {
+                match value.to_lowercase().parse::<bool>() {
+                    Ok(bool_val) => Ok(bool_val.to_string()), // Raw boolean without quotes
+                    Err(_) => Err(AlertError::CustomError(format!(
+                        "Failed to parse value '{value}' as boolean for column '{column}' during migration of {alert_info}",
+                    ))),
+                }
+            }
+            DataType::Date32 | DataType::Date64 => {
+                // For date types, try to validate the format but keep as quoted string in SQL
+                match chrono::NaiveDate::parse_from_str(value, "%Y-%m-%d") {
+                    Ok(_) => Ok(format!("'{}'", value.replace('\'', "''"))),
+                    Err(_) => {
+                        // Try ISO format
+                        match value.parse::<chrono::DateTime<Utc>>() {
+                            Ok(_) => Ok(format!("'{}'", value.replace('\'', "''"))),
+                            Err(_) => Err(AlertError::CustomError(format!(
+                                "Failed to parse value '{value}' as date for column '{column}' during migration of {alert_info}",
+                            ))),
+                        }
+                    }
+                }
+            }
+            DataType::Timestamp(..) => {
+                // For timestamp types, try to validate but keep as quoted string in SQL
+                match value.parse::<chrono::DateTime<Utc>>() {
+                    Ok(_) => Ok(format!("'{}'", value.replace('\'', "''"))),
+                    Err(_) => Err(AlertError::CustomError(format!(
+                        "Failed to parse value '{value}' as timestamp for column '{column}' during migration of {alert_info}",
+                    ))),
+                }
+            }
+            _ => {
+                // For all other data types (string, binary, etc.), use string with quotes
+                Ok(format!("'{}'", value.replace('\'', "''")))
+            }
+        }
+    }
+
+    /// Extract threshold configuration from v1 alert
+    fn extract_threshold_config(
+        alert_json: &JsonValue,
+        alert_info: &str,
+    ) -> Result<ThresholdConfig, AlertError> {
+        let aggregates = &alert_json["aggregates"];
+        let aggregate_config = &aggregates["aggregateConfig"][0];
+
+        let threshold_operator = aggregate_config["operator"].as_str().ok_or_else(|| {
+            AlertError::CustomError(format!("Missing operator in v1 alert for {alert_info}"))
+        })?;
+
+        let threshold_value = aggregate_config["value"].as_f64().ok_or_else(|| {
+            AlertError::CustomError(format!("Missing value in v1 alert for {alert_info}"))
+        })?;
+
+        let operator = match threshold_operator {
+            ">" => AlertOperator::GreaterThan,
+            "<" => AlertOperator::LessThan,
+            "=" => AlertOperator::Equal,
+            "!=" => AlertOperator::NotEqual,
+            ">=" => AlertOperator::GreaterThanOrEqual,
+            "<=" => AlertOperator::LessThanOrEqual,
+            _ => AlertOperator::GreaterThan, // default
+        };
+
+        Ok(ThresholdConfig {
+            operator,
+            value: threshold_value,
+        })
+    }
+
+    /// Extract evaluation configuration from v1 alert
+    fn extract_eval_config(
+        alert_json: &JsonValue,
+        alert_info: &str,
+    ) -> Result<EvalConfig, AlertError> {
+        let rolling_window = &alert_json["evalConfig"]["rollingWindow"];
+
+        let eval_start = rolling_window["evalStart"]
+            .as_str()
+            .ok_or_else(|| {
+                AlertError::CustomError(format!("Missing evalStart in v1 alert for {alert_info}"))
+            })?
+            .to_string();
+
+        let eval_end = rolling_window["evalEnd"]
+            .as_str()
+            .ok_or_else(|| {
+                AlertError::CustomError(format!("Missing evalEnd in v1 alert for {alert_info}"))
+            })?
+            .to_string();
+
+        let eval_frequency = rolling_window["evalFrequency"].as_u64().ok_or_else(|| {
+            AlertError::CustomError(format!(
+                "Missing evalFrequency in v1 alert for {alert_info}"
+            ))
+        })?;
+
+        Ok(EvalConfig::RollingWindow(RollingWindow {
+            eval_start,
+            eval_end,
+            eval_frequency,
+        }))
+    }
+
+    /// Extract target IDs from v1 alert
+    fn extract_targets(alert_json: &JsonValue, alert_info: &str) -> Result<Vec<Ulid>, AlertError> {
+        let targets: Result<Vec<Ulid>, _> = alert_json["targets"]
+            .as_array()
+            .ok_or_else(|| {
+                AlertError::CustomError(format!("Missing targets in v1 alert for {alert_info}"))
+            })?
+            .iter()
+            .map(|t| {
+                t.as_str()
+                    .ok_or_else(|| {
+                        AlertError::CustomError(format!("Invalid target format for {alert_info}"))
+                    })?
+                    .parse()
+                    .map_err(|_| {
+                        AlertError::CustomError(format!(
+                            "Invalid target ID format for {alert_info}"
+                        ))
+                    })
+            })
+            .collect();
+
+        targets
+    }
+
+    /// Extract alert state from v1 alert
+    fn extract_state(alert_json: &JsonValue) -> AlertState {
+        let state_str = alert_json["state"].as_str().unwrap_or("resolved");
+        match state_str.to_lowercase().as_str() {
+            "triggered" => AlertState::Triggered,
+            "silenced" => AlertState::Silenced,
+            "resolved" => AlertState::Resolved,
+            _ => AlertState::Resolved,
+        }
+    }
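The type-aware conversion is what keeps the generated WHERE clause parseable: numeric and boolean columns become raw literals, everything else is single-quoted with `''` escaping. A trimmed stand-in for `convert_value_by_data_type` (returning `Option` instead of the real error type) that shows the three interesting cases:

```rust
use arrow_schema::{DataType, Field, Schema};

// Simplified mirror of the conversion rules; the real method also handles
// dates/timestamps and produces descriptive migration errors.
fn sql_literal(schema: &Schema, column: &str, value: &str) -> Option<String> {
    let field = schema.fields().iter().find(|f| f.name() == column)?;
    match field.data_type() {
        DataType::Int64 => value.parse::<i64>().ok().map(|v| v.to_string()),
        DataType::Float64 => value.parse::<f64>().ok().map(|v| v.to_string()),
        DataType::Boolean => value.to_lowercase().parse::<bool>().ok().map(|v| v.to_string()),
        _ => Some(format!("'{}'", value.replace('\'', "''"))),
    }
}

fn main() {
    let schema = Schema::new(vec![
        Field::new("severity_number", DataType::Int64, true),
        Field::new("body", DataType::Utf8, true),
    ]);
    // Int64 column: raw literal, no quotes.
    assert_eq!(sql_literal(&schema, "severity_number", "18").as_deref(), Some("18"));
    // String column: quoted, embedded quotes doubled.
    assert_eq!(sql_literal(&schema, "body", "it's").as_deref(), Some("'it''s'"));
    // Unknown column: None here (the real code fails the migration instead).
    assert!(sql_literal(&schema, "missing", "1").is_none());
}
```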
     pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> {
         // Validate that all target IDs exist
         for id in &alert.targets {
             TARGETS.get_target_by_id(id).await?;
         }
         self.title = alert.title;
-        self.stream = alert.stream;
+        self.query = alert.query;
         self.alert_type = alert.alert_type;
-        self.aggregates = alert.aggregates;
+        self.threshold_config = alert.threshold_config;
         self.eval_config = alert.eval_config;
         self.targets = alert.targets;
         self.state = AlertState::default();
         Ok(())
     }
 
-    pub fn get_base_query(&self) -> String {
-        format!("SELECT * FROM \"{}\"", self.stream)
-    }
-
     /// Validations
-    pub async fn validate(&self) -> Result<(), AlertError> {
+    pub async fn validate(&self, session_key: SessionKey) -> Result<(), AlertError> {
         // validate evalType
         let eval_frequency = match &self.eval_config {
             EvalConfig::RollingWindow(rolling_window) => {
@@ -627,159 +1077,65 @@ impl AlertConfig {
             }
         }
 
-        // validate aggregateConfig and conditionConfig
-        self.validate_configs()?;
-
-        // validate the presence of columns
-        let columns = self.get_agg_config_cols();
-
-        let schema = PARSEABLE.get_stream(&self.stream)?.get_schema();
-
-        let schema_columns = schema
-            .fields()
-            .iter()
-            .map(|f| f.name())
-            .collect::<HashSet<&String>>();
-
-        for col in columns {
-            if !schema_columns.contains(col) {
-                return Err(AlertError::CustomError(format!(
-                    "Column {} not found in stream {}",
-                    col, self.stream
-                )));
-            }
+        // validate that the query is valid
+        if self.query.is_empty() {
+            return Err(AlertError::InvalidAlertQuery);
         }
-        Ok(())
-    }
-
-    fn validate_configs(&self) -> Result<(), AlertError> {
-        fn validate_condition_config(config: &Option<Conditions>) -> Result<(), AlertError> {
-            if config.is_none() {
-                return Ok(());
-            }
-            let config = config.as_ref().unwrap();
-            match &config.operator {
-                Some(_) => {
-                    // only two aggregate conditions should be present
-                    if config.condition_config.len() != 2 {
-                        return Err(AlertError::CustomError(
-                            "While using AND/OR, only two conditions must be used".to_string(),
-                        ));
-                    }
-                }
-                None => {
-                    // only one aggregate condition should be present
-                    if config.condition_config.len() != 1 {
-                        return Err(AlertError::CustomError(
-                            "While not using AND/OR, only one condition must be used".to_string(),
-                        ));
-                    }
-                }
-            }
-
-            // validate that the value should be None in case of `is null` and `is not null`
-            for condition in config.condition_config.iter() {
-                let needs_no_value = matches!(
-                    condition.operator,
-                    WhereConfigOperator::IsNull | WhereConfigOperator::IsNotNull
-                );
-
-                if needs_no_value && condition.value.as_ref().is_some_and(|v| !v.is_empty()) {
-                    return Err(AlertError::CustomError(
-                        "value must be null when operator is either `is null` or `is not null`"
-                            .into(),
-                    ));
-                }
-                if !needs_no_value && condition.value.as_ref().is_none_or(|v| v.is_empty()) {
-                    return Err(AlertError::CustomError(
-                        "value must not be null when operator is neither `is null` nor `is not null`"
-                            .into(),
-                    ));
-                }
-            }
-            Ok(())
+        let tables = resolve_stream_names(&self.query)?;
+        if tables.is_empty() {
+            return Err(AlertError::InvalidAlertQuery);
         }
+        create_streams_for_distributed(tables)
+            .await
+            .map_err(|_| AlertError::InvalidAlertQuery)?;
 
-        // validate aggregate config(s)
-        match &self.aggregates.operator {
-            Some(_) => {
-                // only two aggregate conditions should be present
-                if self.aggregates.aggregate_config.len() != 2 {
-                    return Err(AlertError::CustomError(
-                        "While using AND/OR, only two aggregate conditions must be used"
-                            .to_string(),
-                    ));
-                }
-
-                // validate condition config
-                let agg1 = &self.aggregates.aggregate_config[0];
-                let agg2 = &self.aggregates.aggregate_config[1];
+        // validate that the user has access to the tables mentioned in the query
+        user_auth_for_query(&session_key, &self.query).await?;
 
-                validate_condition_config(&agg1.conditions)?;
-                validate_condition_config(&agg2.conditions)?;
-            }
-            None => {
-                // only one aggregate condition should be present
-                if self.aggregates.aggregate_config.len() != 1 {
-                    return Err(AlertError::CustomError(
-                        "While not using AND/OR, only one aggregate condition must be used"
-                            .to_string(),
-                    ));
-                }
-
-                let agg = &self.aggregates.aggregate_config[0];
-                validate_condition_config(&agg.conditions)?;
-            }
+        // validate that the alert query is valid and can be evaluated
+        if !Self::is_query_aggregate(&self.query).await? {
+            return Err(AlertError::InvalidAlertQuery);
         }
         Ok(())
     }
 
-    fn get_agg_config_cols(&self) -> HashSet<&String> {
-        let mut columns: HashSet<&String> = HashSet::new();
-        match &self.aggregates.operator {
-            Some(op) => match op {
-                LogicalOperator::And | LogicalOperator::Or => {
-                    let agg1 = &self.aggregates.aggregate_config[0];
-                    let agg2 = &self.aggregates.aggregate_config[1];
+    /// Check if a query is an aggregate query that returns a single value without executing it
+    async fn is_query_aggregate(query: &str) -> Result<bool, AlertError> {
+        let session_state = QUERY_SESSION.state();
 
-                    columns.insert(&agg1.column);
-                    columns.insert(&agg2.column);
+        // Parse the query into a logical plan
+        let logical_plan = session_state
+            .create_logical_plan(query)
+            .await
+            .map_err(|err| AlertError::CustomError(format!("Failed to parse query: {err}")))?;
 
-                    if let Some(condition) = &agg1.conditions {
-                        columns.extend(self.get_condition_cols(condition));
-                    }
-                }
-            },
-            None => {
-                let agg = &self.aggregates.aggregate_config[0];
-                columns.insert(&agg.column);
+        // Check if the plan structure indicates an aggregate query
+        Ok(Self::is_logical_plan_aggregate(&logical_plan))
+    }
 
-                if let Some(condition) = &agg.conditions {
-                    columns.extend(self.get_condition_cols(condition));
-                }
+    /// Analyze a logical plan to determine if it represents an aggregate query
+    fn is_logical_plan_aggregate(plan: &LogicalPlan) -> bool {
+        match plan {
+            // Direct aggregate: SELECT COUNT(*), AVG(col), etc.
+            LogicalPlan::Aggregate(_) => true,
+
+            // Projection over aggregate: SELECT COUNT(*) as total, SELECT AVG(col) as average
+            LogicalPlan::Projection(Projection { input, expr, .. }) => {
+                // Check if input contains an aggregate and we have exactly one expression
+                let is_aggregate_input = Self::is_logical_plan_aggregate(input);
+                let single_expr = expr.len() == 1;
+                is_aggregate_input && single_expr
             }
-        }
-        columns
-    }
 
-    fn get_condition_cols<'a>(&'a self, condition: &'a Conditions) -> HashSet<&'a String> {
-        let mut columns: HashSet<&String> = HashSet::new();
-        match &condition.operator {
-            Some(op) => match op {
-                LogicalOperator::And | LogicalOperator::Or => {
-                    let c1 = &condition.condition_config[0];
-                    let c2 = &condition.condition_config[1];
-                    columns.insert(&c1.column);
-                    columns.insert(&c2.column);
-                }
-            },
-            None => {
-                let c = &condition.condition_config[0];
-                columns.insert(&c.column);
+            // Recursively check wrapped plans (Filter, Limit, Sort, etc.)
+            _ => {
+                // Use inputs() method to get all input plans
+                plan.inputs()
+                    .iter()
+                    .any(|input| Self::is_logical_plan_aggregate(input))
             }
         }
-        columns
     }
 
     pub fn get_eval_frequency(&self) -> u64 {
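The plan-shape check is the interesting part of validation: it accepts `SELECT COUNT(*) ...` style queries without executing anything. A self-contained sketch of the same walk over a DataFusion logical plan, assuming a recent DataFusion release and an in-memory table whose name and schema are invented for the example:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::logical_expr::{LogicalPlan, Projection};
use datafusion::prelude::SessionContext;

// Standalone copy of the is_logical_plan_aggregate walk from the patch.
fn is_aggregate(plan: &LogicalPlan) -> bool {
    match plan {
        LogicalPlan::Aggregate(_) => true,
        LogicalPlan::Projection(Projection { input, expr, .. }) => {
            expr.len() == 1 && is_aggregate(input)
        }
        _ => plan.inputs().iter().any(|input| is_aggregate(input)),
    }
}

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Empty in-memory table so the SQL parses and plans without real data.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "severity_number",
        DataType::Int64,
        true,
    )]));
    let table = MemTable::try_new(schema, vec![vec![]])?;
    let ctx = SessionContext::new();
    ctx.register_table("demodata", Arc::new(table))?;

    // Accepted: a single-value aggregate.
    let agg = ctx.sql("SELECT COUNT(*) AS alert_value FROM demodata").await?;
    assert!(is_aggregate(agg.logical_plan()));

    // Rejected: a plain scan returns rows, not one scalar.
    let scan = ctx.sql("SELECT severity_number FROM demodata").await?;
    assert!(!is_aggregate(scan.logical_plan()));
    Ok(())
}
```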
@@ -863,6 +1219,8 @@ pub enum AlertError {
     TargetInUse,
     #[error("{0}")]
     ParserError(#[from] ParserError),
+    #[error("Invalid alert query")]
+    InvalidAlertQuery,
 }
 
 impl actix_web::ResponseError for AlertError {
@@ -884,6 +1242,7 @@ impl actix_web::ResponseError for AlertError {
             Self::InvalidTargetModification(_) => StatusCode::BAD_REQUEST,
             Self::TargetInUse => StatusCode::CONFLICT,
             Self::ParserError(_) => StatusCode::BAD_REQUEST,
+            Self::InvalidAlertQuery => StatusCode::BAD_REQUEST,
         }
     }
 
@@ -900,13 +1259,78 @@ impl Alerts {
         let mut map = self.alerts.write().await;
         let store = PARSEABLE.storage.get_object_store();
 
-        for alert in store.get_alerts().await.unwrap_or_default() {
-            match self.sender.send(AlertTask::Create(alert.clone())).await {
+        // Get alerts path and read raw bytes for migration handling
+        let relative_path = relative_path::RelativePathBuf::from(ALERTS_ROOT_DIRECTORY);
+
+        let raw_objects = store
+            .get_objects(
+                Some(&relative_path),
+                Box::new(|file_name| file_name.ends_with(".json")),
+            )
+            .await
+            .unwrap_or_default();
+
+        for raw_bytes in raw_objects {
+            // First, try to parse as JSON Value to check version
+            let json_value: JsonValue = match serde_json::from_slice(&raw_bytes) {
+                Ok(val) => val,
+                Err(e) => {
+                    error!("Failed to parse alert JSON: {e}");
+                    continue;
+                }
+            };
+
+            // Check version and handle migration
+            let alert = if let Some(version_str) = json_value["version"].as_str() {
+                if version_str == "v1"
+                    || json_value["query"].is_null()
+                    || json_value.get("stream").is_some()
+                {
+                    // This is a v1 alert that needs migration
+                    match AlertConfig::migrate_from_v1(&json_value, store.as_ref()).await {
+                        Ok(migrated) => migrated,
+                        Err(e) => {
+                            error!("Failed to migrate v1 alert: {e}");
+                            continue;
+                        }
+                    }
+                } else {
+                    // Try to parse as v2
+                    match serde_json::from_value::<AlertConfig>(json_value) {
+                        Ok(alert) => alert,
+                        Err(e) => {
+                            error!("Failed to parse v2 alert: {e}");
+                            continue;
+                        }
+                    }
+                }
+            } else {
+                // No version field, assume v1 and migrate
+                warn!("Found alert without version field, assuming v1 and migrating");
+                match AlertConfig::migrate_from_v1(&json_value, store.as_ref()).await {
+                    Ok(migrated) => migrated,
+                    Err(e) => {
+                        error!("Failed to migrate alert without version: {e}");
+                        continue;
+                    }
+                }
+            };
+
+            // Create alert task
+            match self
+                .sender
+                .send(AlertTask::Create(Box::new(alert.clone())))
+                .await
+            {
                 Ok(_) => {}
                 Err(e) => {
                     warn!("Failed to create alert task: {e}\nRetrying...");
                     // Retry sending the task
-                    match self.sender.send(AlertTask::Create(alert.clone())).await {
+                    match self
+                        .sender
+                        .send(AlertTask::Create(Box::new(alert.clone())))
+                        .await
+                    {
                         Ok(_) => {}
                         Err(e) => {
                             error!("Failed to create alert task: {e}");
@@ -915,6 +1339,7 @@ impl Alerts {
                 }
             }
         };
+
         map.insert(alert.id, alert);
     }
 
@@ -929,8 +1354,7 @@ impl Alerts {
         let mut alerts: Vec<AlertConfig> = Vec::new();
         for (_, alert) in self.alerts.read().await.iter() {
             // filter based on whether the user can execute this query or not
-            let query = alert.get_base_query();
-            if user_auth_for_query(&session, &query).await.is_ok() {
+            if user_auth_for_query(&session, &alert.query).await.is_ok() {
                 alerts.push(alert.to_owned());
             }
         }
@@ -1017,7 +1441,7 @@ impl Alerts {
     /// Start a scheduled alert task
     pub async fn start_task(&self, alert: AlertConfig) -> Result<(), AlertError> {
         self.sender
-            .send(AlertTask::Create(alert))
+            .send(AlertTask::Create(Box::new(alert)))
             .await
             .map_err(|e| AlertError::CustomError(e.to_string()))?;
         Ok(())
diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs
index 57454dda7..a8f5ebe5d 100644
--- a/src/handlers/http/alerts.rs
+++ b/src/handlers/http/alerts.rs
@@ -49,13 +49,12 @@ pub async fn post(
     Json(alert): Json<AlertRequest>,
 ) -> Result<impl Responder, AlertError> {
     let alert: AlertConfig = alert.into().await?;
-    alert.validate().await?;
 
     // validate the incoming alert query
     // does the user have access to these tables or not?
     let session_key = extract_session_key_from_req(&req)?;
-    let query = alert.get_base_query();
-    user_auth_for_query(&session_key, &query).await?;
+
+    alert.validate(session_key).await?;
 
     // now that we've validated that the user can run this query
     // move on to saving the alert in ObjectStore
@@ -79,9 +78,8 @@ pub async fn get(req: HttpRequest, alert_id: Path<Ulid>) -> Result<impl Responder, AlertError>
diff --git a/src/handlers/http/cluster.rs b/src/handlers/http/cluster.rs
--- a/src/handlers/http/cluster.rs
+++ b/src/handlers/http/cluster.rs
+lazy_static! {
+    static ref QUERIER_MAP: Arc<RwLock<HashMap<String, QuerierStatus>>> =
+        Arc::new(RwLock::new(HashMap::new()));
+    static ref LAST_USED_QUERIER: Arc<RwLock<Option<String>>> = Arc::new(RwLock::new(None));
+}
+
 pub async fn for_each_live_ingestor<F, Fut, E>(api_fn: F) -> Result<(), E>
 where
     F: Fn(NodeMetadata) -> Fut + Clone + Send + Sync + 'static,
@@ -1104,3 +1112,261 @@
     Ok(())
 }
+
+#[derive(Clone, Debug)]
+struct QuerierStatus {
+    metadata: QuerierMetadata,
+    available: bool,
+    last_used: Option<Instant>,
+}
+
+async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
+    // Get all querier metadata
+    let querier_metadata: Vec<NodeMetadata> = get_node_info(NodeType::Querier).await?;
+
+    // No queriers found
+    if querier_metadata.is_empty() {
+        return Err(QueryError::NoAvailableQuerier);
+    }
+
+    // Limit concurrency for liveness checks to avoid resource exhaustion
+    const MAX_CONCURRENT_LIVENESS_CHECKS: usize = 10;
+    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_LIVENESS_CHECKS));
+
+    // Update the querier map with new metadata and get an available querier
+    let mut map = QUERIER_MAP.write().await;
+
+    let existing_domains: Vec<String> = map.keys().cloned().collect();
+    let mut live_domains = std::collections::HashSet::new();
+
+    // Use stream with concurrency limit instead of join_all
+    let liveness_results: Vec<(String, bool, NodeMetadata)> = stream::iter(querier_metadata)
+        .map(|metadata| {
+            let domain = metadata.domain_name.clone();
+            let metadata_clone = metadata.clone();
+            let semaphore = Arc::clone(&semaphore);
+
+            async move {
+                let _permit = semaphore.acquire().await.unwrap();
+                let is_live = check_liveness(&domain).await;
+                (domain, is_live, metadata_clone)
+            }
+        })
+        .buffer_unordered(MAX_CONCURRENT_LIVENESS_CHECKS)
+        .collect()
+        .await;
+
+    // Update the map based on liveness results
+    for (domain, is_live, metadata) in liveness_results {
+        if is_live {
+            live_domains.insert(domain.clone());
+            // Update existing entry or add new one
+            if let Some(status) = map.get_mut(&domain) {
+                // Update metadata for existing entry, preserve last_used
+                status.metadata = metadata;
+            } else {
+                // Add new entry
+                map.insert(
+                    domain,
+                    QuerierStatus {
+                        metadata,
+                        available: true,
+                        last_used: None,
+                    },
+                );
+            }
+        }
+    }
+
+    // Remove entries that are not live anymore
+    existing_domains.iter().for_each(|domain| {
+        if !live_domains.contains(domain) {
+            map.remove(domain);
+        }
+    });
+
+    // Find the next available querier using round-robin strategy
+    if let Some(selected_domain) = select_next_querier(&mut map).await {
+        if let Some(status) = map.get_mut(&selected_domain) {
+            status.available = false;
+            status.last_used = Some(Instant::now());
+            return Ok(status.metadata.clone());
+        }
+    }
+
+    // If no querier is available, use least-recently-used strategy
+    if let Some(selected_domain) = select_least_recently_used_querier(&mut map) {
+        if let Some(status) = map.get_mut(&selected_domain) {
+            status.available = false;
+            status.last_used = Some(Instant::now());
+            return Ok(status.metadata.clone());
+        }
+    }
+
+    // If no querier is available, return an error
+    Err(QueryError::NoAvailableQuerier)
+}
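The liveness sweep is bounded two ways: `buffer_unordered(N)` already caps the number of in-flight probes on its own, and the semaphore adds an explicit guard on top. A reduced sketch of the same pattern, with a hypothetical `probe` standing in for `check_liveness`:

```rust
use futures::{stream, StreamExt};

// Hypothetical stand-in for check_liveness(); always reports live here.
async fn probe(domain: String) -> (String, bool) {
    (domain, true)
}

#[tokio::main]
async fn main() {
    let domains = vec!["http://q1/".to_string(), "http://q2/".to_string()];

    // At most 10 probes run concurrently; results arrive in completion order,
    // which is fine because each result carries its own domain.
    let results: Vec<(String, bool)> = stream::iter(domains)
        .map(probe)
        .buffer_unordered(10)
        .collect()
        .await;

    println!("{results:?}");
}
```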
+/// Select next querier using round-robin strategy
+async fn select_next_querier(map: &mut HashMap<String, QuerierStatus>) -> Option<String> {
+    // First, try to find any available querier
+    let available_queriers: Vec<String> = map
+        .iter()
+        .filter_map(|(domain, status)| {
+            if status.available {
+                Some(domain.clone())
+            } else {
+                None
+            }
+        })
+        .collect();
+
+    if available_queriers.is_empty() {
+        return None;
+    }
+
+    // Get the last used querier for round-robin
+    let last_used = LAST_USED_QUERIER.read().await;
+
+    if let Some(ref last_domain) = *last_used {
+        // Find the next querier in the list after the last used one
+        let mut found_last = false;
+        for domain in &available_queriers {
+            if found_last {
+                drop(last_used);
+                *LAST_USED_QUERIER.write().await = Some(domain.clone());
+                return Some(domain.clone());
+            }
+            if domain == last_domain {
+                found_last = true;
+            }
+        }
+        // If we reached here, either last_used querier is not available anymore
+        // or it was the last in the list, so wrap around to the first
+        if let Some(first_domain) = available_queriers.first() {
+            drop(last_used);
+            *LAST_USED_QUERIER.write().await = Some(first_domain.clone());
+            return Some(first_domain.clone());
+        }
+    } else {
+        // No previous querier, select the first available one
+        if let Some(first_domain) = available_queriers.first() {
+            drop(last_used);
+            *LAST_USED_QUERIER.write().await = Some(first_domain.clone());
+            return Some(first_domain.clone());
+        }
+    }
+
+    None
+}
+
+/// Select the least recently used querier when no querier is marked as available
+fn select_least_recently_used_querier(map: &mut HashMap<String, QuerierStatus>) -> Option<String> {
+    if map.is_empty() {
+        return None;
+    }
+
+    // Find the querier that was used least recently (or never used)
+    let mut least_recently_used_domain: Option<String> = None;
+    let mut oldest_time: Option<Instant> = None;
+
+    for (domain, status) in map.iter() {
+        match (status.last_used, oldest_time) {
+            // Never used - highest priority
+            (None, _) => {
+                least_recently_used_domain = Some(domain.clone());
+                oldest_time = None;
+            }
+            // Used, but we haven't found any used querier yet
+            (Some(used_time), None) => {
+                if least_recently_used_domain.is_none() {
+                    least_recently_used_domain = Some(domain.clone());
+                    oldest_time = Some(used_time);
+                }
+            }
+            // Used, and we have a candidate - compare times
+            (Some(used_time), Some(current_oldest)) => {
+                if used_time < current_oldest {
+                    least_recently_used_domain = Some(domain.clone());
+                    oldest_time = Some(used_time);
+                }
+            }
+        }
+    }
+
+    least_recently_used_domain
+}
+
+// Mark a querier as available again
+pub async fn mark_querier_available(domain_name: &str) {
+    let mut map = QUERIER_MAP.write().await;
+    if let Some(status) = map.get_mut(domain_name) {
+        status.available = true;
+        // Note: We don't reset last_used here as it's used for LRU selection
+    }
+}
+
+pub async fn send_query_request(query_request: &Query) -> Result<(JsonValue, String), QueryError> {
+    let querier = get_available_querier().await?;
+    let domain_name = querier.domain_name.clone();
+
+    // Perform the query request
+    let fields = query_request.fields;
+    let streaming = query_request.streaming;
+    let send_null = query_request.send_null;
+    let uri = format!(
+        "{}api/v1/query?fields={fields}&streaming={streaming}&send_null={send_null}",
+        &querier.domain_name,
+    );
+
+    let body = match serde_json::to_string(&query_request) {
+        Ok(body) => body,
+        Err(err) => {
+            mark_querier_available(&domain_name).await;
+            return Err(QueryError::from(err));
+        }
+    };
+
+    let res = match INTRA_CLUSTER_CLIENT
+        .post(uri)
+        .header(header::AUTHORIZATION, &querier.token)
+        .header(header::CONTENT_TYPE, "application/json")
+        .body(body)
+        .send()
+        .await
+    {
+        Ok(res) => res,
+        Err(err) => {
+            mark_querier_available(&domain_name).await;
+            return Err(QueryError::from(err));
+        }
+    };
+
+    // Mark querier as available immediately after the HTTP request completes
+    mark_querier_available(&domain_name).await;
+
+    let headers = res.headers();
+    let total_time = match headers.get(TIME_ELAPSED_HEADER) {
+        Some(v) => {
+            let total_time = v.to_str().unwrap_or_default();
+            total_time.to_string()
+        }
+        None => String::default(),
+    };
+
+    if res.status().is_success() {
+        match res.text().await {
+            Ok(text) => {
+                let query_response: JsonValue = serde_json::from_str(&text)?;
+                Ok((query_response, total_time))
+            }
+            Err(err) => {
+                error!("Error parsing query response: {:?}", err);
+                Err(QueryError::Anyhow(err.into()))
+            }
+        }
+    } else {
+        let err_text = res.text().await?;
+        Err(QueryError::JsonParse(err_text))
+    }
+}
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 2cf16ff76..bd2870138 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -56,7 +56,7 @@ use crate::utils::actix::extract_session_key_from_req;
 use crate::utils::time::{TimeParseError, TimeRange};
 use crate::utils::user_auth_for_datasets;
 
-const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";
+pub const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";
 
 /// Query Request through http endpoint.
 #[derive(Debug, Deserialize, Serialize, Clone)]
 #[serde(rename_all = "camelCase")]
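The round-robin walk in `select_next_querier` reduces to: take the first available domain after the last-used one, wrapping to the front when the last-used entry is missing or was last in the list. One caveat the patch inherits: `available_queriers` is collected from a `HashMap`, so the rotation order is not stable across calls. A reduced model with the wrap-around cases spelled out:

```rust
// Minimal sketch of the rotation logic, over a plain slice instead of the
// shared map + LAST_USED_QUERIER state used in the patch.
fn next_round_robin(available: &[String], last_used: Option<&str>) -> Option<String> {
    if available.is_empty() {
        return None;
    }
    match last_used {
        Some(last) => available
            .iter()
            .skip_while(|d| d.as_str() != last) // walk up to the last-used entry
            .nth(1) // the one right after it
            .or_else(|| available.first()) // wrap around, or last is gone
            .cloned(),
        None => available.first().cloned(),
    }
}

fn main() {
    let qs: Vec<String> = ["q1", "q2", "q3"].map(String::from).to_vec();
    assert_eq!(next_round_robin(&qs, Some("q2")).as_deref(), Some("q3"));
    assert_eq!(next_round_robin(&qs, Some("q3")).as_deref(), Some("q1")); // wrap
    assert_eq!(next_round_robin(&qs, None).as_deref(), Some("q1"));
}
```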