Commit cd8c029

Added a resource check interval option; the resource monitoring interval is now taken from the CLI via the --resource-check-interval flag. Also wrapped the resource-check middleware around the CPU-intensive query endpoints available in query.rs.
1 parent f90714d commit cd8c029

5 files changed, 42 additions and 6 deletions

src/cli.rs

Lines changed: 9 additions & 0 deletions

@@ -318,6 +318,15 @@ pub struct Options {
     pub parquet_compression: Compression,

     // Resource monitoring
+    #[arg(
+        long,
+        env = "P_RESOURCE_CHECK_INTERVAL",
+        default_value = "30",
+        value_parser = validation::validate_seconds,
+        help = "Resource monitoring check interval in seconds"
+    )]
+    pub resource_check_interval: u64,
+
     #[arg(
         long,
         env = "P_CPU_THRESHOLD",

src/handlers/http/modal/query_server.rs

Lines changed: 16 additions & 3 deletions

@@ -22,13 +22,14 @@ use std::thread;
 use crate::handlers::airplane;
 use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
 use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
-use crate::handlers::http::{base_path, prism_base_path};
+use crate::handlers::http::{base_path, prism_base_path, resource_check};
 use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
 use crate::handlers::http::{rbac, role};
 use crate::hottier::HotTierManager;
 use crate::rbac::role::Action;
 use crate::sync::sync_start;
 use crate::{analytics, migration, storage, sync};
+use actix_web::middleware::from_fn;
 use actix_web::web::{resource, ServiceConfig};
 use actix_web::{web, Scope};
 use actix_web_prometheus::PrometheusMetrics;

@@ -53,7 +54,10 @@ impl ParseableServer for QueryServer {
         .service(
             web::scope(&base_path())
                 .service(Server::get_correlation_webscope())
-                .service(Server::get_query_factory())
+                .service(
+                    Server::get_query_factory()
+                        .wrap(from_fn(resource_check::check_resource_utilization_middleware))
+                )
                 .service(Server::get_liveness_factory())
                 .service(Server::get_readiness_factory())
                 .service(Server::get_about_factory())

@@ -66,7 +70,10 @@ impl ParseableServer for QueryServer {
                 .service(Server::get_oauth_webscope(oidc_client))
                 .service(Self::get_user_role_webscope())
                 .service(Server::get_roles_webscope())
-                .service(Server::get_counts_webscope())
+                .service(
+                    Server::get_counts_webscope()
+                        .wrap(from_fn(resource_check::check_resource_utilization_middleware))
+                )
                 .service(Server::get_metrics_webscope())
                 .service(Server::get_alerts_webscope())
                 .service(Self::get_cluster_web_scope()),

@@ -143,13 +150,19 @@ impl ParseableServer for QueryServer {
         let (cancel_tx, cancel_rx) = oneshot::channel();
         thread::spawn(|| sync::handler(cancel_rx));

+        // Start resource monitor
+        let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel();
+        resource_check::spawn_resource_monitor(resource_shutdown_rx);
+
         tokio::spawn(airplane::server());

         let result = self
             .start(shutdown_rx, prometheus.clone(), PARSEABLE.options.openid())
             .await?;
         // Cancel sync jobs
         cancel_tx.send(()).expect("Cancellation should not fail");
+        // Shutdown resource monitor
+        let _ = resource_shutdown_tx.send(());
         if let Err(join_err) = startup_sync_handle.await {
             tracing::warn!("startup sync task panicked: {join_err}");
         }
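
The body of check_resource_utilization_middleware is not part of this diff; the sketch below is only a guess at its general shape, following the actix-web middleware::from_fn pattern, with a local AtomicBool standing in for the shared RESOURCE_CHECK_ENABLED flag. It illustrates the intent of the wrapping above: reject query and counts requests with a 503 while the node is over its thresholds, and pass them through otherwise.

use actix_web::{
    body::MessageBody,
    dev::{ServiceRequest, ServiceResponse},
    error::ErrorServiceUnavailable,
    middleware::Next,
    Error,
};
use std::sync::atomic::{AtomicBool, Ordering};

// Stand-in for the shared flag flipped by the resource monitor.
static RESOURCE_PRESSURE: AtomicBool = AtomicBool::new(false);

pub async fn check_resource_utilization_middleware(
    req: ServiceRequest,
    next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
    if RESOURCE_PRESSURE.load(Ordering::Relaxed) {
        // Reject early instead of queueing more CPU-intensive work.
        return Err(ErrorServiceUnavailable(
            "server is over its configured CPU/memory threshold",
        ));
    }
    // Under normal load, forward the request to the wrapped service.
    next.call(req).await
}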

src/handlers/http/modal/server.rs

Lines changed: 8 additions & 2 deletions

@@ -73,7 +73,10 @@ impl ParseableServer for Server {
         .service(
             web::scope(&base_path())
                 .service(Self::get_correlation_webscope())
-                .service(Self::get_query_factory())
+                .service(
+                    Self::get_query_factory()
+                        .wrap(from_fn(resource_check::check_resource_utilization_middleware))
+                )
                 .service(
                     Self::get_ingest_factory()
                         .wrap(from_fn(resource_check::check_resource_utilization_middleware))

@@ -90,7 +93,10 @@ impl ParseableServer for Server {
                 .service(Self::get_oauth_webscope(oidc_client))
                 .service(Self::get_user_role_webscope())
                 .service(Self::get_roles_webscope())
-                .service(Self::get_counts_webscope())
+                .service(
+                    Self::get_counts_webscope()
+                        .wrap(from_fn(resource_check::check_resource_utilization_middleware))
+                )
                 .service(Self::get_alerts_webscope())
                 .service(Self::get_metrics_webscope()),
             )
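
As a wiring illustration (hypothetical handlers and a simplified guard, not Parseable's actual routes): applying the middleware per endpoint group with .wrap(from_fn(...)) leaves health probes such as liveness outside the guard, so they keep answering even while query traffic is being shed.

use actix_web::{
    body::MessageBody,
    dev::{ServiceRequest, ServiceResponse},
    error::ErrorServiceUnavailable,
    middleware::{from_fn, Next},
    web, App, Error, HttpResponse, HttpServer,
};
use std::sync::atomic::{AtomicBool, Ordering};

static BUSY: AtomicBool = AtomicBool::new(false); // simplified overload flag

async fn guard(
    req: ServiceRequest,
    next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
    if BUSY.load(Ordering::Relaxed) {
        return Err(ErrorServiceUnavailable("resource utilization too high"));
    }
    next.call(req).await
}

async fn query() -> HttpResponse { HttpResponse::Ok().body("query result") }
async fn liveness() -> HttpResponse { HttpResponse::Ok().finish() }

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new().service(
            web::scope("/api/v1")
                // CPU-intensive endpoint: guarded, mirroring get_query_factory() above
                .service(
                    web::resource("/query")
                        .route(web::post().to(query))
                        .wrap(from_fn(guard)),
                )
                // health probe: intentionally left unwrapped
                .service(web::resource("/liveness").route(web::get().to(liveness))),
        )
    })
    .bind(("127.0.0.1", 8000))?
    .run()
    .await
}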

src/handlers/http/resource_check.rs

Lines changed: 2 additions & 1 deletion

@@ -36,7 +36,8 @@ static RESOURCE_CHECK_ENABLED:LazyLock<Arc<AtomicBool>> = LazyLock::new(|| Arc::
 /// Spawn a background task to monitor system resources
 pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
     tokio::spawn(async move {
-        let mut check_interval = interval(Duration::from_secs(30));
+        let resource_check_interval = PARSEABLE.options.resource_check_interval;
+        let mut check_interval = interval(Duration::from_secs(resource_check_interval));
         let mut shutdown_rx = shutdown_rx;

         let cpu_threshold = PARSEABLE.options.cpu_utilization_threshold;
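
Put together, the monitor loop this change parameterizes looks roughly like the sketch below (resources_exceeded and the interval_secs parameter are stand-ins; the actual code reads the value from PARSEABLE.options.resource_check_interval as shown above): a tokio task ticks at the configured interval, updates a shared flag, and exits when the oneshot shutdown sender fires.

use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::{sync::oneshot, time::interval};

static RESOURCE_PRESSURE: AtomicBool = AtomicBool::new(false);

// Placeholder for the real CPU/memory threshold comparison.
fn resources_exceeded() -> bool {
    false
}

pub fn spawn_resource_monitor(shutdown_rx: oneshot::Receiver<()>, interval_secs: u64) {
    tokio::spawn(async move {
        // The interval now comes from --resource-check-interval instead of a hard-coded 30s.
        let mut check_interval = interval(Duration::from_secs(interval_secs));
        let mut shutdown_rx = shutdown_rx;
        loop {
            tokio::select! {
                _ = check_interval.tick() => {
                    RESOURCE_PRESSURE.store(resources_exceeded(), Ordering::Relaxed);
                }
                _ = &mut shutdown_rx => break, // resource_shutdown_tx.send(()) lands here
            }
        }
    });
}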

src/option.rs

Lines changed: 7 additions & 0 deletions

@@ -187,6 +187,13 @@ pub mod validation {
         }
     }

+    pub fn validate_seconds(s: &str) -> Result<u64, String> {
+        if let Ok(seconds) = s.parse::<u64>() {
+            Ok(seconds)
+        } else {
+            Err("Invalid value for seconds. It should be a positive integer".to_string())
+        }
+    }
     pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
         if let Ok(size) = s.parse::<usize>() {
             if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) {
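
For reference, a standalone copy of the new validator exercised against a few inputs (illustrative only): u64 parsing already rejects negatives and non-numeric strings, but "0" parses successfully even though the error message speaks of a positive integer, so a zero interval is currently accepted by validation.

fn validate_seconds(s: &str) -> Result<u64, String> {
    if let Ok(seconds) = s.parse::<u64>() {
        Ok(seconds)
    } else {
        Err("Invalid value for seconds. It should be a positive integer".to_string())
    }
}

fn main() {
    assert_eq!(validate_seconds("30"), Ok(30));
    assert!(validate_seconds("-5").is_err());  // u64 parsing rejects negative values
    assert!(validate_seconds("abc").is_err()); // and non-numeric input
    assert_eq!(validate_seconds("0"), Ok(0));  // zero still passes validation
}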
