diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs
new file mode 100644
index 000000000..74be90393
--- /dev/null
+++ b/src/alerts/alert_types.rs
@@ -0,0 +1,250 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use std::time::Duration;
+
+use chrono::{DateTime, Utc};
+use tonic::async_trait;
+use ulid::Ulid;
+
+use crate::{
+ alerts::{
+ AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity,
+ ThresholdConfig,
+ alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range},
+ is_query_aggregate,
+ target::{self, TARGETS},
+ traits::AlertTrait,
+ },
+ handlers::http::query::create_streams_for_distributed,
+ option::Mode,
+ parseable::PARSEABLE,
+ query::resolve_stream_names,
+ rbac::map::SessionKey,
+ utils::user_auth_for_query,
+};
+
+/// Struct which defines the threshold type alerts
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+pub struct ThresholdAlert {
+ pub version: AlertVersion,
+ #[serde(default)]
+ pub id: Ulid,
+ pub severity: Severity,
+ pub title: String,
+ pub query: String,
+ pub alert_type: AlertType,
+ pub threshold_config: ThresholdConfig,
+ pub eval_config: EvalConfig,
+ pub targets: Vec,
+ // for new alerts, state should be resolved
+ #[serde(default)]
+ pub state: AlertState,
+ pub created: DateTime,
+ pub tags: Option>,
+ pub datasets: Vec,
+}
+
+#[async_trait]
+impl AlertTrait for ThresholdAlert {
+ async fn eval_alert(&self) -> Result<(bool, f64), AlertError> {
+ let time_range = extract_time_range(&self.eval_config)?;
+ let final_value = execute_alert_query(self.get_query(), &time_range).await?;
+ let result = evaluate_condition(
+ &self.threshold_config.operator,
+ final_value,
+ self.threshold_config.value,
+ );
+ Ok((result, final_value))
+ }
+
+ async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError> {
+ // validate alert type
+ // Anomaly is only allowed in Prism
+ if self.alert_type.eq(&AlertType::Anomaly) && PARSEABLE.options.mode != Mode::Prism {
+ return Err(AlertError::CustomError(
+ "Anomaly alert is only allowed on Prism mode".into(),
+ ));
+ }
+
+ // validate evalType
+ let eval_frequency = match &self.eval_config {
+ EvalConfig::RollingWindow(rolling_window) => {
+ if humantime::parse_duration(&rolling_window.eval_start).is_err() {
+ return Err(AlertError::Metadata(
+ "evalStart should be of type humantime",
+ ));
+ }
+ rolling_window.eval_frequency
+ }
+ };
+
+ // validate that target repeat notifs !> eval_frequency
+ for target_id in &self.targets {
+ let target = TARGETS.get_target_by_id(target_id).await?;
+ match &target.notification_config.times {
+ target::Retry::Infinite => {}
+ target::Retry::Finite(repeat) => {
+ let notif_duration =
+ Duration::from_secs(60 * target.notification_config.interval)
+ * *repeat as u32;
+ if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) {
+ return Err(AlertError::Metadata(
+ "evalFrequency should be greater than target repetition interval",
+ ));
+ }
+ }
+ }
+ }
+
+ // validate that the query is valid
+ if self.query.is_empty() {
+ return Err(AlertError::InvalidAlertQuery);
+ }
+
+ let tables = resolve_stream_names(&self.query)?;
+ if tables.is_empty() {
+ return Err(AlertError::InvalidAlertQuery);
+ }
+ create_streams_for_distributed(tables)
+ .await
+ .map_err(|_| AlertError::InvalidAlertQuery)?;
+
+ // validate that the user has access to the tables mentioned in the query
+ user_auth_for_query(session_key, &self.query).await?;
+
+ // validate that the alert query is valid and can be evaluated
+ if !is_query_aggregate(&self.query).await? {
+ return Err(AlertError::InvalidAlertQuery);
+ }
+ Ok(())
+ }
+
+ fn get_id(&self) -> &Ulid {
+ &self.id
+ }
+
+ fn get_query(&self) -> &str {
+ &self.query
+ }
+
+ fn get_severity(&self) -> &Severity {
+ &self.severity
+ }
+
+ fn get_title(&self) -> &str {
+ &self.title
+ }
+
+ fn get_alert_type(&self) -> &AlertType {
+ &self.alert_type
+ }
+
+ fn get_threshold_config(&self) -> &ThresholdConfig {
+ &self.threshold_config
+ }
+
+ fn get_eval_config(&self) -> &EvalConfig {
+ &self.eval_config
+ }
+
+ fn get_targets(&self) -> &Vec {
+ &self.targets
+ }
+
+ fn get_state(&self) -> &AlertState {
+ &self.state
+ }
+
+ fn get_eval_frequency(&self) -> u64 {
+ match &self.eval_config {
+ EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency,
+ }
+ }
+
+ fn get_eval_window(&self) -> String {
+ match &self.eval_config {
+ EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_start.clone(),
+ }
+ }
+
+ fn get_created(&self) -> String {
+ self.created.to_string()
+ }
+
+ fn get_tags(&self) -> &Option> {
+ &self.tags
+ }
+
+ fn get_datasets(&self) -> &Vec {
+ &self.datasets
+ }
+
+ fn to_alert_config(&self) -> AlertConfig {
+ let clone = self.clone();
+ clone.into()
+ }
+
+ fn clone_box(&self) -> Box {
+ Box::new(self.clone())
+ }
+
+ fn set_state(&mut self, new_state: AlertState) {
+ self.state = new_state
+ }
+}
+
+impl From for ThresholdAlert {
+ fn from(value: AlertConfig) -> Self {
+ Self {
+ version: value.version,
+ id: value.id,
+ severity: value.severity,
+ title: value.title,
+ query: value.query,
+ alert_type: value.alert_type,
+ threshold_config: value.threshold_config,
+ eval_config: value.eval_config,
+ targets: value.targets,
+ state: value.state,
+ created: value.created,
+ tags: value.tags,
+ datasets: value.datasets,
+ }
+ }
+}
+
+impl From for AlertConfig {
+ fn from(val: ThresholdAlert) -> Self {
+ AlertConfig {
+ version: val.version,
+ id: val.id,
+ severity: val.severity,
+ title: val.title,
+ query: val.query,
+ alert_type: val.alert_type,
+ threshold_config: val.threshold_config,
+ eval_config: val.eval_config,
+ targets: val.targets,
+ state: val.state,
+ created: val.created,
+ tags: val.tags,
+ datasets: val.datasets,
+ }
+ }
+}
diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs
index d60b0fb09..f6eb0b1ce 100644
--- a/src/alerts/alerts_utils.rs
+++ b/src/alerts/alerts_utils.rs
@@ -24,10 +24,11 @@ use datafusion::{
logical_expr::Literal,
prelude::{Expr, lit},
};
+use itertools::Itertools;
use tracing::trace;
use crate::{
- alerts::{Conditions, LogicalOperator, WhereConfigOperator},
+ alerts::{AlertTrait, Conditions, LogicalOperator, WhereConfigOperator},
handlers::http::{
cluster::send_query_request,
query::{Query, create_streams_for_distributed},
@@ -38,7 +39,7 @@ use crate::{
utils::time::TimeRange,
};
-use super::{ALERTS, AlertConfig, AlertError, AlertOperator, AlertState};
+use super::{ALERTS, AlertError, AlertOperator, AlertState};
/// accept the alert
///
@@ -51,22 +52,16 @@ use super::{ALERTS, AlertConfig, AlertError, AlertOperator, AlertState};
/// collect the results in the end
///
/// check whether notification needs to be triggered or not
-pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
+pub async fn evaluate_alert(alert: &dyn AlertTrait) -> Result<(), AlertError> {
trace!("RUNNING EVAL TASK FOR- {alert:?}");
- let time_range = extract_time_range(&alert.eval_config)?;
- let final_value = execute_alert_query(alert, &time_range).await?;
- let result = evaluate_condition(
- &alert.threshold_config.operator,
- final_value,
- alert.threshold_config.value,
- );
+ let (result, final_value) = alert.eval_alert().await?;
update_alert_state(alert, result, final_value).await
}
/// Extract time range from alert evaluation configuration
-fn extract_time_range(eval_config: &super::EvalConfig) -> Result {
+pub fn extract_time_range(eval_config: &super::EvalConfig) -> Result {
let (start_time, end_time) = match eval_config {
super::EvalConfig::RollingWindow(rolling_window) => (&rolling_window.eval_start, "now"),
};
@@ -76,13 +71,10 @@ fn extract_time_range(eval_config: &super::EvalConfig) -> Result Result {
+pub async fn execute_alert_query(query: &str, time_range: &TimeRange) -> Result {
match PARSEABLE.options.mode {
- Mode::All | Mode::Query => execute_local_query(alert, time_range).await,
- Mode::Prism => execute_remote_query(alert, time_range).await,
+ Mode::All | Mode::Query => execute_local_query(query, time_range).await,
+ Mode::Prism => execute_remote_query(query, time_range).await,
_ => Err(AlertError::CustomError(format!(
"Unsupported mode '{:?}' for alert evaluation",
PARSEABLE.options.mode
@@ -91,12 +83,8 @@ async fn execute_alert_query(
}
/// Execute alert query locally (Query/All mode)
-async fn execute_local_query(
- alert: &AlertConfig,
- time_range: &TimeRange,
-) -> Result {
+async fn execute_local_query(query: &str, time_range: &TimeRange) -> Result {
let session_state = QUERY_SESSION.state();
- let query = &alert.query;
let tables = resolve_stream_names(query)?;
create_streams_for_distributed(tables.clone())
@@ -127,12 +115,9 @@ async fn execute_local_query(
}
/// Execute alert query remotely (Prism mode)
-async fn execute_remote_query(
- alert: &AlertConfig,
- time_range: &TimeRange,
-) -> Result {
+async fn execute_remote_query(query: &str, time_range: &TimeRange) -> Result {
let query_request = Query {
- query: alert.query.clone(),
+ query: query.to_string(),
start_time: time_range.start.to_rfc3339(),
end_time: time_range.end.to_rfc3339(),
streaming: false,
@@ -150,20 +135,21 @@ async fn execute_remote_query(
/// Convert JSON result value to f64
fn convert_result_to_f64(result_value: serde_json::Value) -> Result {
- if let Some(value) = result_value.as_f64() {
- Ok(value)
- } else if let Some(value) = result_value.as_i64() {
- Ok(value as f64)
- } else if let Some(value) = result_value.as_u64() {
- Ok(value as f64)
+ // due to the previous validations, we can be sure that we get an array of objects with just one entry
+ // [{"countField": Number(1120.251)}]
+ if let Some(array_val) = result_value.as_array().filter(|arr| !arr.is_empty())
+ && let Some(object) = array_val[0].as_object()
+ {
+ let values = object.values().map(|v| v.as_f64().unwrap()).collect_vec();
+ Ok(values[0])
} else {
Err(AlertError::CustomError(
- "Query result is not a number".to_string(),
+ "Query result is not a number or response is empty".to_string(),
))
}
}
-fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> bool {
+pub fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> bool {
match operator {
AlertOperator::GreaterThan => actual > expected,
AlertOperator::LessThan => actual < expected,
@@ -175,31 +161,43 @@ fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> b
}
async fn update_alert_state(
- alert: &AlertConfig,
+ alert: &dyn AlertTrait,
final_res: bool,
actual_value: f64,
) -> Result<(), AlertError> {
+ let guard = ALERTS.write().await;
+ let alerts = if let Some(alerts) = guard.as_ref() {
+ alerts
+ } else {
+ return Err(AlertError::CustomError("No AlertManager set".into()));
+ };
+
if final_res {
let message = format!(
"Alert Triggered: {}\n\nThreshold: ({} {})\nCurrent Value: {}\nEvaluation Window: {} | Frequency: {}\n\nQuery:\n{}",
- alert.id,
- alert.threshold_config.operator,
- alert.threshold_config.value,
+ alert.get_id(),
+ alert.get_threshold_config().operator,
+ alert.get_threshold_config().value,
actual_value,
alert.get_eval_window(),
alert.get_eval_frequency(),
- alert.query
+ alert.get_query()
);
- ALERTS
- .update_state(alert.id, AlertState::Triggered, Some(message))
+
+ alerts
+ .update_state(*alert.get_id(), AlertState::Triggered, Some(message))
.await
- } else if ALERTS.get_state(alert.id).await?.eq(&AlertState::Triggered) {
- ALERTS
- .update_state(alert.id, AlertState::Resolved, Some("".into()))
+ } else if alerts
+ .get_state(*alert.get_id())
+ .await?
+ .eq(&AlertState::Triggered)
+ {
+ alerts
+ .update_state(*alert.get_id(), AlertState::Resolved, Some("".into()))
.await
} else {
- ALERTS
- .update_state(alert.id, AlertState::Resolved, None)
+ alerts
+ .update_state(*alert.get_id(), AlertState::Resolved, None)
.await
}
}
diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs
index 6429f519b..6ce1ff02b 100644
--- a/src/alerts/mod.rs
+++ b/src/alerts/mod.rs
@@ -17,7 +17,7 @@
*/
use actix_web::http::header::ContentType;
-use arrow_schema::{DataType, Schema};
+use arrow_schema::{ArrowError, DataType, Schema};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::logical_expr::{LogicalPlan, Projection};
@@ -25,11 +25,12 @@ use datafusion::sql::sqlparser::parser::ParserError;
use derive_more::FromStrError;
use derive_more::derive::FromStr;
use http::StatusCode;
-use once_cell::sync::Lazy;
+// use once_cell::sync::Lazy;
use serde::Serialize;
use serde_json::{Error as SerdeError, Value as JsonValue};
use std::collections::HashMap;
-use std::fmt::{self, Display};
+use std::fmt::{self, Debug, Display};
+use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::sync::oneshot::{Receiver, Sender};
@@ -38,12 +39,17 @@ use tokio::task::JoinHandle;
use tracing::{error, trace, warn};
use ulid::Ulid;
+pub mod alert_types;
pub mod alerts_utils;
pub mod target;
+pub mod traits;
+use crate::alerts::alert_types::ThresholdAlert;
use crate::alerts::target::TARGETS;
+use crate::alerts::traits::{AlertManagerTrait, AlertTrait};
use crate::handlers::http::fetch_schema;
use crate::handlers::http::query::create_streams_for_distributed;
+use crate::option::Mode;
use crate::parseable::{PARSEABLE, StreamNotFound};
use crate::query::{QUERY_SESSION, resolve_stream_names};
use crate::rbac::map::SessionKey;
@@ -64,38 +70,68 @@ pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>);
pub const CURRENT_ALERTS_VERSION: &str = "v2";
-pub static ALERTS: Lazy = Lazy::new(|| {
+pub static ALERTS: RwLock