Skip to content

Commit ba215c4

Browse files
committed
bugfix & cargo clippy
Fixed bug in `get_filter_string()` where the columns and operators were being incorrectly parsed into the SQL string
1 parent 51d8fcd commit ba215c4

File tree

6 files changed

+88
-77
lines changed

6 files changed

+88
-77
lines changed

src/alerts/alerts_utils.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -359,21 +359,28 @@ pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
359359
return Err("value must be null when operator is either `is null` or `is not null`"
360360
.into());
361361
}
362-
let value = NumberOrString::from_string(
363-
condition.value.as_ref().unwrap().to_owned(),
364-
);
365-
match value {
366-
NumberOrString::Number(val) => exprs.push(format!(
367-
"\"{}\" {} {}",
368-
condition.column, condition.operator, val
369-
)),
370-
NumberOrString::String(val) => exprs.push(format!(
371-
"\"{}\" {} '{}'",
372-
condition.column,
373-
condition.operator,
374-
val.replace('\'', "''")
375-
)),
376-
}
362+
363+
let value = condition.value.as_ref().unwrap();
364+
365+
let operator_and_value = match condition.operator {
366+
WhereConfigOperator::Contains => format!("LIKE '%{value}%'"),
367+
WhereConfigOperator::DoesNotContain => format!("NOT LIKE '%{value}%'"),
368+
WhereConfigOperator::ILike => format!("ILIKE '%{value}%'"),
369+
WhereConfigOperator::BeginsWith => format!("LIKE '{value}%'"),
370+
WhereConfigOperator::DoesNotBeginWith => format!("NOT LIKE '{value}%'"),
371+
WhereConfigOperator::EndsWith => format!("LIKE '%{value}'"),
372+
WhereConfigOperator::DoesNotEndWith => format!("NOT LIKE '%{value}'"),
373+
_ => {
374+
let value = match NumberOrString::from_string(value.to_owned()) {
375+
NumberOrString::Number(val) => format!("{val}"),
376+
NumberOrString::String(val) => {
377+
format!("'{}'", val.replace('\'', "''"))
378+
}
379+
};
380+
format!("{} {}", condition.operator, value)
381+
}
382+
};
383+
exprs.push(format!("\"{}\" {}", condition.column, operator_and_value))
377384
} else {
378385
exprs.push(format!("\"{}\" {}", condition.column, condition.operator))
379386
}

src/handlers/http/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ pub mod cluster;
3636
pub mod correlation;
3737
pub mod health_check;
3838
pub mod ingest;
39-
pub mod resource_check;
4039
mod kinesis;
4140
pub mod llm;
4241
pub mod logstream;
@@ -47,6 +46,7 @@ pub mod prism_home;
4746
pub mod prism_logstream;
4847
pub mod query;
4948
pub mod rbac;
49+
pub mod resource_check;
5050
pub mod role;
5151
pub mod users;
5252
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;

src/handlers/http/modal/ingest_server.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
use std::sync::Arc;
2020
use std::thread;
2121

22+
use actix_web::middleware::from_fn;
2223
use actix_web::web;
2324
use actix_web::Scope;
24-
use actix_web::middleware::from_fn;
2525
use actix_web_prometheus::PrometheusMetrics;
2626
use async_trait::async_trait;
2727
use base64::Engine;
@@ -68,10 +68,9 @@ impl ParseableServer for IngestServer {
6868
.service(
6969
// Base path "{url}/api/v1"
7070
web::scope(&base_path())
71-
.service(
72-
Server::get_ingest_factory()
73-
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
74-
)
71+
.service(Server::get_ingest_factory().wrap(from_fn(
72+
resource_check::check_resource_utilization_middleware,
73+
)))
7574
.service(Self::logstream_api())
7675
.service(Server::get_about_factory())
7776
.service(Self::analytics_factory())
@@ -81,10 +80,9 @@ impl ParseableServer for IngestServer {
8180
.service(Server::get_metrics_webscope())
8281
.service(Server::get_readiness_factory()),
8382
)
84-
.service(
85-
Server::get_ingest_otel_factory()
86-
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
87-
);
83+
.service(Server::get_ingest_otel_factory().wrap(from_fn(
84+
resource_check::check_resource_utilization_middleware,
85+
)));
8886
}
8987

9088
async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
@@ -231,7 +229,9 @@ impl IngestServer {
231229
.to(ingest::post_event)
232230
.authorize_for_stream(Action::Ingest),
233231
)
234-
.wrap(from_fn(resource_check::check_resource_utilization_middleware)),
232+
.wrap(from_fn(
233+
resource_check::check_resource_utilization_middleware,
234+
)),
235235
)
236236
.service(
237237
web::resource("/sync")

src/handlers/http/modal/query_server.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,9 @@ impl ParseableServer for QueryServer {
5454
.service(
5555
web::scope(&base_path())
5656
.service(Server::get_correlation_webscope())
57-
.service(
58-
Server::get_query_factory()
59-
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
60-
)
57+
.service(Server::get_query_factory().wrap(from_fn(
58+
resource_check::check_resource_utilization_middleware,
59+
)))
6160
.service(Server::get_liveness_factory())
6261
.service(Server::get_readiness_factory())
6362
.service(Server::get_about_factory())
@@ -70,10 +69,9 @@ impl ParseableServer for QueryServer {
7069
.service(Server::get_oauth_webscope(oidc_client))
7170
.service(Self::get_user_role_webscope())
7271
.service(Server::get_roles_webscope())
73-
.service(
74-
Server::get_counts_webscope()
75-
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
76-
)
72+
.service(Server::get_counts_webscope().wrap(from_fn(
73+
resource_check::check_resource_utilization_middleware,
74+
)))
7775
.service(Server::get_metrics_webscope())
7876
.service(Server::get_alerts_webscope())
7977
.service(Self::get_cluster_web_scope()),

src/handlers/http/modal/server.rs

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use crate::handlers::http::alerts;
2525
use crate::handlers::http::base_path;
2626
use crate::handlers::http::health_check;
2727
use crate::handlers::http::prism_base_path;
28-
use crate::handlers::http::resource_check;
2928
use crate::handlers::http::query;
29+
use crate::handlers::http::resource_check;
3030
use crate::handlers::http::users::dashboards;
3131
use crate::handlers::http::users::filters;
3232
use crate::hottier::HotTierManager;
@@ -36,9 +36,9 @@ use crate::storage;
3636
use crate::sync;
3737
use crate::sync::sync_start;
3838

39+
use actix_web::middleware::from_fn;
3940
use actix_web::web;
4041
use actix_web::web::resource;
41-
use actix_web::middleware::from_fn;
4242
use actix_web::Resource;
4343
use actix_web::Scope;
4444
use actix_web_prometheus::PrometheusMetrics;
@@ -73,14 +73,12 @@ impl ParseableServer for Server {
7373
.service(
7474
web::scope(&base_path())
7575
.service(Self::get_correlation_webscope())
76-
.service(
77-
Self::get_query_factory()
78-
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
79-
)
80-
.service(
81-
Self::get_ingest_factory()
82-
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
83-
)
76+
.service(Self::get_query_factory().wrap(from_fn(
77+
resource_check::check_resource_utilization_middleware,
78+
)))
79+
.service(Self::get_ingest_factory().wrap(from_fn(
80+
resource_check::check_resource_utilization_middleware,
81+
)))
8482
.service(Self::get_liveness_factory())
8583
.service(Self::get_readiness_factory())
8684
.service(Self::get_about_factory())
@@ -93,10 +91,9 @@ impl ParseableServer for Server {
9391
.service(Self::get_oauth_webscope(oidc_client))
9492
.service(Self::get_user_role_webscope())
9593
.service(Self::get_roles_webscope())
96-
.service(
97-
Self::get_counts_webscope()
98-
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
99-
)
94+
.service(Self::get_counts_webscope().wrap(from_fn(
95+
resource_check::check_resource_utilization_middleware,
96+
)))
10097
.service(Self::get_alerts_webscope())
10198
.service(Self::get_metrics_webscope()),
10299
)
@@ -106,10 +103,9 @@ impl ParseableServer for Server {
106103
.service(Server::get_prism_logstream())
107104
.service(Server::get_prism_datasets()),
108105
)
109-
.service(
110-
Self::get_ingest_otel_factory()
111-
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
112-
)
106+
.service(Self::get_ingest_otel_factory().wrap(from_fn(
107+
resource_check::check_resource_utilization_middleware,
108+
)))
113109
.service(Self::get_generated());
114110
}
115111

@@ -367,14 +363,16 @@ impl Server {
367363
.route(
368364
web::put()
369365
.to(logstream::put_stream)
370-
.authorize_for_stream(Action::CreateStream)
366+
.authorize_for_stream(Action::CreateStream),
371367
)
372368
// POST "/logstream/{logstream}" ==> Post logs to given log stream
373369
.route(
374370
web::post()
375371
.to(ingest::post_event)
376372
.authorize_for_stream(Action::Ingest)
377-
.wrap(from_fn(resource_check::check_resource_utilization_middleware)),
373+
.wrap(from_fn(
374+
resource_check::check_resource_utilization_middleware,
375+
)),
378376
)
379377
// DELETE "/logstream/{logstream}" ==> Delete log stream
380378
.route(
@@ -383,7 +381,7 @@ impl Server {
383381
.authorize_for_stream(Action::DeleteStream),
384382
)
385383
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
386-
)
384+
)
387385
.service(
388386
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
389387
web::resource("/info").route(

src/handlers/http/resource_check.rs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,37 @@ use actix_web::{
2525
error::ErrorServiceUnavailable,
2626
middleware::Next,
2727
};
28-
use tokio::{select, time::{interval, Duration}};
29-
use tracing::{warn, trace, info};
28+
use tokio::{
29+
select,
30+
time::{interval, Duration},
31+
};
32+
use tracing::{info, trace, warn};
3033

31-
use crate::analytics::{SYS_INFO, refresh_sys_info};
34+
use crate::analytics::{refresh_sys_info, SYS_INFO};
3235
use crate::parseable::PARSEABLE;
3336

34-
static RESOURCE_CHECK_ENABLED:LazyLock<Arc<AtomicBool>> = LazyLock::new(|| Arc::new(AtomicBool::new(false)));
37+
static RESOURCE_CHECK_ENABLED: LazyLock<Arc<AtomicBool>> =
38+
LazyLock::new(|| Arc::new(AtomicBool::new(false)));
3539

3640
/// Spawn a background task to monitor system resources
3741
pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
3842
tokio::spawn(async move {
3943
let resource_check_interval = PARSEABLE.options.resource_check_interval;
4044
let mut check_interval = interval(Duration::from_secs(resource_check_interval));
4145
let mut shutdown_rx = shutdown_rx;
42-
46+
4347
let cpu_threshold = PARSEABLE.options.cpu_utilization_threshold;
4448
let memory_threshold = PARSEABLE.options.memory_utilization_threshold;
45-
46-
info!("Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%",
47-
cpu_threshold, memory_threshold);
49+
50+
info!(
51+
"Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%",
52+
cpu_threshold, memory_threshold
53+
);
4854
loop {
4955
select! {
5056
_ = check_interval.tick() => {
5157
trace!("Checking system resource utilization...");
52-
58+
5359
refresh_sys_info();
5460
let (used_memory, total_memory, cpu_usage) = tokio::task::spawn_blocking(|| {
5561
let sys = SYS_INFO.lock().unwrap();
@@ -58,36 +64,36 @@ pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
5864
let cpu_usage = sys.global_cpu_usage();
5965
(used_memory, total_memory, cpu_usage)
6066
}).await.unwrap();
61-
67+
6268
let mut resource_ok = true;
63-
69+
6470
// Calculate memory usage percentage
6571
let memory_usage = if total_memory > 0.0 {
6672
(used_memory / total_memory) * 100.0
6773
} else {
6874
0.0
6975
};
70-
76+
7177
// Log current resource usage every few checks for debugging
72-
info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)",
73-
cpu_usage, memory_usage,
74-
used_memory / 1024.0 / 1024.0 / 1024.0,
78+
info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)",
79+
cpu_usage, memory_usage,
80+
used_memory / 1024.0 / 1024.0 / 1024.0,
7581
total_memory / 1024.0 / 1024.0 / 1024.0);
76-
82+
7783
// Check memory utilization
7884
if memory_usage > memory_threshold {
79-
warn!("High memory usage detected: {:.1}% (threshold: {:.1}%)",
85+
warn!("High memory usage detected: {:.1}% (threshold: {:.1}%)",
8086
memory_usage, memory_threshold);
8187
resource_ok = false;
8288
}
83-
89+
8490
// Check CPU utilization
8591
if cpu_usage > cpu_threshold {
86-
warn!("High CPU usage detected: {:.1}% (threshold: {:.1}%)",
92+
warn!("High CPU usage detected: {:.1}% (threshold: {:.1}%)",
8793
cpu_usage, cpu_threshold);
8894
resource_ok = false;
8995
}
90-
96+
9197
let previous_state = RESOURCE_CHECK_ENABLED.load(std::sync::atomic::Ordering::SeqCst);
9298
RESOURCE_CHECK_ENABLED.store(resource_ok, std::sync::atomic::Ordering::SeqCst);
9399

@@ -115,12 +121,14 @@ pub async fn check_resource_utilization_middleware(
115121
req: ServiceRequest,
116122
next: Next<impl MessageBody>,
117123
) -> Result<ServiceResponse<impl MessageBody>, Error> {
118-
119124
let resource_ok = RESOURCE_CHECK_ENABLED.load(std::sync::atomic::Ordering::SeqCst);
120125

121126
if !resource_ok {
122127
let error_msg = "Server resources over-utilized";
123-
warn!("Rejecting request to {} due to resource constraints", req.path());
128+
warn!(
129+
"Rejecting request to {} due to resource constraints",
130+
req.path()
131+
);
124132
return Err(ErrorServiceUnavailable(error_msg));
125133
}
126134

0 commit comments

Comments
 (0)