Skip to content

feat: add resource utilization middleware to monitor CPU and memory. #1352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jun 21, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c3bdb12
feat: add resource utilization middleware to monitor CPU and memory u…
vkhinvasara Jun 17, 2025
4fb2da7
refactor: changed thresholds to 90%
vkhinvasara Jun 17, 2025
5ce1ab9
added resource monitoring with configurable CPU and memory thresholds
vkhinvasara Jun 18, 2025
b58212f
fix: use blocking task for resource usage retrieval and update memory…
vkhinvasara Jun 18, 2025
f6a2517
fix: update memory unit logging from GiB to GB for consistency
vkhinvasara Jun 18, 2025
891e881
refactor: replace RwLock with LazyLock and AtomicBool for speeeedddd
vkhinvasara Jun 18, 2025
f90714d
fix: add resource utilization middleware to ONLY the ingest routes
vkhinvasara Jun 18, 2025
cd8c029
Added resource check interval option and updated the resource monitor…
vkhinvasara Jun 19, 2025
4578d0c
refactor: changed the default interval to 15 seconds.
vkhinvasara Jun 19, 2025
53a4532
refactor: remove resource monitor initialization from ingest, query, …
vkhinvasara Jun 19, 2025
0279f36
Merge branch 'main' into main
nikhilsinhaparseable Jun 19, 2025
88b4596
refactor: add resource utilization middleware to logstream routes, de…
vkhinvasara Jun 19, 2025
5b142b7
refactor: removed resource_check from the PUT stream.
vkhinvasara Jun 19, 2025
5f0fa34
Merge branch 'main' into main
nikhilsinhaparseable Jun 20, 2025
da3583a
refactor: simplify uptime retrieval in Report::new method
vkhinvasara Jun 20, 2025
305ad25
refactor: glazing clippy
vkhinvasara Jun 20, 2025
1469507
empty push
vkhinvasara Jun 20, 2025
b083724
nit: checking if the build runs
vkhinvasara Jun 20, 2025
63e7a58
version change to not use cache for this build
vkhinvasara Jun 20, 2025
3262815
Merge branch 'main' into main
nikhilsinhaparseable Jun 21, 2025
4f1ae5c
fix: update cache key format in build workflow
vkhinvasara Jun 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,25 @@ pub struct Options {
)]
pub parquet_compression: Compression,

// Resource monitoring
#[arg(
long,
env = "P_CPU_THRESHOLD",
default_value = "80.0",
value_parser = validation::validate_percentage,
help = "CPU utilization threshold percentage (0.0-100.0) for resource monitoring"
)]
pub cpu_utilization_threshold: f32,

#[arg(
long,
env = "P_MEMORY_THRESHOLD",
default_value = "80.0",
value_parser = validation::validate_percentage,
help = "Memory utilization threshold percentage (0.0-100.0) for resource monitoring"
)]
pub memory_utilization_threshold: f32,

// Integration features
#[arg(
long,
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub mod cluster;
pub mod correlation;
pub mod health_check;
pub mod ingest;
pub mod resource_check;
mod kinesis;
pub mod llm;
pub mod logstream;
Expand Down
22 changes: 18 additions & 4 deletions src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::thread;

use actix_web::web;
use actix_web::Scope;
use actix_web::middleware::from_fn;
use actix_web_prometheus::PrometheusMetrics;
use async_trait::async_trait;
use base64::Engine;
Expand All @@ -39,7 +40,7 @@ use crate::{
http::{
base_path, ingest, logstream,
middleware::{DisAllowRootUser, RouteExt},
role,
resource_check, role,
},
},
migration,
Expand Down Expand Up @@ -67,7 +68,10 @@ impl ParseableServer for IngestServer {
.service(
// Base path "{url}/api/v1"
web::scope(&base_path())
.service(Server::get_ingest_factory())
.service(
Server::get_ingest_factory()
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
)
.service(Self::logstream_api())
.service(Server::get_about_factory())
.service(Self::analytics_factory())
Expand All @@ -77,7 +81,10 @@ impl ParseableServer for IngestServer {
.service(Server::get_metrics_webscope())
.service(Server::get_readiness_factory()),
)
.service(Server::get_ingest_otel_factory());
.service(
Server::get_ingest_otel_factory()
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
);
}

async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
Expand Down Expand Up @@ -126,12 +133,18 @@ impl ParseableServer for IngestServer {
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

// Start resource monitor
let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel();
resource_check::spawn_resource_monitor(resource_shutdown_rx);

tokio::spawn(airplane::server());

// Ingestors shouldn't have to deal with OpenId auth flow
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");
// Shutdown resource monitor
let _ = resource_shutdown_tx.send(());
if let Err(join_err) = startup_sync_handle.await {
tracing::warn!("startup sync task panicked: {join_err}");
}
Expand Down Expand Up @@ -223,7 +236,8 @@ impl IngestServer {
web::post()
.to(ingest::post_event)
.authorize_for_stream(Action::Ingest),
),
)
.wrap(from_fn(resource_check::check_resource_utilization_middleware)),
)
.service(
web::resource("/sync")
Expand Down
21 changes: 18 additions & 3 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::handlers::http::alerts;
use crate::handlers::http::base_path;
use crate::handlers::http::health_check;
use crate::handlers::http::prism_base_path;
use crate::handlers::http::resource_check;
use crate::handlers::http::query;
use crate::handlers::http::users::dashboards;
use crate::handlers::http::users::filters;
Expand All @@ -37,6 +38,7 @@ use crate::sync::sync_start;

use actix_web::web;
use actix_web::web::resource;
use actix_web::middleware::from_fn;
use actix_web::Resource;
use actix_web::Scope;
use actix_web_prometheus::PrometheusMetrics;
Expand Down Expand Up @@ -72,7 +74,10 @@ impl ParseableServer for Server {
web::scope(&base_path())
.service(Self::get_correlation_webscope())
.service(Self::get_query_factory())
.service(Self::get_ingest_factory())
.service(
Self::get_ingest_factory()
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
)
.service(Self::get_liveness_factory())
.service(Self::get_readiness_factory())
.service(Self::get_about_factory())
Expand All @@ -95,7 +100,10 @@ impl ParseableServer for Server {
.service(Server::get_prism_logstream())
.service(Server::get_prism_datasets()),
)
.service(Self::get_ingest_otel_factory())
.service(
Self::get_ingest_otel_factory()
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
)
.service(Self::get_generated());
}

Expand Down Expand Up @@ -138,6 +146,10 @@ impl ParseableServer for Server {
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

// Start resource monitor
let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel();
resource_check::spawn_resource_monitor(resource_shutdown_rx);

if PARSEABLE.options.send_analytics {
analytics::init_analytics_scheduler()?;
}
Expand All @@ -150,6 +162,8 @@ impl ParseableServer for Server {
.await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");
// Shutdown resource monitor
let _ = resource_shutdown_tx.send(());
if let Err(join_err) = startup_sync_handle.await {
tracing::warn!("startup sync task panicked: {join_err}");
}
Expand Down Expand Up @@ -367,7 +381,8 @@ impl Server {
.to(logstream::delete)
.authorize_for_stream(Action::DeleteStream),
)
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE))
.wrap(from_fn(resource_check::check_resource_utilization_middleware)),
)
.service(
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
Expand Down
128 changes: 128 additions & 0 deletions src/handlers/http/resource_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::sync::{atomic::AtomicBool, Arc, LazyLock};

use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
error::Error,
error::ErrorServiceUnavailable,
middleware::Next,
};
use tokio::{select, time::{interval, Duration}};
use tracing::{warn, trace, info};

use crate::analytics::{SYS_INFO, refresh_sys_info};
use crate::parseable::PARSEABLE;

/// Tracks whether system resource usage is currently below the configured
/// thresholds. Written by the monitor task spawned in
/// `spawn_resource_monitor`, read by `check_resource_utilization_middleware`.
///
/// Fix: `AtomicBool::new` is a `const fn`, so the `LazyLock<Arc<...>>`
/// wrapping is unnecessary indirection — a plain static suffices and all
/// `.load()`/`.store()` call sites keep working via auto-deref removal.
/// Also starts `true` (fail-open) so requests arriving before the monitor's
/// first sample are not spuriously rejected with 503.
static RESOURCE_CHECK_ENABLED: AtomicBool = AtomicBool::new(true);

/// Spawn a background task that periodically samples system CPU and memory
/// usage and publishes whether both are under their configured thresholds.
///
/// The result is written to `RESOURCE_CHECK_ENABLED`, which
/// `check_resource_utilization_middleware` consults to accept or reject
/// requests. The task runs until a value (or sender drop) is observed on
/// `shutdown_rx`.
pub fn spawn_resource_monitor(mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
    tokio::spawn(async move {
        // NOTE(review): the check period is fixed at 30s here; consider
        // making it configurable alongside the CPU/memory thresholds.
        let mut check_interval = interval(Duration::from_secs(30));

        // Thresholds are read once at startup from the CLI/env options;
        // changing them requires a restart.
        let cpu_threshold = PARSEABLE.options.cpu_utilization_threshold;
        let memory_threshold = PARSEABLE.options.memory_utilization_threshold;

        info!("Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%",
            cpu_threshold, memory_threshold);
        loop {
            select! {
                _ = check_interval.tick() => {
                    trace!("Checking system resource utilization...");

                    refresh_sys_info();
                    // Sampling takes a blocking mutex, so keep it off the
                    // async worker threads.
                    let sample = tokio::task::spawn_blocking(|| {
                        let sys = SYS_INFO.lock().unwrap();
                        let used_memory = sys.used_memory() as f32;
                        let total_memory = sys.total_memory() as f32;
                        let cpu_usage = sys.global_cpu_usage();
                        (used_memory, total_memory, cpu_usage)
                    }).await;

                    // Fix: don't unwrap the join handle. A panic in the
                    // sampling closure (e.g. a poisoned SYS_INFO mutex) would
                    // previously kill the monitor task for good while the
                    // middleware kept acting on stale state. Skip this tick
                    // and try again on the next one instead.
                    let (used_memory, total_memory, cpu_usage) = match sample {
                        Ok(sample) => sample,
                        Err(err) => {
                            warn!("Resource sampling task failed: {err}; skipping this check");
                            continue;
                        }
                    };

                    let mut resource_ok = true;

                    // Memory usage as a percentage; guard against a zero
                    // total reported by the OS to avoid a NaN.
                    let memory_usage = if total_memory > 0.0 {
                        (used_memory / total_memory) * 100.0
                    } else {
                        0.0
                    };

                    // Log current usage on every check for observability.
                    // NOTE(review): values are divided by 1024^3 (GiB) but
                    // labelled "GB" — kept as-is for log-format stability.
                    info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)",
                        cpu_usage, memory_usage,
                        used_memory / 1024.0 / 1024.0 / 1024.0,
                        total_memory / 1024.0 / 1024.0 / 1024.0);

                    // Check memory utilization
                    if memory_usage > memory_threshold {
                        warn!("High memory usage detected: {:.1}% (threshold: {:.1}%)",
                            memory_usage, memory_threshold);
                        resource_ok = false;
                    }

                    // Check CPU utilization
                    if cpu_usage > cpu_threshold {
                        warn!("High CPU usage detected: {:.1}% (threshold: {:.1}%)",
                            cpu_usage, cpu_threshold);
                        resource_ok = false;
                    }

                    // `swap` returns the previous value in one atomic
                    // read-modify-write, replacing the separate load + store.
                    let previous_state = RESOURCE_CHECK_ENABLED.swap(resource_ok, std::sync::atomic::Ordering::SeqCst);

                    // Log state transitions only, to avoid noisy repeats.
                    if previous_state != resource_ok {
                        if resource_ok {
                            info!("Resource utilization back to normal - requests will be accepted");
                        } else {
                            warn!("Resource utilization too high - requests will be rejected");
                        }
                    }
                },
                _ = &mut shutdown_rx => {
                    trace!("Resource monitor shutting down");
                    break;
                }
            }
        }
    });
}

/// Middleware gate applied to ingest routes: consults the flag maintained by
/// `spawn_resource_monitor` and short-circuits with 503 Service Unavailable
/// when the system is over its CPU or memory threshold; otherwise forwards
/// the request untouched.
pub async fn check_resource_utilization_middleware(
    req: ServiceRequest,
    next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {

    if RESOURCE_CHECK_ENABLED.load(std::sync::atomic::Ordering::SeqCst) {
        // Resources within limits — hand off to the wrapped service.
        next.call(req).await
    } else {
        warn!("Rejecting request to {} due to resource constraints", req.path());
        Err(ErrorServiceUnavailable("Server resources over-utilized"))
    }
}
12 changes: 12 additions & 0 deletions src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ pub mod validation {
}
}

/// Parse a CLI percentage argument, accepting only decimal values in the
/// inclusive range 0.0..=100.0. Returns a human-readable error string for
/// clap to surface when the input is not a number or is out of range.
pub fn validate_percentage(percentage: &str) -> Result<f32, String> {
    percentage
        .parse::<f32>()
        .map_err(|_| "Invalid percentage value. It should be a decimal number like 80.0".to_string())
        .and_then(|value| {
            if (0.0..=100.0).contains(&value) {
                Ok(value)
            } else {
                Err("Invalid percentage value. It should be between 0.0 and 100.0".to_string())
            }
        })
}

pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
if let Ok(size) = s.parse::<usize>() {
if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) {
Expand Down
Loading