Skip to content

V66 cp throughput disk kernel break #9901

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ enum_dispatch = "0.3.7"
envmnt = "0.10.4"
flate2 = "1.0"
flexi_logger = "0.29"
fs2 = "0.4"
futures = "~0.3"
grpc = { path = "plugins/grpc" }
hex = "0.4.3"
Expand Down
1 change: 1 addition & 0 deletions agent/crates/public/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ num_enum = "0.5.6"
parking_lot = "0.11"
pnet = "^0.29"
prost.workspace = true
rand = "0.8.5"
regex.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.72"
Expand Down
2 changes: 2 additions & 0 deletions agent/crates/public/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ pub mod pwd;
pub mod queue;
pub mod rpc;
pub mod sender;
pub mod throttle;
pub mod utils;

#[cfg(target_os = "linux")]
pub mod netns;

pub use leaky_bucket::LeakyBucket;
pub use throttle::Throttle;
12 changes: 12 additions & 0 deletions agent/crates/public/src/queue/overwrite_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ struct OverwriteQueue<T: Sized> {

counter: Counter,

total_overwritten_count: AtomicU64,

_marker: PhantomData<T>,
}

Expand All @@ -78,6 +80,7 @@ impl<T> OverwriteQueue<T> {
notify: Condvar::new(),
terminated: AtomicBool::new(false),
counter: Counter::default(),
total_overwritten_count: AtomicU64::new(0),
_marker: PhantomData,
}
}
Expand Down Expand Up @@ -130,6 +133,8 @@ impl<T> OverwriteQueue<T> {
self.counter
.overwritten
.fetch_add(to_overwrite as u64, Ordering::Relaxed);
self.total_overwritten_count
.fetch_add(to_overwrite as u64, Ordering::Relaxed);
}
}
let free_after_end = self.size - (raw_end & (self.size - 1));
Expand Down Expand Up @@ -416,6 +421,13 @@ impl<T> Receiver<T> {
}
}
}

/// Returns the queue's running total of overwritten items.
///
/// The value is read from the queue's `total_overwritten_count` atomic,
/// which the overwrite path bumps together with `counter.overwritten`.
/// A relaxed load is used, so the figure is a statistic and provides no
/// ordering guarantees relative to other memory operations.
pub fn total_overwritten_count(&self) -> u64 {
    let counter = self.counter();
    let total = &counter.queue.total_overwritten_count;
    total.load(Ordering::Relaxed)
}
}

impl<T> Drop for Receiver<T> {
Expand Down
82 changes: 82 additions & 0 deletions agent/crates/public/src/throttle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2025 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::fmt::Debug;

use log::warn;
use rand::prelude::{Rng, SeedableRng, SmallRng};

use crate::queue::DebugSender;
use crate::LeakyBucket;

// Initial capacity of the staging buffer, and the length that `send`
// compares against before flushing the buffer to the output queue.
const BUFFER_SIZE: usize = 1024;
/// Rate limiter that stages items in a local buffer and forwards them in
/// batches to an output queue.
///
/// Items are admitted through a `LeakyBucket`; once the bucket refuses an
/// item, the newcomer may randomly replace an already buffered item instead
/// of being appended (see `send`).
pub struct Throttle<T: Debug> {
// Decides whether an incoming item may be appended to `buffer`.
leaky_bucket: LeakyBucket,
// Number of items offered to `send` in the current sampling period; used
// as the upper bound of the random replacement index.
period_count: u32,
// Items waiting to be pushed to `output_queue`.
buffer: Vec<T>,
// Destination queue for flushed batches.
output_queue: DebugSender<T>,
// Random source used to pick the replacement slot for rejected items.
small_rng: SmallRng,
}

impl<T: Debug> Throttle<T> {
    /// Creates a throttle whose leaky bucket is configured with `rate` and
    /// whose admitted items are forwarded to `output_queue`.
    pub fn new(rate: u64, output_queue: DebugSender<T>) -> Self {
        Throttle {
            leaky_bucket: LeakyBucket::new(Some(rate)),
            buffer: Vec::with_capacity(BUFFER_SIZE),
            output_queue,
            small_rng: SmallRng::from_entropy(),
            period_count: 0,
        }
    }

    /// Offers `item` to the throttle.
    ///
    /// Returns `true` when the leaky bucket admitted the item and it was
    /// appended to the buffer. Returns `false` when the throttle has been
    /// reached: the item is then either dropped or stored in place of a
    /// randomly chosen buffered item (which is discarded instead).
    pub fn send(&mut self, item: T) -> bool {
        // Flush as soon as the preallocated capacity is used up, so the
        // buffer never has to grow beyond `BUFFER_SIZE`.
        if self.buffer.len() >= BUFFER_SIZE {
            self.flush();
        }

        // Saturate instead of overflowing: a wrapped counter of 0 would make
        // the `0..period_count` range below empty, which `gen_range` rejects
        // with a panic.
        self.period_count = self.period_count.saturating_add(1);
        if self.leaky_bucket.acquire(1) {
            self.buffer.push(item);
            return true;
        }

        // Reservoir-style replacement: the more items were offered in this
        // period, the less likely the newcomer lands inside the buffer.
        let index = self.small_rng.gen_range(0..self.period_count) as usize;
        if let Some(slot) = self.buffer.get_mut(index) {
            *slot = item;
        }
        false
    }

    /// Pushes all buffered items to the output queue and starts a new
    /// sampling period.
    ///
    /// If the queue rejects the batch, a warning is logged and the buffered
    /// items are discarded.
    pub fn flush(&mut self) {
        // The sampling period always restarts with the batch, whether the
        // flush was triggered by `send` or by an external caller; otherwise
        // counts from already flushed batches would skew the replacement odds.
        self.period_count = 0;
        if self.buffer.is_empty() {
            return;
        }

        // Captured up front because `send_all` receives the buffer mutably
        // and may change its length before reporting an error.
        let pending = self.buffer.len();
        if let Err(e) = self.output_queue.send_all(&mut self.buffer) {
            warn!(
                "throttle push {} items to queue failed, because {:?}",
                pending, e
            );
            self.buffer.clear();
        }
    }

    /// Reconfigures the leaky bucket with a new `rate`.
    pub fn set_rate(&mut self, rate: u64) {
        self.leaky_bucket.set_rate(Some(rate));
    }
}
62 changes: 61 additions & 1 deletion agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2079,12 +2079,35 @@ impl Default for TxThroughput {
}
}

#[derive(Clone, Copy, Default, Debug, Deserialize, PartialEq, Eq)]
/// Settings of the free-disk circuit breaker.
///
/// With `#[serde(default)]`, fields missing from the input take the values
/// produced by `FreeDisk::default()`.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct FreeDisk {
// Percentage threshold (default 15).
// NOTE(review): presumably the breaker trips when the free-space ratio
// drops below this value — confirm against the breaker implementation.
pub percentage_trigger_threshold: u8,
// Absolute threshold. The configured number is shifted left by 30 bits by
// `deser_u64_with_giga_unit`, i.e. written in GiB and stored in bytes
// (default 10 << 30).
#[serde(deserialize_with = "deser_u64_with_giga_unit")]
pub absolute_trigger_threshold: u64,
// Directories the breaker applies to (default: the root directory, or
// `c:\` on Windows).
pub directories: Vec<String>,
}

impl Default for FreeDisk {
    /// Builds the default free-disk settings: 15 percent, 10 << 30 bytes and
    /// a single platform-specific root directory.
    fn default() -> Self {
        // Root directory, selected at compile time per target OS.
        #[cfg(not(target_os = "windows"))]
        let root_dir = "/".to_string();
        #[cfg(target_os = "windows")]
        let root_dir = "c:\\".to_string();

        Self {
            percentage_trigger_threshold: 15,
            absolute_trigger_threshold: 10 << 30,
            directories: vec![root_dir],
        }
    }
}

/// Groups the settings of every circuit breaker.
///
/// With `#[serde(default)]`, any breaker section missing from the input is
/// filled from that section's own `Default` implementation.
#[derive(Clone, Default, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CircuitBreakers {
// System memory percentage breaker settings.
pub sys_memory_percentage: SysMemoryPercentage,
// Relative system load breaker settings.
pub relative_sys_load: RelativeSysLoad,
// Transmit throughput breaker settings.
pub tx_throughput: TxThroughput,
// Free disk space breaker settings.
pub free_disk: FreeDisk,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -2132,6 +2155,33 @@ impl Default for Ntp {
}
}

/// Action selected by `ingester_traffic_overflow_action`.
///
/// In configuration the variants are written as the strings `WAIT` and
/// `DROP` (see `to_traffic_overflow_action`).
// NOTE(review): presumably this governs what happens to data once
// `max_throughput_to_ingester` is exceeded — confirm against the sender.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum TrafficOverflowAction {
// Default variant; configured as `WAIT`.
#[default]
Waiting = 0,
// Configured as `DROP`.
Dropping = 1,
}

fn to_traffic_overflow_action<'de: 'a, 'a, D>(
deserializer: D,
) -> Result<TrafficOverflowAction, D::Error>
where
D: Deserializer<'de>,
{
match <&'a str>::deserialize(deserializer)?
.to_uppercase()
.as_str()
{
"WAIT" => Ok(TrafficOverflowAction::Waiting),
"DROP" => Ok(TrafficOverflowAction::Dropping),
other => Err(de::Error::invalid_value(
Unexpected::Str(other),
&"WAIT|DROP",
)),
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct Communication {
Expand All @@ -2144,6 +2194,8 @@ pub struct Communication {
#[serde(deserialize_with = "deser_usize_with_mega_unit")]
pub grpc_buffer_size: usize,
pub max_throughput_to_ingester: u64,
#[serde(deserialize_with = "to_traffic_overflow_action")]
pub ingester_traffic_overflow_action: TrafficOverflowAction,
pub request_via_nat_ip: bool,
pub proxy_controller_ip: String,
pub proxy_controller_port: u16,
Expand All @@ -2160,6 +2212,7 @@ impl Default for Communication {
ingester_port: 30033,
grpc_buffer_size: 5 << 20,
max_throughput_to_ingester: 100,
ingester_traffic_overflow_action: TrafficOverflowAction::Waiting,
request_via_nat_ip: false,
}
}
Expand Down Expand Up @@ -3299,6 +3352,13 @@ where
Ok(usize::deserialize(deserializer)? << 20)
}

/// Serde helper that reads a `u64` and scales it by 2^30, so a value written
/// in GiB is stored in bytes.
///
/// The scaling is a plain left shift: bits pushed past bit 63 are discarded,
/// so inputs of 2^34 or more lose their high bits without an error.
fn deser_u64_with_giga_unit<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
    D: Deserializer<'de>,
{
    u64::deserialize(deserializer).map(|gib| gib << 30)
}

// `humantime` will not parse "0" as Duration::ZERO
// If "0" is a valid option for a duration field, use this deserializer
// #[serde(deserialize_with = "deser_humantime_with_zero")]
Expand Down
65 changes: 64 additions & 1 deletion agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use super::{
HttpEndpointMatchRule, OracleConfig, PcapStream, PortConfig, ProcessorsFlowLogTunning,
RequestLogTunning, SessionTimeout, TagFilterOperator, UserConfig,
},
ConfigError, KubernetesPollerType,
ConfigError, KubernetesPollerType, TrafficOverflowAction,
};
use crate::flow_generator::protocol_logs::decode_new_rpc_trace_context_with_type;
use crate::rpc::Session;
Expand Down Expand Up @@ -218,6 +218,9 @@ pub struct EnvironmentConfig {
pub system_load_circuit_breaker_recover: f32,
pub system_load_circuit_breaker_metric: agent::SystemLoadMetric,
pub page_cache_reclaim_percentage: u8,
pub free_disk_circuit_breaker_percentage_threshold: u8,
pub free_disk_circuit_breaker_absolute_threshold: u64,
pub free_disk_circuit_breaker_directories: Vec<String>,
}

#[derive(Clone, PartialEq, Eq, Debug)]
Expand All @@ -237,6 +240,7 @@ pub struct SenderConfig {
pub npb_socket_type: agent::SocketType,
pub multiple_sockets_to_ingester: bool,
pub max_throughput_to_ingester: u64, // unit: Mbps
pub ingester_traffic_overflow_action: TrafficOverflowAction,
pub collector_socket_type: agent::SocketType,
pub standalone_data_file_size: u64,
pub standalone_data_file_dir: String,
Expand Down Expand Up @@ -1664,6 +1668,22 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig {
.relative_sys_load
.metric,
page_cache_reclaim_percentage: conf.global.tunning.page_cache_reclaim_percentage,
free_disk_circuit_breaker_percentage_threshold: conf
.global
.circuit_breakers
.free_disk
.percentage_trigger_threshold,
free_disk_circuit_breaker_absolute_threshold: conf
.global
.circuit_breakers
.free_disk
.absolute_trigger_threshold,
free_disk_circuit_breaker_directories: {
let mut v = conf.global.circuit_breakers.free_disk.directories.clone();
v.sort();
v.dedup();
v
},
},
synchronizer: SynchronizerConfig {
sync_interval: conf.global.communication.proactive_request_interval,
Expand Down Expand Up @@ -1756,6 +1776,10 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig {
.throughput_monitoring_interval,
multiple_sockets_to_ingester: conf.outputs.socket.multiple_sockets_to_ingester,
max_throughput_to_ingester: conf.global.communication.max_throughput_to_ingester,
ingester_traffic_overflow_action: conf
.global
.communication
.ingester_traffic_overflow_action,
collector_socket_type: conf.outputs.socket.data_socket_type,
standalone_data_file_size: conf.global.standalone_mode.max_data_file_size,
standalone_data_file_dir: conf.global.standalone_mode.data_file_dir.clone(),
Expand Down Expand Up @@ -3902,6 +3926,17 @@ impl ConfigHandler {
);
communication.max_throughput_to_ingester = new_communication.max_throughput_to_ingester;
}
if communication.ingester_traffic_overflow_action
!= new_communication.ingester_traffic_overflow_action
{
info!(
"Update global.communication.ingester_traffic_overflow_action from {:?} to {:?}.",
communication.ingester_traffic_overflow_action,
new_communication.ingester_traffic_overflow_action
);
communication.ingester_traffic_overflow_action =
new_communication.ingester_traffic_overflow_action;
}
if communication.ingester_ip != new_communication.ingester_ip {
info!(
"Update global.communication.ingester_ip from {:?} to {:?}.",
Expand Down Expand Up @@ -4003,6 +4038,34 @@ impl ConfigHandler {
limits.max_sockets_tolerate_interval = new_limits.max_sockets_tolerate_interval;
}

let free_disk = &mut config.global.circuit_breakers.free_disk;
let new_free_disk = &mut new_config.user_config.global.circuit_breakers.free_disk;
if free_disk.percentage_trigger_threshold != new_free_disk.percentage_trigger_threshold {
info!(
"Update global.circuit_breakers.free_disk.percentage_trigger_threshold from {:?} to {:?}.",
free_disk.percentage_trigger_threshold, new_free_disk.percentage_trigger_threshold
);
free_disk.percentage_trigger_threshold = new_free_disk.percentage_trigger_threshold;
}

if free_disk.absolute_trigger_threshold != new_free_disk.absolute_trigger_threshold {
info!(
"Update global.circuit_breakers.free_disk.absolute_trigger_threshold from {:?} to {:?}.",
free_disk.absolute_trigger_threshold, new_free_disk.absolute_trigger_threshold
);
free_disk.absolute_trigger_threshold = new_free_disk.absolute_trigger_threshold;
}

new_free_disk.directories.sort();
new_free_disk.directories.dedup();
if free_disk.directories != new_free_disk.directories {
info!(
"Update global.circuit_breakers.free_disk.directories from {:?} to {:?}.",
free_disk.directories, new_free_disk.directories
);
free_disk.directories = new_free_disk.directories.clone();
}

let ntp = &mut config.global.ntp;
let new_ntp = &mut new_config.user_config.global.ntp;
if ntp.enabled != new_ntp.enabled {
Expand Down
Loading
Loading