Skip to content

update: add tags, created to alerts #1389

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 109 additions & 1 deletion src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use actix_web::http::header::ContentType;
use arrow_schema::{DataType, Schema};
use async_trait::async_trait;
use chrono::Utc;
use chrono::{DateTime, Utc};
use datafusion::logical_expr::{LogicalPlan, Projection};
use datafusion::sql::sqlparser::parser::ParserError;
use derive_more::FromStrError;
Expand Down Expand Up @@ -197,6 +197,14 @@ pub enum AlertType {
Threshold,
}

impl Display for AlertType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AlertType::Threshold => write!(f, "threshold"),
}
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub enum AlertOperator {
Expand Down Expand Up @@ -528,6 +536,7 @@ pub struct AlertRequest {
pub threshold_config: ThresholdConfig,
pub eval_config: EvalConfig,
pub targets: Vec<Ulid>,
pub tags: Option<Vec<String>>,
}

impl AlertRequest {
Expand All @@ -536,17 +545,21 @@ impl AlertRequest {
for id in &self.targets {
TARGETS.get_target_by_id(id).await?;
}
let datasets = resolve_stream_names(&self.query)?;
let config = AlertConfig {
version: AlertVerison::from(CURRENT_ALERTS_VERSION),
id: Ulid::new(),
severity: self.severity,
title: self.title,
query: self.query,
datasets,
alert_type: self.alert_type,
threshold_config: self.threshold_config,
eval_config: self.eval_config,
targets: self.targets,
state: AlertState::default(),
created: Utc::now(),
tags: self.tags,
};
Ok(config)
}
Expand All @@ -561,13 +574,16 @@ pub struct AlertConfig {
pub severity: Severity,
pub title: String,
pub query: String,
pub datasets: Vec<String>,
pub alert_type: AlertType,
pub threshold_config: ThresholdConfig,
pub eval_config: EvalConfig,
pub targets: Vec<Ulid>,
// for new alerts, state should be resolved
#[serde(default)]
pub state: AlertState,
pub created: DateTime<Utc>,
pub tags: Option<Vec<String>>,
}

impl AlertConfig {
Expand All @@ -580,6 +596,7 @@ impl AlertConfig {
let alert_info = format!("Alert '{}' (ID: {})", basic_fields.title, basic_fields.id);

let query = Self::build_query_from_v1(alert_json, &alert_info).await?;
let datasets = resolve_stream_names(&query)?;
let threshold_config = Self::extract_threshold_config(alert_json, &alert_info)?;
let eval_config = Self::extract_eval_config(alert_json, &alert_info)?;
let targets = Self::extract_targets(alert_json, &alert_info)?;
Expand All @@ -592,11 +609,14 @@ impl AlertConfig {
severity: basic_fields.severity,
title: basic_fields.title,
query,
datasets,
alert_type: AlertType::Threshold,
threshold_config,
eval_config,
targets,
state,
created: Utc::now(),
tags: None,
};

// Save the migrated alert back to storage
Expand Down Expand Up @@ -1183,6 +1203,65 @@ impl AlertConfig {
}
Ok(())
}

/// create a summary of the dashboard
/// used for listing dashboards
pub fn to_summary(&self) -> serde_json::Map<String, serde_json::Value> {
let mut map = serde_json::Map::new();

map.insert(
"title".to_string(),
serde_json::Value::String(self.title.clone()),
);

map.insert(
"created".to_string(),
serde_json::Value::String(self.created.to_string()),
);

map.insert(
"alertType".to_string(),
serde_json::Value::String(self.alert_type.to_string()),
);

map.insert(
"id".to_string(),
serde_json::Value::String(self.id.to_string()),
);

map.insert(
"severity".to_string(),
serde_json::Value::String(self.severity.to_string()),
);

map.insert(
"state".to_string(),
serde_json::Value::String(self.state.to_string()),
);

if let Some(tags) = &self.tags {
map.insert(
"tags".to_string(),
serde_json::Value::Array(
tags.iter()
.map(|tag| serde_json::Value::String(tag.clone()))
.collect(),
),
);
}

map.insert(
"datasets".to_string(),
serde_json::Value::Array(
self.datasets
.iter()
.map(|dataset| serde_json::Value::String(dataset.clone()))
.collect(),
),
);

map
}
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -1221,6 +1300,8 @@ pub enum AlertError {
ParserError(#[from] ParserError),
#[error("Invalid alert query")]
InvalidAlertQuery,
#[error("Invalid query parameter")]
InvalidQueryParameter,
}

impl actix_web::ResponseError for AlertError {
Expand All @@ -1243,6 +1324,7 @@ impl actix_web::ResponseError for AlertError {
Self::TargetInUse => StatusCode::CONFLICT,
Self::ParserError(_) => StatusCode::BAD_REQUEST,
Self::InvalidAlertQuery => StatusCode::BAD_REQUEST,
Self::InvalidQueryParameter => StatusCode::BAD_REQUEST,
}
}

Expand Down Expand Up @@ -1350,6 +1432,7 @@ impl Alerts {
pub async fn list_alerts_for_user(
&self,
session: SessionKey,
tags: Vec<String>,
) -> Result<Vec<AlertConfig>, AlertError> {
let mut alerts: Vec<AlertConfig> = Vec::new();
for (_, alert) in self.alerts.read().await.iter() {
Expand All @@ -1358,6 +1441,17 @@ impl Alerts {
alerts.push(alert.to_owned());
}
}
if tags.is_empty() {
return Ok(alerts);
}
// filter alerts based on tags
alerts.retain(|alert| {
if let Some(alert_tags) = &alert.tags {
alert_tags.iter().any(|tag| tags.contains(tag))
} else {
false
}
});

Ok(alerts)
}
Expand Down Expand Up @@ -1456,6 +1550,20 @@ impl Alerts {

Ok(())
}

/// List tags from all alerts
/// This function returns a list of unique tags from all alerts
pub async fn list_tags(&self) -> Vec<String> {
let alerts = self.alerts.read().await;
let mut tags = alerts
.iter()
.filter_map(|(_, alert)| alert.tags.as_ref())
.flat_map(|t| t.iter().cloned())
.collect::<Vec<String>>();
tags.sort();
tags.dedup();
tags
}
}

#[derive(Debug, Serialize)]
Expand Down
30 changes: 27 additions & 3 deletions src/handlers/http/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

use std::str::FromStr;
use std::{collections::HashMap, str::FromStr};

use crate::{
parseable::PARSEABLE,
Expand All @@ -38,9 +38,28 @@ use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState};
/// Read all alerts then return alerts which satisfy the condition
pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
let session_key = extract_session_key_from_req(&req)?;
let alerts = ALERTS.list_alerts_for_user(session_key).await?;
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
.map_err(|_| AlertError::InvalidQueryParameter)?;
let mut tags_list = Vec::new();
if !query_map.is_empty() {
if let Some(tags) = query_map.get("tags") {
tags_list = tags
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
if tags_list.is_empty() {
return Err(AlertError::InvalidQueryParameter);
}
}
}

Ok(web::Json(alerts))
let alerts = ALERTS.list_alerts_for_user(session_key, tags_list).await?;
let alerts_summary = alerts
.iter()
.map(|alert| alert.to_summary())
.collect::<Vec<_>>();
Ok(web::Json(alerts_summary))
}

// POST /alerts
Expand Down Expand Up @@ -154,3 +173,8 @@ pub async fn update_state(
let alert = ALERTS.get_alert_by_id(alert_id).await?;
Ok(web::Json(alert))
}

pub async fn list_tags() -> Result<impl Responder, AlertError> {
let tags = ALERTS.list_tags().await;
Ok(web::Json(tags))
}
7 changes: 7 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ impl Server {
.authorize(Action::DeleteAlert),
),
)
.service(
web::resource("/list_tags").route(
web::get()
.to(alerts::list_tags)
.authorize(Action::ListDashboard),
),
)
}

pub fn get_targets_webscope() -> Scope {
Expand Down
2 changes: 1 addition & 1 deletion src/prism/home/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ async fn get_alert_titles(
query_value: &str,
) -> Result<Vec<Resource>, PrismHomeError> {
let alerts = ALERTS
.list_alerts_for_user(key.clone())
.list_alerts_for_user(key.clone(), vec![])
.await?
.iter()
.filter_map(|alert| {
Expand Down
Loading