diff --git a/agent/Cargo.lock b/agent/Cargo.lock index 323cf66959f..31debdfed7e 100644 --- a/agent/Cargo.lock +++ b/agent/Cargo.lock @@ -977,6 +977,7 @@ dependencies = [ "envmnt", "flate2", "flexi_logger", + "fs2", "futures", "grpc", "hex", @@ -1375,6 +1376,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fsio" version = "0.4.0" @@ -3165,6 +3176,7 @@ dependencies = [ "pcap 0.10.1", "pnet", "prost", + "rand", "regex", "serde", "serde_json", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 390be022535..19051c08a28 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -48,6 +48,7 @@ enum_dispatch = "0.3.7" envmnt = "0.10.4" flate2 = "1.0" flexi_logger = "0.29" +fs2 = "0.4" futures = "~0.3" grpc = { path = "plugins/grpc" } hex = "0.4.3" diff --git a/agent/crates/public/Cargo.toml b/agent/crates/public/Cargo.toml index 0b5ff8e63ba..72ce3457d19 100644 --- a/agent/crates/public/Cargo.toml +++ b/agent/crates/public/Cargo.toml @@ -20,6 +20,7 @@ num_enum = "0.5.6" parking_lot = "0.11" pnet = "^0.29" prost.workspace = true +rand = "0.8.5" regex.workspace = true serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.72" diff --git a/agent/crates/public/src/lib.rs b/agent/crates/public/src/lib.rs index 7bd14300737..4f58bbdef83 100644 --- a/agent/crates/public/src/lib.rs +++ b/agent/crates/public/src/lib.rs @@ -31,9 +31,11 @@ pub mod pwd; pub mod queue; pub mod rpc; pub mod sender; +pub mod throttle; pub mod utils; #[cfg(target_os = "linux")] pub mod netns; pub use leaky_bucket::LeakyBucket; +pub use throttle::Throttle; diff --git a/agent/crates/public/src/queue/overwrite_queue.rs b/agent/crates/public/src/queue/overwrite_queue.rs index a64180c938a..1f05c365022 100644 --- a/agent/crates/public/src/queue/overwrite_queue.rs +++ b/agent/crates/public/src/queue/overwrite_queue.rs @@ -55,6 +55,8 @@ struct OverwriteQueue { counter: Counter, + total_overwritten_count: AtomicU64, + _marker: PhantomData, } @@ -78,6 +80,7 @@ impl OverwriteQueue { notify: Condvar::new(), terminated: AtomicBool::new(false), counter: Counter::default(), + total_overwritten_count: AtomicU64::new(0), _marker: PhantomData, } } @@ -130,6 +133,8 @@ impl OverwriteQueue { self.counter .overwritten .fetch_add(to_overwrite as u64, Ordering::Relaxed); + self.total_overwritten_count + .fetch_add(to_overwrite as u64, Ordering::Relaxed); } } let free_after_end = self.size - (raw_end & (self.size - 1)); @@ -416,6 +421,13 @@ impl Receiver { } } } + + pub fn total_overwritten_count(&self) -> u64 { + self.counter() + .queue + .total_overwritten_count + .load(Ordering::Relaxed) + } } impl Drop for Receiver { diff --git a/agent/crates/public/src/throttle.rs b/agent/crates/public/src/throttle.rs new file mode 100644 index 00000000000..76c6ccfa2dd --- /dev/null +++ b/agent/crates/public/src/throttle.rs @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2025 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::fmt::Debug; + +use log::warn; +use rand::prelude::{Rng, SeedableRng, SmallRng}; + +use crate::queue::DebugSender; +use crate::LeakyBucket; + +const BUFFER_SIZE: usize = 1024; +pub struct Throttle { + leaky_bucket: LeakyBucket, + period_count: u32, + buffer: Vec, + output_queue: DebugSender, + small_rng: SmallRng, +} + +impl Throttle { + pub fn new(rate: u64, output_queue: DebugSender) -> Self { + Throttle { + leaky_bucket: LeakyBucket::new(Some(rate)), + buffer: Vec::with_capacity(BUFFER_SIZE), + output_queue, + small_rng: SmallRng::from_entropy(), + period_count: 0, + } + } + + // return false, indicates that the throttle has been reached + // and the item or cached items will be discarded + pub fn send(&mut self, item: T) -> bool { + if self.buffer.len() > BUFFER_SIZE { + self.flush(); + self.period_count = 0; + } + + self.period_count += 1; + if self.leaky_bucket.acquire(1) { + self.buffer.push(item); + } else { + let index = self.small_rng.gen_range(0..self.period_count) as usize; + if index < self.buffer.len() { + self.buffer[index] = item; + } + return false; + } + true + } + + pub fn flush(&mut self) { + if !self.buffer.is_empty() { + if let Err(e) = self.output_queue.send_all(&mut self.buffer) { + warn!( + "throttle push {} items to queue failed, because {:?}", + self.buffer.len(), + e + ); + self.buffer.clear(); + } + } + } + + pub fn set_rate(&mut self, rate: u64) { + self.leaky_bucket.set_rate(Some(rate)); + } +} diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index 315de30d910..9c78b3dbfe1 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -2079,12 +2079,35 @@ impl Default for TxThroughput { } } -#[derive(Clone, Copy, Default, Debug, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct FreeDisk { + pub percentage_trigger_threshold: u8, + #[serde(deserialize_with = "deser_u64_with_giga_unit")] + pub absolute_trigger_threshold: u64, + pub directories: Vec, +} + +impl Default for FreeDisk { + fn default() -> Self { + Self { + percentage_trigger_threshold: 15, + absolute_trigger_threshold: 10 << 30, + #[cfg(not(target_os = "windows"))] + directories: vec!["/".to_string()], + #[cfg(target_os = "windows")] + directories: vec!["c:\\".to_string()], + } + } +} + +#[derive(Clone, Default, Debug, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct CircuitBreakers { pub sys_memory_percentage: SysMemoryPercentage, pub relative_sys_load: RelativeSysLoad, pub tx_throughput: TxThroughput, + pub free_disk: FreeDisk, } #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] @@ -2132,6 +2155,33 @@ impl Default for Ntp { } } +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[repr(u8)] +pub enum TrafficOverflowAction { + #[default] + Waiting = 0, + Dropping = 1, +} + +fn to_traffic_overflow_action<'de: 'a, 'a, D>( + deserializer: D, +) -> Result +where + D: Deserializer<'de>, +{ + match <&'a str>::deserialize(deserializer)? 
+ .to_uppercase() + .as_str() + { + "WAIT" => Ok(TrafficOverflowAction::Waiting), + "DROP" => Ok(TrafficOverflowAction::Dropping), + other => Err(de::Error::invalid_value( + Unexpected::Str(other), + &"WAIT|DROP", + )), + } +} + #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct Communication { @@ -2144,6 +2194,8 @@ pub struct Communication { #[serde(deserialize_with = "deser_usize_with_mega_unit")] pub grpc_buffer_size: usize, pub max_throughput_to_ingester: u64, + #[serde(deserialize_with = "to_traffic_overflow_action")] + pub ingester_traffic_overflow_action: TrafficOverflowAction, pub request_via_nat_ip: bool, pub proxy_controller_ip: String, pub proxy_controller_port: u16, @@ -2160,6 +2212,7 @@ impl Default for Communication { ingester_port: 30033, grpc_buffer_size: 5 << 20, max_throughput_to_ingester: 100, + ingester_traffic_overflow_action: TrafficOverflowAction::Waiting, request_via_nat_ip: false, } } @@ -3299,6 +3352,13 @@ where Ok(usize::deserialize(deserializer)? << 20) } +fn deser_u64_with_giga_unit<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + Ok(u64::deserialize(deserializer)? << 30) +} + // `humantime` will not parse "0" as Duration::ZERO // If "0" is a valid option for a duration field, use this deserializer // #[serde(deserialize_with = "deser_humantime_with_zero")] diff --git a/agent/src/config/handler.rs b/agent/src/config/handler.rs index 4a8028919b4..3b326f8e300 100755 --- a/agent/src/config/handler.rs +++ b/agent/src/config/handler.rs @@ -53,7 +53,7 @@ use super::{ HttpEndpointMatchRule, OracleConfig, PcapStream, PortConfig, ProcessorsFlowLogTunning, RequestLogTunning, SessionTimeout, TagFilterOperator, UserConfig, }, - ConfigError, KubernetesPollerType, + ConfigError, KubernetesPollerType, TrafficOverflowAction, }; use crate::flow_generator::protocol_logs::decode_new_rpc_trace_context_with_type; use crate::rpc::Session; @@ -218,6 +218,9 @@ pub struct EnvironmentConfig { pub system_load_circuit_breaker_recover: f32, pub system_load_circuit_breaker_metric: agent::SystemLoadMetric, pub page_cache_reclaim_percentage: u8, + pub free_disk_circuit_breaker_percentage_threshold: u8, + pub free_disk_circuit_breaker_absolute_threshold: u64, + pub free_disk_circuit_breaker_directories: Vec, } #[derive(Clone, PartialEq, Eq, Debug)] @@ -237,6 +240,7 @@ pub struct SenderConfig { pub npb_socket_type: agent::SocketType, pub multiple_sockets_to_ingester: bool, pub max_throughput_to_ingester: u64, // unit: Mbps + pub ingester_traffic_overflow_action: TrafficOverflowAction, pub collector_socket_type: agent::SocketType, pub standalone_data_file_size: u64, pub standalone_data_file_dir: String, @@ -1664,6 +1668,22 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig { .relative_sys_load .metric, page_cache_reclaim_percentage: conf.global.tunning.page_cache_reclaim_percentage, + free_disk_circuit_breaker_percentage_threshold: conf + .global + .circuit_breakers + .free_disk + .percentage_trigger_threshold, + free_disk_circuit_breaker_absolute_threshold: conf + .global + .circuit_breakers + .free_disk + .absolute_trigger_threshold, + free_disk_circuit_breaker_directories: { + let mut v = conf.global.circuit_breakers.free_disk.directories.clone(); + v.sort(); + v.dedup(); + v + }, }, synchronizer: SynchronizerConfig { sync_interval: conf.global.communication.proactive_request_interval, @@ -1756,6 +1776,10 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig { .throughput_monitoring_interval, multiple_sockets_to_ingester: 
conf.outputs.socket.multiple_sockets_to_ingester, max_throughput_to_ingester: conf.global.communication.max_throughput_to_ingester, + ingester_traffic_overflow_action: conf + .global + .communication + .ingester_traffic_overflow_action, collector_socket_type: conf.outputs.socket.data_socket_type, standalone_data_file_size: conf.global.standalone_mode.max_data_file_size, standalone_data_file_dir: conf.global.standalone_mode.data_file_dir.clone(), @@ -3902,6 +3926,17 @@ impl ConfigHandler { ); communication.max_throughput_to_ingester = new_communication.max_throughput_to_ingester; } + if communication.ingester_traffic_overflow_action + != new_communication.ingester_traffic_overflow_action + { + info!( + "Update global.communication.ingester_traffic_overflow_action from {:?} to {:?}.", + communication.ingester_traffic_overflow_action, + new_communication.ingester_traffic_overflow_action + ); + communication.ingester_traffic_overflow_action = + new_communication.ingester_traffic_overflow_action; + } if communication.ingester_ip != new_communication.ingester_ip { info!( "Update global.communication.ingester_ip from {:?} to {:?}.", @@ -4003,6 +4038,34 @@ impl ConfigHandler { limits.max_sockets_tolerate_interval = new_limits.max_sockets_tolerate_interval; } + let free_disk = &mut config.global.circuit_breakers.free_disk; + let new_free_disk = &mut new_config.user_config.global.circuit_breakers.free_disk; + if free_disk.percentage_trigger_threshold != new_free_disk.percentage_trigger_threshold { + info!( + "Update global.circuit_breakers.free_disk.percentage_trigger_threshold from {:?} to {:?}.", + free_disk.percentage_trigger_threshold, new_free_disk.percentage_trigger_threshold + ); + free_disk.percentage_trigger_threshold = new_free_disk.percentage_trigger_threshold; + } + + if free_disk.absolute_trigger_threshold != new_free_disk.absolute_trigger_threshold { + info!( + "Update global.circuit_breakers.free_disk.absolute_trigger_threshold from {:?} to {:?}.", + free_disk.absolute_trigger_threshold, new_free_disk.absolute_trigger_threshold + ); + free_disk.absolute_trigger_threshold = new_free_disk.absolute_trigger_threshold; + } + + new_free_disk.directories.sort(); + new_free_disk.directories.dedup(); + if free_disk.directories != new_free_disk.directories { + info!( + "Update global.circuit_breakers.free_disk.directories from {:?} to {:?}.", + free_disk.directories, new_free_disk.directories + ); + free_disk.directories = new_free_disk.directories.clone(); + } + let ntp = &mut config.global.ntp; let new_ntp = &mut new_config.user_config.global.ntp; if ntp.enabled != new_ntp.enabled { diff --git a/agent/src/config/mod.rs b/agent/src/config/mod.rs index 113ab876a7b..6e09d3b5ca3 100644 --- a/agent/src/config/mod.rs +++ b/agent/src/config/mod.rs @@ -19,7 +19,7 @@ pub mod handler; pub use config::{ AgentIdType, Config, ConfigError, DpdkSource, KubernetesPollerType, OracleConfig, PcapStream, - PrometheusExtraLabels, UserConfig, K8S_CA_CRT_PATH, + PrometheusExtraLabels, TrafficOverflowAction, UserConfig, K8S_CA_CRT_PATH, }; #[cfg(any(target_os = "linux", target_os = "android"))] pub use config::{ApiResources, ProcessMatcher}; diff --git a/agent/src/flow_generator/protocol_logs.rs b/agent/src/flow_generator/protocol_logs.rs index b0edd7e2445..e96334b0119 100644 --- a/agent/src/flow_generator/protocol_logs.rs +++ b/agent/src/flow_generator/protocol_logs.rs @@ -388,6 +388,15 @@ pub struct BoxAppProtoLogsData { pub override_resp_status: Option, } +impl BoxAppProtoLogsData { + pub fn new(data: Box, 
override_resp_status: Option) -> Self { + Self { + data, + override_resp_status, + } + } +} + impl Sendable for BoxAppProtoLogsData { fn encode(self, buf: &mut Vec) -> Result { let mut pb_proto_logs_data = flow_log::AppProtoLogsData { diff --git a/agent/src/flow_generator/protocol_logs/parser.rs b/agent/src/flow_generator/protocol_logs/parser.rs index a6032fa0401..72517766f96 100644 --- a/agent/src/flow_generator/protocol_logs/parser.rs +++ b/agent/src/flow_generator/protocol_logs/parser.rs @@ -26,8 +26,7 @@ use std::{ }; use arc_swap::access::Access; -use log::{debug, info, warn}; -use rand::prelude::{Rng, SeedableRng, SmallRng}; +use log::{debug, info}; use serde::Serialize; use super::{AppProtoHead, AppProtoLogsBaseInfo, BoxAppProtoLogsData, LogMessageType}; @@ -55,15 +54,13 @@ use public::utils::string::get_string_from_chars; use public::{ chrono_map::ChronoMap, queue::{self, DebugSender, Receiver}, + throttle::Throttle, utils::net::MacAddr, }; const QUEUE_BATCH_SIZE: usize = 1024; const RCV_TIMEOUT: Duration = Duration::from_secs(1); -const THROTTLE_BUCKET_BITS: u8 = 2; -const THROTTLE_BUCKET: usize = 1 << THROTTLE_BUCKET_BITS; // 2^N。由于发送方是有突发的,需要累积一定时间做采样 - #[derive(Debug)] pub enum AppProto { SocketClosed(u128), @@ -342,108 +339,21 @@ impl RefCountable for SessionAggrCounter { } } -struct Throttle { - interval: Duration, - last_flush_time: Duration, - throttle: u32, - throttle_multiple: u32, - period_count: u32, - config: LogParserAccess, - small_rng: SmallRng, -} - -impl Throttle { - fn new(config: LogParserAccess, interval: u64) -> Self { - Self { - config, - interval: Duration::from_secs(interval), - throttle: 0, - throttle_multiple: interval as u32, - period_count: 0, - last_flush_time: Duration::ZERO, - small_rng: SmallRng::from_entropy(), - } - } - - fn tick(&mut self, current: Duration) { - self.last_flush_time = current; - self.period_count = 0; - self.throttle = - (self.config.load().l7_log_collect_nps_threshold as u32) * self.throttle_multiple; - } - - fn acquire(&mut self, current: Duration) -> bool { - self.period_count += 1; - - // Local timestamp may be modified - if current < self.last_flush_time { - self.last_flush_time = current; - } - - if current > self.last_flush_time + self.interval || self.last_flush_time.is_zero() { - self.tick(current); - } - - if self.period_count >= self.throttle { - return self.small_rng.gen_range(0..self.period_count) < self.throttle; - } - - true - } -} - -struct BufferSender { - batch: Vec, - - output_queue: DebugSender, - throttle: Throttle, - +struct ThrottleSender { + throttle: Throttle, counter: Arc, } -impl BufferSender { - fn new( - config: LogParserAccess, - output_queue: DebugSender, - counter: Arc, - ) -> Self { - Self { - batch: Vec::with_capacity(QUEUE_BATCH_SIZE), - output_queue, - throttle: Throttle::new(config, 1), - counter, - } - } - - fn send(&mut self, item: Box, override_resp_status: Option) { - if item.l7_info.skip_send() || item.l7_info.is_on_blacklist() { +impl ThrottleSender { + fn send(&mut self, data: Box, override_resp_status: Option) { + if data.l7_info.skip_send() || data.l7_info.is_on_blacklist() { return; } - - if !self.throttle.acquire(item.base_info.start_time.into()) { + if !self + .throttle + .send(BoxAppProtoLogsData::new(data, override_resp_status)) + { self.counter.throttle_drop.fetch_add(1, Ordering::Relaxed); - return; - } - - if self.batch.len() >= QUEUE_BATCH_SIZE { - if let Err(e) = self.output_queue.send_all(&mut self.batch) { - warn!("output queue failed to send data, because: {:?}", e); - 
self.batch.clear(); - } - } - - self.batch.push(BoxAppProtoLogsData { - data: item, - override_resp_status, - }); - } - - fn flush(&mut self) { - if !self.batch.is_empty() { - if let Err(e) = self.output_queue.send_all(&mut self.batch) { - warn!("output queue failed to send data, because: {:?}", e); - self.batch.clear(); - } } } } @@ -458,7 +368,8 @@ struct SessionQueue { counter: Arc, - bs: BufferSender, + throttle_sender: ThrottleSender, + l7_log_collect_nps_threshold: u64, } impl SessionQueue { @@ -480,7 +391,11 @@ impl SessionQueue { counter: counter.clone(), - bs: BufferSender::new(config, output_queue, counter), + throttle_sender: ThrottleSender { + throttle: Throttle::new(conf.l7_log_collect_nps_threshold, output_queue), + counter: counter.clone(), + }, + l7_log_collect_nps_threshold: conf.l7_log_collect_nps_threshold, } } @@ -507,7 +422,7 @@ impl SessionQueue { p.l7_info.get_request_resource_length() as u64, Ordering::Relaxed, ); - self.bs.send(p, None); + self.throttle_sender.send(p, None); } self.counter.receive.fetch_add(1, Ordering::Relaxed); return; @@ -530,7 +445,7 @@ impl SessionQueue { if item.base_info.end_time.is_zero() { item.base_info.end_time = item.base_info.start_time; } - self.bs.send(item, None); + self.throttle_sender.send(item, None); return; } @@ -546,7 +461,7 @@ impl SessionQueue { timeout_time, self.window_start ); - self.bs.send(item, None); + self.throttle_sender.send(item, None); return; } @@ -560,7 +475,8 @@ impl SessionQueue { v.l7_info.get_request_resource_length() as u64, Ordering::Relaxed, ); - self.bs.send(self.entries.remove(&key).unwrap(), None); + self.throttle_sender + .send(self.entries.remove(&key).unwrap(), None); } } else { match item.base_info.head.msg_type { @@ -569,7 +485,7 @@ impl SessionQueue { if v.is_request() && item.base_info.start_time > v.base_info.start_time => { if let Err(_) = v.session_merge(&mut item) { - self.bs.send(item, None); + self.throttle_sender.send(item, None); } self.counter.cached.fetch_sub(1, Ordering::Relaxed); self.counter.cached_request_resource.fetch_sub( @@ -577,7 +493,8 @@ impl SessionQueue { Ordering::Relaxed, ); self.counter.merge.fetch_add(1, Ordering::Relaxed); - self.bs.send(self.entries.remove(&key).unwrap(), None); + self.throttle_sender + .send(self.entries.remove(&key).unwrap(), None); } // If the order is out of order and there is a response, it can be matched as a session, and the aggregated response is sent LogMessageType::Request @@ -591,23 +508,24 @@ impl SessionQueue { ); let mut v = self.entries.remove(&key).unwrap(); if let Err(_) = item.session_merge(&mut v) { - self.bs.send(v, None); + self.throttle_sender.send(v, None); } self.counter.cached.fetch_sub(1, Ordering::Relaxed); self.counter.merge.fetch_add(1, Ordering::Relaxed); - self.bs.send(item, None); + self.throttle_sender.send(item, None); } // if entry and item cannot merge, send the early one and cache the other _ => { if v.base_info.start_time > item.base_info.start_time { - self.bs.send(item, None); + self.throttle_sender.send(item, None); } else { // swap out old item and send self.counter.cached_request_resource.fetch_sub( v.l7_info.get_request_resource_length() as u64, Ordering::Relaxed, ); - self.bs.send(self.entries.remove(&key).unwrap(), None); + self.throttle_sender + .send(self.entries.remove(&key).unwrap(), None); self.counter.cached_request_resource.fetch_add( item.l7_info.get_request_resource_length() as u64, Ordering::Relaxed, @@ -636,7 +554,7 @@ impl SessionQueue { v.l7_info.get_request_resource_length() as u64, 
Ordering::Relaxed, ); - self.bs.send(v, None); + self.throttle_sender.send(v, None); } return; } @@ -651,9 +569,9 @@ impl SessionQueue { fn flush(&mut self) { for item in self.entries.drain(..) { - self.bs.send(item, None); + self.throttle_sender.send(item, None); } - self.bs.flush(); + self.throttle_sender.throttle.flush(); self.counter.cached.store(0, Ordering::Relaxed); self.counter .cached_request_resource @@ -669,7 +587,8 @@ impl SessionQueue { item.l7_info.get_request_resource_length() as u64, Ordering::Relaxed, ); - self.bs.send(item.clone(), Some(L7ResponseStatus::Timeout)); + self.throttle_sender + .send(item.clone(), Some(L7ResponseStatus::Timeout)); None }); // update timestamp @@ -780,6 +699,21 @@ impl SessionAggregator { session_queue.max_entries = config.l7_log_session_aggr_max_entries; session_queue.flush(); } + if config.l7_log_collect_nps_threshold + != session_queue.l7_log_collect_nps_threshold + { + info!( + "update l7_log_collect_nps_threshold from {} to {}", + session_queue.l7_log_collect_nps_threshold, + config.l7_log_collect_nps_threshold + ); + session_queue.l7_log_collect_nps_threshold = + config.l7_log_collect_nps_threshold; + session_queue + .throttle_sender + .throttle + .set_rate(config.l7_log_collect_nps_threshold); + } } session_queue.flush(); }) diff --git a/agent/src/monitor.rs b/agent/src/monitor.rs index 00aa72cf90a..22b8b6476b8 100644 --- a/agent/src/monitor.rs +++ b/agent/src/monitor.rs @@ -31,10 +31,12 @@ use sysinfo::{get_current_pid, Pid, ProcessExt, ProcessRefreshKind, System, Syst #[cfg(target_os = "linux")] use crate::utils::{cgroups, environment::SocketInfo}; + use crate::{ config::handler::EnvironmentAccess, error::{Error, Result}, utils::{ + environment::get_disk_usage, process::{get_current_sys_memory_percentage, get_file_and_size_sum}, stats::{ self, Collector, Countable, Counter, CounterType, CounterValue, RefCountable, @@ -227,7 +229,8 @@ impl RefCountable for SysStatusBroker { CounterValue::Unsigned(current_sys_available_memory_percentage as u64), )); - let sys_memory_limit = self.config.load().sys_memory_limit as f64; + let config = self.config.load(); + let sys_memory_limit = config.sys_memory_limit as f64; let (sys_free_memory_limit_ratio, sys_available_memory_limit_ratio) = if sys_memory_limit > 0.0 { @@ -266,6 +269,7 @@ impl RefCountable for SysStatusBroker { warn!("get file and size sum failed: {:?}", e); } } + match system_guard.process(self.pid) { Some(process) => { let cpu_usage = process.cpu_usage() as f64; @@ -279,7 +283,7 @@ impl RefCountable for SysStatusBroker { metrics.push(( "max_millicpus_ratio", CounterType::Gauged, - CounterValue::Float(cpu_usage * 10.0 / self.config.load().max_millicpus as f64), + CounterValue::Float(cpu_usage * 10.0 / config.max_millicpus as f64), )); metrics.push(( "memory", @@ -289,7 +293,7 @@ impl RefCountable for SysStatusBroker { metrics.push(( "max_memory_ratio", CounterType::Gauged, - CounterValue::Float(mem_used as f64 / self.config.load().max_memory as f64), + CounterValue::Float(mem_used as f64 / config.max_memory as f64), )); metrics.push(( "create_time", @@ -373,6 +377,44 @@ impl stats::Module for NetStats<'_> { } } +struct FreeDiskUsage { + directory: String, +} + +impl stats::Module for FreeDiskUsage { + fn name(&self) -> &'static str { + "free_disk" + } + + fn tags(&self) -> Vec { + vec![StatsOption::Tag("directory", self.directory.clone())] + } +} + +impl RefCountable for FreeDiskUsage { + fn get_counters(&self) -> Vec { + let mut metrics = vec![]; + match 
get_disk_usage(&self.directory) { + Ok((total, free)) => { + metrics.push(( + "free_disk_percentage", + CounterType::Gauged, + CounterValue::Float(free as f64 * 100.0 / total as f64), + )); + metrics.push(( + "free_disk_absolute", + CounterType::Gauged, + CounterValue::Unsigned(free as u64), + )); + } + Err(e) => { + warn!("get disk free usage failed: {:?}", e); + } + } + metrics + } +} + pub struct Monitor { stats: Arc, running: AtomicBool, @@ -380,6 +422,9 @@ pub struct Monitor { sys_load: Arc, link_map: Arc>>>, system: Arc>, + config: EnvironmentAccess, + free_disks_config: Arc>>, + free_disk_counters: Arc>>>, } impl Monitor { @@ -399,6 +444,9 @@ impl Monitor { sys_load: Arc::new(SysLoad(system.clone())), link_map: Arc::new(Mutex::new(HashMap::new())), system, + config: config.clone(), + free_disks_config: Arc::new(Mutex::new(vec![])), + free_disk_counters: Arc::new(Mutex::new(vec![])), }) } @@ -519,6 +567,42 @@ impl Monitor { Countable::Ref(Arc::downgrade(&self.sys_load) as Weak), ); + let config = self.config.clone(); + let stats_collector = self.stats.clone(); + let free_disks_config = self.free_disks_config.clone(); + let free_disk_counters = self.free_disk_counters.clone(); + self.stats.register_pre_hook(Box::new(move || { + let config_load = config.load(); + let mut free_disks_config = free_disks_config.lock().unwrap(); + if config_load.free_disk_circuit_breaker_directories == *free_disks_config { + return; + } + + let mut locked_counters = free_disk_counters.lock().unwrap(); + let old_data = std::mem::take(&mut *locked_counters); + stats_collector + .deregister_countables(old_data.iter().map(|c| c.as_ref() as &dyn stats::Module)); + + for free_disk in &config_load.free_disk_circuit_breaker_directories { + let free_disk_counter = Arc::new(FreeDiskUsage { + directory: free_disk.clone(), + }); + stats_collector.register_countable( + &FreeDiskUsage { + directory: free_disk.clone(), + }, + Countable::Ref(Arc::downgrade(&free_disk_counter) as Weak), + ); + locked_counters.push(free_disk_counter); + } + + info!( + "update free disk monitor from {:?} to {:?}", + free_disks_config, config_load.free_disk_circuit_breaker_directories + ); + *free_disks_config = config_load.free_disk_circuit_breaker_directories.clone(); + })); + info!("monitor started"); } diff --git a/agent/src/rpc/synchronizer.rs b/agent/src/rpc/synchronizer.rs index 4d8496e7fcd..0154c5042d0 100644 --- a/agent/src/rpc/synchronizer.rs +++ b/agent/src/rpc/synchronizer.rs @@ -1572,7 +1572,6 @@ impl Synchronizer { } else { match Self::upgrade(&running, &session, &revision, &id, &agent_state).await { Ok(true) => { - agent_state.terminate(); warn!("agent upgrade is successful and restarts normally, deepflow-agent restart..."); crate::utils::clean_and_exit(NORMAL_EXIT_WITH_RESTART); return; diff --git a/agent/src/sender/uniform_sender.rs b/agent/src/sender/uniform_sender.rs index 6a59583d0a6..7c63ce79094 100644 --- a/agent/src/sender/uniform_sender.rs +++ b/agent/src/sender/uniform_sender.rs @@ -30,12 +30,18 @@ use std::time::{Duration, Instant, SystemTime}; use arc_swap::access::Access; use lazy_static::lazy_static; use log::{debug, error, info, warn}; -use public::sender::{SendMessageType, Sendable}; +use public::{ + leaky_bucket::LeakyBucket, + sender::{SendMessageType, Sendable}, +}; use rand::{thread_rng, RngCore}; use super::{get_sender_id, QUEUE_BATCH_SIZE}; -use crate::config::handler::{SenderAccess, SenderConfig}; +use crate::config::{ + handler::{SenderAccess, SenderConfig}, + TrafficOverflowAction, +}; use 
crate::exception::ExceptionHandler; use crate::trident::SenderEncoder; use crate::utils::stats::{ @@ -45,6 +51,7 @@ use public::proto::agent::{Exception, SocketType}; use public::queue::{Error, Receiver}; const PRE_FILE_SUFFIX: &str = ".pre"; +const MAX_WAIT_TIMES: u32 = 100; #[derive(Debug, Default)] pub struct SenderCounter { @@ -52,6 +59,7 @@ pub struct SenderCounter { pub tx: AtomicU64, pub tx_bytes: AtomicU64, pub dropped: AtomicU64, + pub waited: AtomicU64, } impl RefCountable for SenderCounter { @@ -77,6 +85,11 @@ impl RefCountable for SenderCounter { CounterType::Counted, CounterValue::Unsigned(self.dropped.swap(0, Ordering::Relaxed)), ), + ( + "waited", + CounterType::Counted, + CounterValue::Unsigned(self.waited.swap(0, Ordering::Relaxed)), + ), ] } } @@ -141,10 +154,10 @@ impl Encoder { version: HEADER_VESION, team_id: 0, organization_id: 0, - agent_id: agent_id, + agent_id, reserved_1: 0, reserved_2: 0, - encoder: encoder, + encoder, }, _marker: PhantomData, } @@ -179,8 +192,7 @@ impl Encoder { self.buffer[0..4].copy_from_slice(frame_size.to_be_bytes().as_slice()); } - pub fn update_header(&mut self, name: &str, id: usize, config: &SenderAccess) { - let config = config.load(); + pub fn update_header(&mut self, name: &str, id: usize, config: &SenderConfig) { if self.header.agent_id != config.agent_id || self.header.team_id != config.team_id || self.header.organization_id != config.organize_id as u16 @@ -241,6 +253,7 @@ pub struct UniformSenderThread { private_shared_conn: Option>>, sender_encoder: SenderEncoder, + leaky_bucket: Arc, } impl UniformSenderThread { @@ -252,6 +265,7 @@ impl UniformSenderThread { exception_handler: ExceptionHandler, private_shared_conn: Option>>, sender_encoder: SenderEncoder, + leaky_bucket: Arc, ) -> Self { let running = Arc::new(AtomicBool::new(false)); Self { @@ -265,6 +279,7 @@ impl UniformSenderThread { exception_handler, private_shared_conn, sender_encoder, + leaky_bucket, } } @@ -287,6 +302,7 @@ impl UniformSenderThread { self.exception_handler.clone(), self.private_shared_conn.clone(), self.sender_encoder, + self.leaky_bucket.clone(), ); self.thread_handle = Some( thread::Builder::new() @@ -325,9 +341,6 @@ impl UniformSenderThread { lazy_static! 
{ static ref GLOBAL_CONNECTION: Arc> = Arc::new(Mutex::new(Connection::new())); - static ref TOTAL_SENT_BYTES: Arc = Arc::new(AtomicU64::new(0)); - static ref SENT_START_DURATION: Arc = Arc::new(AtomicU64::new(0)); - static ref LAST_LOGGING_DURATION: Arc = Arc::new(AtomicU64::new(0)); } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -368,6 +381,7 @@ pub struct UniformSender { input: Arc>, counter: Arc, + overwritten_count: u64, encoder: Encoder, private_conn: Mutex, @@ -377,6 +391,9 @@ pub struct UniformSender { multiple_sockets_to_ingester: bool, dest_ip: String, dest_port: u16, + max_throughput_mbps: u64, + leaky_bucket: Arc, + last_traffic_overflow: Duration, config: SenderAccess, @@ -407,6 +424,7 @@ impl UniformSender { exception_handler: ExceptionHandler, private_shared_conn: Option>>, sender_encoder: SenderEncoder, + leaky_bucket: Arc, ) -> Self { let cfg = config.load(); Self { @@ -414,6 +432,7 @@ impl UniformSender { name, input, counter: Arc::new(SenderCounter::default()), + overwritten_count: 0, encoder: Encoder::new( 0, SendMessageType::TaggedFlow, @@ -428,6 +447,9 @@ impl UniformSender { multiple_sockets_to_ingester: false, dest_ip: "127.0.0.1".to_string(), dest_port: cfg.dest_port, + max_throughput_mbps: 0, + leaky_bucket, + last_traffic_overflow: Duration::ZERO, running, stats, @@ -441,9 +463,7 @@ impl UniformSender { } } - fn update_connection(&mut self) { - let cfg = self.config.load(); - + fn update_connection(&mut self, cfg: &SenderConfig) { if self.multiple_sockets_to_ingester != cfg.multiple_sockets_to_ingester || self.dest_ip != cfg.dest_ip || self.dest_port != cfg.dest_port @@ -496,17 +516,20 @@ impl UniformSender { } } - fn flush_encoder(&mut self) { + fn flush_encoder(&mut self, config: &SenderConfig) { self.cached = true; if self.encoder.buffer_len() > 0 { self.encoder.compress_buffer(); self.encoder.set_header_frame_size(); - self.send_buffer(); + self.send_buffer(config); self.encoder.reset_buffer(); } } - fn send_buffer(&mut self) { + fn send_buffer(&mut self, config: &SenderConfig) { + if self.is_traffic_overflow(config) { + return; + } let mut conn = match self.connection_type { ConnectionType::Global => self.global_shared_conn.lock().unwrap(), ConnectionType::PrivateShared => { @@ -584,7 +607,6 @@ impl UniformSender { self.counter .tx_bytes .fetch_add(buffer.len() as u64, Ordering::Relaxed); - TOTAL_SENT_BYTES.fetch_add(buffer.len() as u64, Ordering::Relaxed); break; } } @@ -608,37 +630,55 @@ impl UniformSender { } } - fn is_exceed_max_throughput(&mut self, max_throughput_mbps: u64) -> bool { - if max_throughput_mbps == 0 { - return false; + fn log_when_traffic_overflow(&mut self, config: &SenderConfig) { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + // to prevent frequent log printing, print at least once every 10 seconds + if now - self.last_traffic_overflow > Duration::from_secs(10) { + warn!( + "{} sender dropping message, throughput exceed setting value 'max_throughput_to_ingester' {}Mbps, action {:?}, total overwrittern count {}", + self.name, self.max_throughput_mbps, config.ingester_traffic_overflow_action, self.overwritten_count + ); + self.last_traffic_overflow = now; } - let max_throughput_bytes = max_throughput_mbps << 20 >> 3; - if TOTAL_SENT_BYTES.load(Ordering::Relaxed) > max_throughput_bytes { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); + } - let used = now - Duration::from_nanos(SENT_START_DURATION.load(Ordering::Relaxed)); - if used > Duration::from_secs(1) { 
- SENT_START_DURATION.store(now.as_nanos() as u64, Ordering::Relaxed); - TOTAL_SENT_BYTES.store(0, Ordering::Relaxed); - } else { - // to prevent frequent log printing, print at least once every 5 seconds - if now - Duration::from_nanos(LAST_LOGGING_DURATION.load(Ordering::Relaxed)) - > Duration::from_secs(5) - { - warn!( - "{} sender dropping message, throughput execeed setting value 'max_throughput_to_ingester' {}Mbps", - self.name, max_throughput_mbps - ); - LAST_LOGGING_DURATION.store(now.as_nanos() as u64, Ordering::Relaxed); - } - self.exception_handler - .set(Exception::DataBpsThresholdExceeded); - return true; + fn is_traffic_overflow(&mut self, config: &SenderConfig) -> bool { + if self.max_throughput_mbps == 0 { + return false; + } + let mut overflow = false; + if config.ingester_traffic_overflow_action == TrafficOverflowAction::Waiting { + // When stopped, at least one acquire() is successfully triggered every 100ms, and the + // loop can be exited quickly without getting stuck here. + let mut wait_times = 0; + while !self.leaky_bucket.acquire(self.encoder.buffer_len() as u64) + && wait_times < MAX_WAIT_TIMES + { + wait_times += 1; + // LeakyBucket token is updated every 100ms by default, + // wait 20ms each time until the token is acquired + thread::sleep(Duration::from_millis(20)); + self.counter.waited.fetch_add(1, Ordering::Relaxed); } + if wait_times == MAX_WAIT_TIMES { + overflow = true; + } + } else { + if !self.leaky_bucket.acquire(self.encoder.buffer_len() as u64) { + overflow = true; + self.counter.dropped.fetch_add(1, Ordering::Relaxed); + } + } + + if overflow || self.input.total_overwritten_count() > self.overwritten_count { + self.overwritten_count = self.input.total_overwritten_count(); + self.exception_handler + .set(Exception::DataBpsThresholdExceeded); + self.log_when_traffic_overflow(config); } - return false; + overflow } fn check_or_register_counterable(&mut self, message_type: SendMessageType) { @@ -659,7 +699,11 @@ impl UniformSender { while self.running.load(Ordering::Relaxed) { let config = self.config.load(); let socket_type = config.collector_socket_type; - let max_throughput_mpbs = config.max_throughput_to_ingester; + let max_throughput_mbps = config.max_throughput_to_ingester; + if self.max_throughput_mbps != max_throughput_mbps { + self.leaky_bucket.set_rate(Some(max_throughput_mbps << 17)); // Mbit -> byte + self.max_throughput_mbps = max_throughput_mbps; + } match self.input.recv_all( &mut batch, Some(Duration::from_secs(Self::QUEUE_READ_TIMEOUT)), @@ -670,13 +714,6 @@ impl UniformSender { start_cached = Instant::now(); self.cached = false; } - if self.is_exceed_max_throughput(max_throughput_mpbs) { - self.counter - .dropped - .fetch_add(batch.len() as u64, Ordering::Relaxed); - batch.clear(); - continue; - } for send_item in batch.drain(..) 
{ if !self.running.load(Ordering::Relaxed) { break; @@ -692,7 +729,7 @@ impl UniformSender { SocketType::File => { self.handle_target_file(send_item, &mut kv_string, &config) } - _ => self.handle_target_server(send_item), + _ => self.handle_target_server(send_item, &config), }; if let Err(e) = result { if self.counter.dropped.load(Ordering::Relaxed) == 0 { @@ -710,15 +747,15 @@ impl UniformSender { Err(Error::Timeout) => match socket_type { SocketType::File => self.flush_writer(), _ => { - self.update_connection(); - self.encoder.update_header(self.name, self.id, &self.config); - self.flush_encoder(); + self.update_connection(&config); + self.encoder.update_header(self.name, self.id, &config); + self.flush_encoder(&config); } }, Err(Error::Terminated(..)) => { match socket_type { SocketType::File => self.flush_writer(), - _ => self.flush_encoder(), + _ => self.flush_encoder(&config), } break; } @@ -779,13 +816,17 @@ impl UniformSender { Ok(()) } - pub fn handle_target_server(&mut self, send_item: T) -> std::io::Result<()> { + pub fn handle_target_server( + &mut self, + send_item: T, + config: &SenderConfig, + ) -> std::io::Result<()> { self.encoder.cache_to_sender(send_item); if !self.cached || self.encoder.buffer_len() > Encoder::::BUFFER_LEN { self.check_or_register_counterable(self.encoder.header.msg_type); - self.update_connection(); - self.encoder.update_header(self.name, self.id, &self.config); - self.flush_encoder(); + self.update_connection(config); + self.encoder.update_header(self.name, self.id, config); + self.flush_encoder(config); } Ok(()) } diff --git a/agent/src/trident.rs b/agent/src/trident.rs index 771b002f859..046e2da8711 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -125,6 +125,8 @@ use integration_skywalking::SkyWalkingExtra; use packet_sequence_block::BoxedPacketSequenceBlock; use pcap_assembler::{BoxedPcapBatch, PcapAssembler}; +#[cfg(feature = "enterprise")] +use enterprise_utils::utils::{kernel_version_check, ActionFlags}; use public::{ buffer::BatchedBox, debug::QueueDebugger, @@ -269,16 +271,6 @@ impl AgentState { } } - pub fn terminate(&self) { - if !self.terminated.swap(true, Ordering::Relaxed) { - // log only the first time - info!("Agent state changed to {:?}", State::Terminated); - } - let sg = self.state.lock().unwrap(); - self.notifier.notify_one(); - info!("Agent terminate with state: {:?}", State::from(sg.0)); - } - pub fn update_config(&self, config: ChangedConfig) { if self.terminated.load(Ordering::Relaxed) { // when state is Terminated, main thread should still be notified for exiting @@ -493,6 +485,7 @@ impl Trident { let ntp_diff = Arc::new(AtomicI64::new(0)); let stats_collector = Arc::new(stats::Collector::new(&hostname, ntp_diff.clone())); let exception_handler = ExceptionHandler::default(); + let sender_leaky_bucket = Arc::new(LeakyBucket::new(Some(0))); let log_stats_shared_connection = Arc::new(Mutex::new(Connection::new())); let mut stats_sender = UniformSenderThread::new( @@ -503,6 +496,7 @@ impl Trident { exception_handler.clone(), Some(log_stats_shared_connection.clone()), SenderEncoder::Raw, + sender_leaky_bucket.clone(), ); stats_sender.start(); @@ -539,6 +533,7 @@ impl Trident { exception_handler.clone(), ntp_diff.clone(), log_stats_shared_connection, + sender_leaky_bucket.clone(), ); logger_writers.push(Box::new(remote_log_writer)); } @@ -605,6 +600,7 @@ impl Trident { sidecar_mode, cgroups_disabled, ntp_diff, + sender_leaky_bucket, ) { error!( "Launching deepflow-agent failed: {}, deepflow-agent restart...", @@ 
-625,6 +621,20 @@ impl Trident { Ok(Trident { state, handle }) } + #[cfg(feature = "enterprise")] + fn kernel_version_check(state: &AgentState, exception_handler: &ExceptionHandler) { + let action = kernel_version_check(); + if action.contains(ActionFlags::TERMINATE) { + exception_handler.set(Exception::KernelVersionCircuitBreaker); + crate::utils::clean_and_exit(1); + } else if action.contains(ActionFlags::MELTDOWN) { + exception_handler.set(Exception::KernelVersionCircuitBreaker); + state.melt_down(); + } else if action.contains(ActionFlags::ALARM) { + exception_handler.set(Exception::KernelVersionCircuitBreaker); + } + } + fn run( state: Arc, ctrl_ip: IpAddr, @@ -637,6 +647,7 @@ impl Trident { sidecar_mode: bool, cgroups_disabled: bool, ntp_diff: Arc, + sender_leaky_bucket: Arc, ) -> Result<()> { info!("==================== Launching DeepFlow-Agent ===================="); info!("Brief tag: {}", version_info.brief_tag()); @@ -889,6 +900,9 @@ impl Trident { platform_synchronizer.start(); } + #[cfg(feature = "enterprise")] + Trident::kernel_version_check(&state, &exception_handler); + let mut components: Option = None; let mut first_run = true; let mut config_initialized = false; @@ -1066,6 +1080,7 @@ impl Trident { gateway_vmac_addrs, config_handler.static_config.agent_mode, runtime.clone(), + sender_leaky_bucket.clone(), )?; comp.start(); @@ -1170,7 +1185,6 @@ impl Trident { } pub fn stop(&mut self) { - self.state.terminate(); info!("Agent stopping"); crate::utils::clean_and_exit(0); } @@ -2088,6 +2102,7 @@ impl AgentComponents { gateway_vmac_addrs: Vec, agent_mode: RunningMode, runtime: Arc, + sender_leaky_bucket: Arc, ) -> Result { let static_config = &config_handler.static_config; let candidate_config = &config_handler.candidate_config; @@ -2395,6 +2410,7 @@ impl AgentComponents { } else { SenderEncoder::Raw }, + sender_leaky_bucket.clone(), ); let metrics_queue_name = "3-doc-to-collector-sender"; @@ -2418,6 +2434,7 @@ impl AgentComponents { exception_handler.clone(), None, SenderEncoder::Raw, + sender_leaky_bucket.clone(), ); let proto_log_queue_name = "2-protolog-to-collector-sender"; @@ -2445,6 +2462,7 @@ impl AgentComponents { } else { SenderEncoder::Raw }, + sender_leaky_bucket.clone(), ); let analyzer_ip = if candidate_config @@ -2513,6 +2531,7 @@ impl AgentComponents { exception_handler.clone(), Some(pcap_packet_shared_connection.clone()), SenderEncoder::Raw, + sender_leaky_bucket.clone(), ); // Enterprise Edition Feature: packet-sequence let packet_sequence_queue_name = "2-packet-sequence-block-to-sender"; @@ -2539,6 +2558,7 @@ impl AgentComponents { exception_handler.clone(), Some(pcap_packet_shared_connection), SenderEncoder::Raw, + sender_leaky_bucket.clone(), ); let bpf_builder = bpf::Builder { @@ -2655,6 +2675,7 @@ impl AgentComponents { exception_handler.clone(), None, SenderEncoder::Raw, + sender_leaky_bucket.clone(), ); let profile_queue_name = "1-profile-to-sender"; @@ -2680,6 +2701,7 @@ impl AgentComponents { // profiler compress is a special one, it requires compressed and directly write into db // so we compress profile data inside and not compress secondly SenderEncoder::Raw, + sender_leaky_bucket.clone(), ); let application_log_queue_name = "1-application-log-to-sender"; let (application_log_sender, application_log_receiver, counter) = queue::bounded_with_debug( @@ -2710,6 +2732,7 @@ impl AgentComponents { } else { SenderEncoder::Raw }, + sender_leaky_bucket.clone(), ); let skywalking_queue_name = "1-skywalking-to-sender"; @@ -2741,6 +2764,7 @@ impl 
AgentComponents { } else { SenderEncoder::Raw }, + sender_leaky_bucket.clone(), ); let datadog_queue_name = "1-datadog-to-sender"; @@ -2772,6 +2796,7 @@ impl AgentComponents { } else { SenderEncoder::Raw }, + sender_leaky_bucket.clone(), ); let ebpf_dispatcher_id = dispatcher_components.len(); @@ -2909,6 +2934,7 @@ impl AgentComponents { } else { SenderEncoder::Raw }, + sender_leaky_bucket.clone(), ); let otel_dispatcher_id = ebpf_dispatcher_id + 1; @@ -2968,6 +2994,7 @@ impl AgentComponents { exception_handler.clone(), Some(prometheus_telegraf_shared_connection.clone()), SenderEncoder::Raw, + sender_leaky_bucket.clone(), ); let telegraf_queue_name = "1-telegraf-to-sender"; @@ -2995,6 +3022,7 @@ impl AgentComponents { exception_handler.clone(), Some(prometheus_telegraf_shared_connection), SenderEncoder::Raw, + sender_leaky_bucket.clone(), ); let compressed_otel_queue_name = "1-compressed-otel-to-sender"; @@ -3022,6 +3050,7 @@ impl AgentComponents { exception_handler.clone(), None, SenderEncoder::Raw, + sender_leaky_bucket.clone(), ); let (external_metrics_server, external_metrics_counter) = MetricServer::new( @@ -3358,6 +3387,7 @@ impl Components { gateway_vmac_addrs: Vec, agent_mode: RunningMode, runtime: Arc, + sender_leaky_bucket: Arc, ) -> Result { #[cfg(target_os = "linux")] if crate::utils::environment::running_in_only_watch_k8s_mode() { @@ -3382,6 +3412,7 @@ impl Components { gateway_vmac_addrs, agent_mode, runtime, + sender_leaky_bucket, )?; return Ok(Components::Agent(components)); } diff --git a/agent/src/utils/environment.rs b/agent/src/utils/environment.rs index e361b47ac67..7da028de345 100644 --- a/agent/src/utils/environment.rs +++ b/agent/src/utils/environment.rs @@ -17,7 +17,7 @@ use std::{ cell::OnceCell, env::{self, VarError}, - fs, + fs, io, iter::Iterator, net::{IpAddr, Ipv4Addr, Ipv6Addr}, path::Path, @@ -26,6 +26,7 @@ use std::{ }; use bytesize::ByteSize; +use fs2::{free_space, total_space}; use log::{error, warn}; use sysinfo::{DiskExt, System, SystemExt}; @@ -425,3 +426,18 @@ pub fn get_ctrl_ip_and_mac(dest: &IpAddr) -> Result<(IpAddr, MacAddr)> { "failed getting control ip and mac, deepflow-agent restart...".to_owned(), )) } + +pub fn get_disk_usage(directory: &str) -> io::Result<(u64, u64)> { + let path = std::path::Path::new(directory); + if !path.exists() { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("path does not exist: {}", path.display()), + )); + } + + let total = total_space(path)?; + let free = free_space(path)?; + + Ok((total, free)) +} diff --git a/agent/src/utils/guard.rs b/agent/src/utils/guard.rs index edce3317ae1..1debd23e7a0 100644 --- a/agent/src/utils/guard.rs +++ b/agent/src/utils/guard.rs @@ -46,6 +46,7 @@ use crate::config::handler::EnvironmentAccess; use crate::exception::ExceptionHandler; use crate::rpc::get_timestamp; use crate::trident::AgentState; +use crate::utils::environment::get_disk_usage; #[cfg(target_os = "linux")] use crate::utils::environment::SocketInfo; use crate::utils::{cgroups::is_kernel_available_for_cgroups, environment::running_in_container}; @@ -328,6 +329,40 @@ impl Guard { } } + fn check_free_disk( + percentage_trigger_threshold: u8, + absolute_trigger_threshold: u64, + directories: &Vec, + exception_handler: &ExceptionHandler, + ) { + if percentage_trigger_threshold == 0 && absolute_trigger_threshold == 0 { + return; + } + + for directory in directories { + match get_disk_usage(&directory) { + Ok((total, free)) => { + let free_percentage = free as f64 * 100.0 / total as f64; + if free_percentage 
< percentage_trigger_threshold as f64 + || free < absolute_trigger_threshold + { + exception_handler.set(Exception::FreeDiskCircuitBreaker); + return; + } + + if free_percentage > percentage_trigger_threshold as f64 * 1.1 + && free as f64 > absolute_trigger_threshold as f64 * 1.1 + { + exception_handler.clear(Exception::FreeDiskCircuitBreaker); + } + } + Err(e) => { + warn!("{}", e); + } + } + } + } + pub fn start(&self) { { let (running, _) = &*self.running; @@ -504,11 +539,22 @@ impl Guard { } } + if !in_container { + Self::check_free_disk(config.free_disk_circuit_breaker_percentage_threshold, config.free_disk_circuit_breaker_absolute_threshold, + &config.free_disk_circuit_breaker_directories, &exception_handler); + } + if exception_handler.has(Exception::SystemLoadCircuitBreaker) { - info!("Set the state to melt_down when the system load exceeds the threshold."); + warn!("Set the state to melt_down when the system load exceeds the threshold."); state.melt_down(); } else if exception_handler.has(Exception::FreeMemExceeded) { - info!("Set the state to melt_down when the free memory exceeds the threshold."); + warn!("Set the state to melt_down when the free memory exceeds the threshold."); + state.melt_down(); + } else if exception_handler.has(Exception::FreeDiskCircuitBreaker) { + warn!("Set the state to melt_down when the free disk exceeds the threshold."); + state.melt_down(); + } else if exception_handler.has(Exception::KernelVersionCircuitBreaker) { + warn!("Set the state to melt_down when the kernel version circuit breaker."); state.melt_down(); } else { state.recover(); diff --git a/agent/src/utils/logger.rs b/agent/src/utils/logger.rs index b5a9452bd69..52ead99e66c 100644 --- a/agent/src/utils/logger.rs +++ b/agent/src/utils/logger.rs @@ -29,6 +29,7 @@ use flexi_logger::{writers::LogWriter, DeferredNow, Level, Record}; use public::{ queue, sender::{SendMessageType, Sendable}, + LeakyBucket, }; use super::stats::{self, QueueStats}; @@ -107,6 +108,7 @@ impl RemoteLogWriter { exception_handler: ExceptionHandler, ntp_diff: Arc, shared_conn: Arc>, + leaky_bucket: Arc, ) -> Self { let module = "remote_logger"; let (sender, receiver, counter) = queue::bounded(Self::INNER_QUEUE_SIZE); @@ -125,6 +127,7 @@ impl RemoteLogWriter { exception_handler, Some(shared_conn), SenderEncoder::Raw, + leaky_bucket, ); uniform_sender.start(); Self { diff --git a/message/agent.proto b/message/agent.proto index fe07f6c26ba..253e383787c 100644 --- a/message/agent.proto +++ b/message/agent.proto @@ -77,6 +77,8 @@ enum Exception { CGROUPS_CONFIG_ERROR = 524288; SYSTEM_LOAD_CIRCUIT_BREAKER = 1048576; DATA_BPS_THRESHOLD_EXCEEDED = 2097152; + FREE_DISK_CIRCUIT_BREAKER = 4194304; + KERNEL_VERSION_CIRCUIT_BREAKER = 8388608; // 2^31及以下由 agent ,agent 最大可用异常是2^31,顺序从前往后 // 2^32及以上由控制器使用,顺序从后往前 } diff --git a/server/agent_config/README-CH.md b/server/agent_config/README-CH.md index b0b38633b68..15fa0f95e49 100644 --- a/server/agent_config/README-CH.md +++ b/server/agent_config/README-CH.md @@ -350,7 +350,7 @@ global: 观测内存比率是由 `global.circuit_breakers.sys_memory_percentage.metric` 决定. 1. 当系统`观测内存比率`低于 `trigger_threshold` * 70% 时, 采集器将自动重启。 -2. 当系统`观测内存比率低于 `trigger_threshold` 但高于 70% 时, +2. 当系统`观测内存比率`低于 `trigger_threshold` 但高于 70% 时, 采集器设置为 `FREE_MEM_EXCEEDED` 的异常状态,并上报采集器异常告警。 3. 
当系统`观测内存比率`持续高于 `trigger_threshold` * 110% 时, 采集器将从异常状态恢复。 @@ -564,6 +564,103 @@ global: deepflow-agent 对流量分发所使用网络接口的出方向吞吐量指标的监控周期。 +### 空闲磁盘 {#global.circuit_breakers.free_disk} + +#### 百分比触发阈值 {#global.circuit_breakers.free_disk.percentage_trigger_threshold} + +**标签**: + +`hot_update` + +**FQCN**: + +`global.circuit_breakers.free_disk.percentage_trigger_threshold` + +**默认值**: +```yaml +global: + circuit_breakers: + free_disk: + percentage_trigger_threshold: 15 +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | int | +| Unit | % | +| Range | [0, 100] | + +**详细描述**: + +仅当采集器运行在非容器环境中时该配置有效。配置为 0 表示禁用该阈值。 +观测磁盘为`global.circuit_breakers.free_disk.directories`目录所在磁盘。 +1. 当系统`空闲磁盘比率`低于`该阈值`时,采集器进入熔断禁用状态, + 并设置`磁盘空闲空间触发熔断`异常状态,同时上报采集器异常告警。 +2. 当系统`空闲磁盘比率`高于`该阈值 * 110%` 时,采集器从异常状态恢复。 + +#### 绝对值触发阈值 {#global.circuit_breakers.free_disk.absolute_trigger_threshold} + +**标签**: + +`hot_update` + +**FQCN**: + +`global.circuit_breakers.free_disk.absolute_trigger_threshold` + +**默认值**: +```yaml +global: + circuit_breakers: + free_disk: + absolute_trigger_threshold: 10 +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | int | +| Unit | GB | +| Range | [0, 100000] | + +**详细描述**: + +仅当采集器运行在非容器环境中时该配置有效。配置为 0 表示禁用该阈值。 +观测磁盘为`global.circuit_breakers.free_disk.directories`目录所在磁盘。 +1. 当系统`空闲磁盘大小`低于`该阈值`时,采集器进入熔断禁用状态, + 并设置`磁盘空闲空间触发熔断`异常状态,同时上报采集器异常告警。 +2. 当系统`空闲磁盘大小`高于`该阈值 * 110%` 时,采集器从异常状态恢复。 + +#### 观测目录 {#global.circuit_breakers.free_disk.directories} + +**标签**: + +`hot_update` + +**FQCN**: + +`global.circuit_breakers.free_disk.directories` + +**默认值**: +```yaml +global: + circuit_breakers: + free_disk: + directories: + - / +``` + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string | + +**详细描述**: + +观测目录所在磁盘的空间。 +对于`windows`操作系统,默认值则是`c:\` + ## 调优 {#global.tunning} 对 deepflow-agent 的运行进行调优。 @@ -1085,9 +1182,43 @@ global: **详细描述**: 向 Server 端 Ingester 模块发送可观测性数据的最大允许流量, -超过此限速时数据将会主动丢弃、且采集器会标记为异常状态并触发告警。 +超限行为参考 `ingester_traffic_overflow_action` 配置描述。 配置为 0 表示不限速。 +### Ingester 流量超限的动作 {#global.communication.ingester_traffic_overflow_action} + +**标签**: + +`hot_update` + +**FQCN**: + +`global.communication.ingester_traffic_overflow_action` + +**默认值**: +```yaml +global: + communication: + ingester_traffic_overflow_action: WAIT +``` + +**枚举可选值**: +| Value | Note | +| ----- | ---------------------------- | +| WAIT | | +| DROP | | + +**模式**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string | + +**详细描述**: + +Ingester 流量超限的动作 +- WAIT:暂停发送,数据缓存到队列,等待下次发送。 +- DROP:直接丢弃数据,并触发 Agent `数据流量达到限速`异常。 + ### 请求 NAT IP 地址 {#global.communication.request_via_nat_ip} **标签**: diff --git a/server/agent_config/README.md b/server/agent_config/README.md index 484d96f462b..97724bea223 100644 --- a/server/agent_config/README.md +++ b/server/agent_config/README.md @@ -571,6 +571,103 @@ global: Monitoring interval for outbound traffic rate of NPB interface. 
+### Free Disk {#global.circuit_breakers.free_disk} + +#### Percentage Trigger Threshold {#global.circuit_breakers.free_disk.percentage_trigger_threshold} + +**Tags**: + +`hot_update` + +**FQCN**: + +`global.circuit_breakers.free_disk.percentage_trigger_threshold` + +**Default value**: +```yaml +global: + circuit_breakers: + free_disk: + percentage_trigger_threshold: 15 +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | int | +| Unit | % | +| Range | [0, 100] | + +**Description**: + +This configuration is only valid when the Agent runs in a non-container environment. Configuring to 0 means disabling the threshold. +The observed disks are the disks where the `global.circuit_breakers.free_disk.directories` are located. +1. When the system `free disk ratio` is lower than `this threshold`, the Agent enters the fuse disabled state, + and sets the `FREE_DISK_CIRCUIT_BREAKER` abnormal state, and reports the Agent abnormal alarm. +2. When the system `free disk ratio` is higher than `this threshold * 110%`, the Agent recovers from the abnormal state. + +#### Absolute_Trigger Threshold {#global.circuit_breakers.free_disk.absolute_trigger_threshold} + +**Tags**: + +`hot_update` + +**FQCN**: + +`global.circuit_breakers.free_disk.absolute_trigger_threshold` + +**Default value**: +```yaml +global: + circuit_breakers: + free_disk: + absolute_trigger_threshold: 10 +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | int | +| Unit | GB | +| Range | [0, 100000] | + +**Description**: + +This configuration is only valid when the Agent runs in a non-container environment. Configuring to 0 means disabling the threshold. +The observed disks are the disks where the `global.circuit_breakers.free_disk.directories` is located. +1. When the system `free disk size` is lower than `this threshold`, the Agent enters the fuse disabled state, + and sets the `FREE_DISK_CIRCUIT_BREAKER` abnormal state, and reports the Agent abnormal alarm. +2. When the system `free disk size` is higher than `this threshold * 110%`, the Agent recovers from the abnormal state. + +#### Directories {#global.circuit_breakers.free_disk.directories} + +**Tags**: + +`hot_update` + +**FQCN**: + +`global.circuit_breakers.free_disk.directories` + +**Default value**: +```yaml +global: + circuit_breakers: + free_disk: + directories: + - / +``` + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string | + +**Description**: + +Observe the disk space where the directories is located. +For the `windows` operating system, the default value is `c:\`. + ## Tunning {#global.tunning} Tune the runtime of deepflow-agent. @@ -1109,10 +1206,43 @@ global: **Description**: The maximum allowed flow rate for sending observability data to the server-side Ingester module. -When this rate limit is exceeded, the data will be actively discarded, -and the agent will be marked as abnormal and trigger an alarm. +For the overflow action, refer to the `ingester_traffic_overflow_action` configuration description. Setting it to 0 means no speed limit. 
+### Action when the Ingester traffic exceeds the limit {#global.communication.ingester_traffic_overflow_action} + +**Tags**: + +`hot_update` + +**FQCN**: + +`global.communication.ingester_traffic_overflow_action` + +**Default value**: +```yaml +global: + communication: + ingester_traffic_overflow_action: WAIT +``` + +**Enum options**: +| Value | Note | +| ----- | ---------------------------- | +| WAIT | | +| DROP | | + +**Schema**: +| Key | Value | +| ---- | ---------------------------- | +| Type | string | + +**Description**: + +Action when the Ingester traffic exceeds the limit +- WAIT: pause sending, cache data into queue, and wait for next sending +- DROP: the data is discarded directly and the Agent `DATA_BPS_THRESHOLD_EXCEEDED` exception is triggered + ### Request via NAT IP Address {#global.communication.request_via_nat_ip} **Tags**: diff --git a/server/agent_config/template.yaml b/server/agent_config/template.yaml index cbe5b4888d0..8bb195d8e7e 100644 --- a/server/agent_config/template.yaml +++ b/server/agent_config/template.yaml @@ -240,7 +240,7 @@ global: # 观测内存比率是由 `global.circuit_breakers.sys_memory_percentage.metric` 决定. # 1. 当系统`观测内存比率`低于 `trigger_threshold` * 70% 时, # 采集器将自动重启。 - # 2. 当系统`观测内存比率低于 `trigger_threshold` 但高于 70% 时, + # 2. 当系统`观测内存比率`低于 `trigger_threshold` 但高于 70% 时, # 采集器设置为 `FREE_MEM_EXCEEDED` 的异常状态,并上报采集器异常告警。 # 3. 当系统`观测内存比率`持续高于 `trigger_threshold` * 110% 时, # 采集器将从异常状态恢复。 @@ -382,6 +382,76 @@ global: # deepflow-agent 对流量分发所使用网络接口的出方向吞吐量指标的监控周期。 # upgrade_from: bandwidth_probe_interval throughput_monitoring_interval: 10s + # type: section + # name: + # en: Free Disk + # ch: 空闲磁盘 + # description: + free_disk: + # type: int + # name: + # en: Percentage Trigger Threshold + # ch: 百分比触发阈值 + # unit: '%' + # range: [0, 100] + # enum_options: [] + # modification: hot_update + # ee_feature: false + # description: + # en: |- + # This configuration is only valid when the Agent runs in a non-container environment. Configuring to 0 means disabling the threshold. + # The observed disks are the disks where the `global.circuit_breakers.free_disk.directories` are located. + # 1. When the system `free disk ratio` is lower than `this threshold`, the Agent enters the fuse disabled state, + # and sets the `FREE_DISK_CIRCUIT_BREAKER` abnormal state, and reports the Agent abnormal alarm. + # 2. When the system `free disk ratio` is higher than `this threshold * 110%`, the Agent recovers from the abnormal state. + # ch: |- + # 仅当采集器运行在非容器环境中时该配置有效。配置为 0 表示禁用该阈值。 + # 观测磁盘为`global.circuit_breakers.free_disk.directories`目录所在磁盘。 + # 1. 当系统`空闲磁盘比率`低于`该阈值`时,采集器进入熔断禁用状态, + # 并设置`磁盘空闲空间触发熔断`异常状态,同时上报采集器异常告警。 + # 2. 当系统`空闲磁盘比率`高于`该阈值 * 110%` 时,采集器从异常状态恢复。 + percentage_trigger_threshold: 15 + # type: int + # name: + # en: Absolute_Trigger Threshold + # ch: 绝对值触发阈值 + # unit: GB + # range: [0, 100000] + # enum_options: [] + # modification: hot_update + # ee_feature: false + # description: + # en: |- + # This configuration is only valid when the Agent runs in a non-container environment. Configuring to 0 means disabling the threshold. + # The observed disks are the disks where the `global.circuit_breakers.free_disk.directories` is located. + # 1. When the system `free disk size` is lower than `this threshold`, the Agent enters the fuse disabled state, + # and sets the `FREE_DISK_CIRCUIT_BREAKER` abnormal state, and reports the Agent abnormal alarm. + # 2. When the system `free disk size` is higher than `this threshold * 110%`, the Agent recovers from the abnormal state. 
+ # ch: |- + # 仅当采集器运行在非容器环境中时该配置有效。配置为 0 表示禁用该阈值。 + # 观测磁盘为`global.circuit_breakers.free_disk.directories`目录所在磁盘。 + # 1. 当系统`空闲磁盘大小`低于`该阈值`时,采集器进入熔断禁用状态, + # 并设置`磁盘空闲空间触发熔断`异常状态,同时上报采集器异常告警。 + # 2. 当系统`空闲磁盘大小`高于`该阈值 * 110%` 时,采集器从异常状态恢复。 + absolute_trigger_threshold: 10 + # type: string + # name: + # en: Directories + # ch: 观测目录 + # unit: + # range: [] + # enum_options: [] + # modification: hot_update + # ee_feature: false + # description: + # en: |- + # Observe the disk space where the directories is located. + # For the `windows` operating system, the default value is `c:\`. + # ch: |- + # 观测目录所在磁盘的空间。 + # 对于`windows`操作系统,默认值则是`c:\` + directories: [/] + # type: section # name: # en: Tunning @@ -738,14 +808,32 @@ global: # description: # en: |- # The maximum allowed flow rate for sending observability data to the server-side Ingester module. - # When this rate limit is exceeded, the data will be actively discarded, - # and the agent will be marked as abnormal and trigger an alarm. + # For the overflow action, refer to the `ingester_traffic_overflow_action` configuration description. # Setting it to 0 means no speed limit. # ch: |- # 向 Server 端 Ingester 模块发送可观测性数据的最大允许流量, - # 超过此限速时数据将会主动丢弃、且采集器会标记为异常状态并触发告警。 + # 超限行为参考 `ingester_traffic_overflow_action` 配置描述。 # 配置为 0 表示不限速。 max_throughput_to_ingester: 100 + # type: string + # name: + # en: Action when the Ingester traffic exceeds the limit + # ch: Ingester 流量超限的动作 + # unit: + # range: [] + # enum_options: [WAIT, DROP] + # modification: hot_update + # ee_feature: false + # description: + # en: |- + # Action when the Ingester traffic exceeds the limit + # - WAIT: pause sending, cache data into queue, and wait for next sending + # - DROP: the data is discarded directly and the Agent `DATA_BPS_THRESHOLD_EXCEEDED` exception is triggered + # ch: |- + # Ingester 流量超限的动作 + # - WAIT:暂停发送,数据缓存到队列,等待下次发送。 + # - DROP:直接丢弃数据,并触发 Agent `数据流量达到限速`异常。 + ingester_traffic_overflow_action: WAIT # type: bool # name: # en: Request via NAT IP Address diff --git a/server/controller/common/const.go b/server/controller/common/const.go index 640c3a400d8..5f2ce273f4d 100644 --- a/server/controller/common/const.go +++ b/server/controller/common/const.go @@ -199,6 +199,8 @@ var VTapExceptionChinese = map[int64]string{ 1 << 19: "CGROUPS配置错误", 1 << 20: "系统负载超限触发熔断", 1 << 21: "数据流量达到限速", + 1 << 22: "磁盘空闲空间触发熔断", + 1 << 23: "内核版本触发熔断", VTAP_EXCEPTION_LICENSE_NOT_ENGOUTH: "采集器授权个数不足", VTAP_EXCEPTION_ALLOC_ANALYZER_FAILED: "分配数据节点失败", VTAP_EXCEPTION_ALLOC_CONTROLLER_FAILED: "分配控制器失败",
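Note on the new throttling path: this patch replaces the parser-local `Throttle`/`BufferSender` pair with the shared `public::throttle::Throttle`, which keeps a bounded buffer and, once the leaky bucket rejects an item, overwrites a uniformly random buffered entry, so the batch flushed to the output queue stays an approximately uniform sample of everything seen in the period. Below is a minimal standalone sketch of that sampling behaviour only; `TokenBudget`, `SamplingThrottle`, and the `main` driver are illustrative stand-ins (the real code uses `public::LeakyBucket`, `SmallRng`, and `DebugSender::send_all`), and it assumes `rand = "0.8"` with default features.

```rust
// Sketch of the sampling idea behind public::throttle::Throttle.
// TokenBudget stands in for public::LeakyBucket; rand 0.8 is assumed.
use rand::{rngs::StdRng, Rng, SeedableRng};

const BUFFER_SIZE: usize = 1024;

// Very small stand-in for a leaky bucket: a fixed number of tokens per flush.
struct TokenBudget {
    tokens_per_flush: u64,
    remaining: u64,
}

impl TokenBudget {
    fn acquire(&mut self, n: u64) -> bool {
        if self.remaining >= n {
            self.remaining -= n;
            true
        } else {
            false
        }
    }

    fn refill(&mut self) {
        self.remaining = self.tokens_per_flush;
    }
}

struct SamplingThrottle<T> {
    budget: TokenBudget,
    buffer: Vec<T>,
    // Number of items offered since the last flush, accepted or not.
    period_count: u32,
    rng: StdRng,
}

impl<T> SamplingThrottle<T> {
    fn new(rate: u64) -> Self {
        Self {
            budget: TokenBudget { tokens_per_flush: rate, remaining: rate },
            buffer: Vec::with_capacity(BUFFER_SIZE),
            period_count: 0,
            rng: StdRng::from_entropy(),
        }
    }

    // Returns false when the item was throttled. A throttled item may still
    // replace a random buffered one, so the flushed batch remains an
    // approximately uniform sample of the whole period (reservoir-style).
    fn send(&mut self, item: T) -> bool {
        if self.buffer.len() > BUFFER_SIZE {
            self.flush();
            self.period_count = 0;
        }
        self.period_count += 1;
        if self.budget.acquire(1) {
            self.buffer.push(item);
            return true;
        }
        // Over budget: pick a slot over everything seen so far; if it falls
        // inside the buffer, the new item displaces an older sample.
        let index = self.rng.gen_range(0..self.period_count) as usize;
        if index < self.buffer.len() {
            self.buffer[index] = item;
        }
        false
    }

    // The real Throttle pushes the buffer to a DebugSender; here we just
    // hand it back and refill the budget.
    fn flush(&mut self) -> Vec<T> {
        self.budget.refill();
        std::mem::take(&mut self.buffer)
    }
}

fn main() {
    let mut throttle = SamplingThrottle::new(100);
    let accepted = (0..10_000).filter(|&i| throttle.send(i)).count();
    let flushed = throttle.flush();
    println!(
        "accepted immediately: {}, items in flushed sample: {}",
        accepted,
        flushed.len()
    );
}
```

With a rate of 100 and 10,000 offered items, roughly 100 survive to the flush, and items arriving after the budget is spent can still displace earlier ones, so bursts do not bias the sample toward the start of the period; this mirrors the intent of the accumulate-then-sample comment removed from `parser.rs`.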