Skip to content

Commit 7a9f759

Browse files
committed
feat: add free disk circuit breaker
1 parent c596554 commit 7a9f759

File tree

12 files changed

+511
-10
lines changed

12 files changed

+511
-10
lines changed

agent/Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agent/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ enum_dispatch = "0.3.7"
4848
envmnt = "0.10.4"
4949
flate2 = "1.0"
5050
flexi_logger = "0.29"
51+
fs2 = "0.4"
5152
futures = "~0.3"
5253
grpc = { path = "plugins/grpc" }
5354
hex = "0.4.3"

agent/src/config/config.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2079,12 +2079,35 @@ impl Default for TxThroughput {
20792079
}
20802080
}
20812081

2082-
#[derive(Clone, Copy, Default, Debug, Deserialize, PartialEq, Eq)]
2082+
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
2083+
#[serde(default)]
2084+
pub struct FreeDisk {
2085+
pub percentage_trigger_threshold: u8,
2086+
#[serde(deserialize_with = "deser_u64_with_giga_unit")]
2087+
pub absolute_trigger_threshold: u64,
2088+
pub directories: Vec<String>,
2089+
}
2090+
2091+
impl Default for FreeDisk {
2092+
fn default() -> Self {
2093+
Self {
2094+
percentage_trigger_threshold: 15,
2095+
absolute_trigger_threshold: 10 << 30,
2096+
#[cfg(not(target_os = "windows"))]
2097+
directories: vec!["/".to_string()],
2098+
#[cfg(target_os = "windows")]
2099+
directories: vec!["c:\\".to_string()],
2100+
}
2101+
}
2102+
}
2103+
2104+
#[derive(Clone, Default, Debug, Deserialize, PartialEq, Eq)]
20832105
#[serde(default)]
20842106
pub struct CircuitBreakers {
20852107
pub sys_memory_percentage: SysMemoryPercentage,
20862108
pub relative_sys_load: RelativeSysLoad,
20872109
pub tx_throughput: TxThroughput,
2110+
pub free_disk: FreeDisk,
20882111
}
20892112

20902113
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
@@ -3329,6 +3352,13 @@ where
33293352
Ok(usize::deserialize(deserializer)? << 20)
33303353
}
33313354

3355+
fn deser_u64_with_giga_unit<'de, D>(deserializer: D) -> Result<u64, D::Error>
3356+
where
3357+
D: Deserializer<'de>,
3358+
{
3359+
Ok(u64::deserialize(deserializer)? << 30)
3360+
}
3361+
33323362
// `humantime` will not parse "0" as Duration::ZERO
33333363
// If "0" is a valid option for a duration field, use this deserializer
33343364
// #[serde(deserialize_with = "deser_humantime_with_zero")]

agent/src/config/handler.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,9 @@ pub struct EnvironmentConfig {
218218
pub system_load_circuit_breaker_recover: f32,
219219
pub system_load_circuit_breaker_metric: agent::SystemLoadMetric,
220220
pub page_cache_reclaim_percentage: u8,
221+
pub free_disk_circuit_breaker_percentage_threshold: u8,
222+
pub free_disk_circuit_breaker_absolute_threshold: u64,
223+
pub free_disk_circuit_breaker_directories: Vec<String>,
221224
}
222225

223226
#[derive(Clone, PartialEq, Eq, Debug)]
@@ -1665,6 +1668,22 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig {
16651668
.relative_sys_load
16661669
.metric,
16671670
page_cache_reclaim_percentage: conf.global.tunning.page_cache_reclaim_percentage,
1671+
free_disk_circuit_breaker_percentage_threshold: conf
1672+
.global
1673+
.circuit_breakers
1674+
.free_disk
1675+
.percentage_trigger_threshold,
1676+
free_disk_circuit_breaker_absolute_threshold: conf
1677+
.global
1678+
.circuit_breakers
1679+
.free_disk
1680+
.absolute_trigger_threshold,
1681+
free_disk_circuit_breaker_directories: {
1682+
let mut v = conf.global.circuit_breakers.free_disk.directories.clone();
1683+
v.sort();
1684+
v.dedup();
1685+
v
1686+
},
16681687
},
16691688
synchronizer: SynchronizerConfig {
16701689
sync_interval: conf.global.communication.proactive_request_interval,
@@ -4011,6 +4030,34 @@ impl ConfigHandler {
40114030
limits.max_sockets_tolerate_interval = new_limits.max_sockets_tolerate_interval;
40124031
}
40134032

4033+
let free_disk = &mut config.global.circuit_breakers.free_disk;
4034+
let new_free_disk = &mut new_config.user_config.global.circuit_breakers.free_disk;
4035+
if free_disk.percentage_trigger_threshold != new_free_disk.percentage_trigger_threshold {
4036+
info!(
4037+
"Update global.circuit_breakers.free_disk.percentage_trigger_threshold from {:?} to {:?}.",
4038+
free_disk.percentage_trigger_threshold, new_free_disk.percentage_trigger_threshold
4039+
);
4040+
free_disk.percentage_trigger_threshold = new_free_disk.percentage_trigger_threshold;
4041+
}
4042+
4043+
if free_disk.absolute_trigger_threshold != new_free_disk.absolute_trigger_threshold {
4044+
info!(
4045+
"Update global.circuit_breakers.free_disk.absolute_trigger_threshold from {:?} to {:?}.",
4046+
free_disk.absolute_trigger_threshold, new_free_disk.absolute_trigger_threshold
4047+
);
4048+
free_disk.absolute_trigger_threshold = new_free_disk.absolute_trigger_threshold;
4049+
}
4050+
4051+
new_free_disk.directories.sort();
4052+
new_free_disk.directories.dedup();
4053+
if free_disk.directories != new_free_disk.directories {
4054+
info!(
4055+
"Update global.circuit_breakers.free_disk.directories from {:?} to {:?}.",
4056+
free_disk.directories, new_free_disk.directories
4057+
);
4058+
free_disk.directories = new_free_disk.directories.clone();
4059+
}
4060+
40144061
let ntp = &mut config.global.ntp;
40154062
let new_ntp = &mut new_config.user_config.global.ntp;
40164063
if ntp.enabled != new_ntp.enabled {

agent/src/monitor.rs

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ use sysinfo::NetworkExt;
3030
use sysinfo::{get_current_pid, Pid, ProcessExt, ProcessRefreshKind, System, SystemExt};
3131

3232
#[cfg(target_os = "linux")]
33-
use crate::utils::{cgroups, environment::SocketInfo};
33+
use crate::utils::{
34+
cgroups,
35+
environment::{get_disk_usage, SocketInfo},
36+
};
3437
use crate::{
3538
config::handler::EnvironmentAccess,
3639
error::{Error, Result},
@@ -227,7 +230,8 @@ impl RefCountable for SysStatusBroker {
227230
CounterValue::Unsigned(current_sys_available_memory_percentage as u64),
228231
));
229232

230-
let sys_memory_limit = self.config.load().sys_memory_limit as f64;
233+
let config = self.config.load();
234+
let sys_memory_limit = config.sys_memory_limit as f64;
231235

232236
let (sys_free_memory_limit_ratio, sys_available_memory_limit_ratio) =
233237
if sys_memory_limit > 0.0 {
@@ -266,6 +270,7 @@ impl RefCountable for SysStatusBroker {
266270
warn!("get file and size sum failed: {:?}", e);
267271
}
268272
}
273+
269274
match system_guard.process(self.pid) {
270275
Some(process) => {
271276
let cpu_usage = process.cpu_usage() as f64;
@@ -279,7 +284,7 @@ impl RefCountable for SysStatusBroker {
279284
metrics.push((
280285
"max_millicpus_ratio",
281286
CounterType::Gauged,
282-
CounterValue::Float(cpu_usage * 10.0 / self.config.load().max_millicpus as f64),
287+
CounterValue::Float(cpu_usage * 10.0 / config.max_millicpus as f64),
283288
));
284289
metrics.push((
285290
"memory",
@@ -289,7 +294,7 @@ impl RefCountable for SysStatusBroker {
289294
metrics.push((
290295
"max_memory_ratio",
291296
CounterType::Gauged,
292-
CounterValue::Float(mem_used as f64 / self.config.load().max_memory as f64),
297+
CounterValue::Float(mem_used as f64 / config.max_memory as f64),
293298
));
294299
metrics.push((
295300
"create_time",
@@ -373,13 +378,54 @@ impl stats::Module for NetStats<'_> {
373378
}
374379
}
375380

381+
struct FreeDiskUsage {
382+
directory: String,
383+
}
384+
385+
impl stats::Module for FreeDiskUsage {
386+
fn name(&self) -> &'static str {
387+
"free_disk"
388+
}
389+
390+
fn tags(&self) -> Vec<StatsOption> {
391+
vec![StatsOption::Tag("directory", self.directory.clone())]
392+
}
393+
}
394+
395+
impl RefCountable for FreeDiskUsage {
396+
fn get_counters(&self) -> Vec<Counter> {
397+
let mut metrics = vec![];
398+
match get_disk_usage(&self.directory) {
399+
Ok((total, free)) => {
400+
metrics.push((
401+
"free_disk_percentage",
402+
CounterType::Gauged,
403+
CounterValue::Float(free as f64 * 100.0 / total as f64),
404+
));
405+
metrics.push((
406+
"free_disk_absolute",
407+
CounterType::Gauged,
408+
CounterValue::Unsigned(free as u64),
409+
));
410+
}
411+
Err(e) => {
412+
warn!("get disk free usage failed: {:?}", e);
413+
}
414+
}
415+
metrics
416+
}
417+
}
418+
376419
pub struct Monitor {
377420
stats: Arc<Collector>,
378421
running: AtomicBool,
379422
sys_monitor: Arc<SysStatusBroker>,
380423
sys_load: Arc<SysLoad>,
381424
link_map: Arc<Mutex<HashMap<String, Arc<LinkStatusBroker>>>>,
382425
system: Arc<Mutex<System>>,
426+
config: EnvironmentAccess,
427+
free_disks_config: Arc<Mutex<Vec<String>>>,
428+
free_disk_counters: Arc<Mutex<Vec<Arc<FreeDiskUsage>>>>,
383429
}
384430

385431
impl Monitor {
@@ -399,6 +445,9 @@ impl Monitor {
399445
sys_load: Arc::new(SysLoad(system.clone())),
400446
link_map: Arc::new(Mutex::new(HashMap::new())),
401447
system,
448+
config: config.clone(),
449+
free_disks_config: Arc::new(Mutex::new(vec![])),
450+
free_disk_counters: Arc::new(Mutex::new(vec![])),
402451
})
403452
}
404453

@@ -519,6 +568,42 @@ impl Monitor {
519568
Countable::Ref(Arc::downgrade(&self.sys_load) as Weak<dyn RefCountable>),
520569
);
521570

571+
let config = self.config.clone();
572+
let stats_collector = self.stats.clone();
573+
let free_disks_config = self.free_disks_config.clone();
574+
let free_disk_counters = self.free_disk_counters.clone();
575+
self.stats.register_pre_hook(Box::new(move || {
576+
let config_load = config.load();
577+
let mut free_disks_config = free_disks_config.lock().unwrap();
578+
if config_load.free_disk_circuit_breaker_directories == *free_disks_config {
579+
return;
580+
}
581+
582+
let mut locked_counters = free_disk_counters.lock().unwrap();
583+
let old_data = std::mem::take(&mut *locked_counters);
584+
stats_collector
585+
.deregister_countables(old_data.iter().map(|c| c.as_ref() as &dyn stats::Module));
586+
587+
for free_disk in &config_load.free_disk_circuit_breaker_directories {
588+
let free_disk_counter = Arc::new(FreeDiskUsage {
589+
directory: free_disk.clone(),
590+
});
591+
stats_collector.register_countable(
592+
&FreeDiskUsage {
593+
directory: free_disk.clone(),
594+
},
595+
Countable::Ref(Arc::downgrade(&free_disk_counter) as Weak<dyn RefCountable>),
596+
);
597+
locked_counters.push(free_disk_counter);
598+
}
599+
600+
info!(
601+
"update free disk monitor from {:?} to {:?}",
602+
free_disks_config, config_load.free_disk_circuit_breaker_directories
603+
);
604+
*free_disks_config = config_load.free_disk_circuit_breaker_directories.clone();
605+
}));
606+
522607
info!("monitor started");
523608
}
524609

agent/src/utils/environment.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use std::{
1818
cell::OnceCell,
1919
env::{self, VarError},
20-
fs,
20+
fs, io,
2121
iter::Iterator,
2222
net::{IpAddr, Ipv4Addr, Ipv6Addr},
2323
path::Path,
@@ -26,6 +26,7 @@ use std::{
2626
};
2727

2828
use bytesize::ByteSize;
29+
use fs2::{free_space, total_space};
2930
use log::{error, warn};
3031
use sysinfo::{DiskExt, System, SystemExt};
3132

@@ -425,3 +426,18 @@ pub fn get_ctrl_ip_and_mac(dest: &IpAddr) -> Result<(IpAddr, MacAddr)> {
425426
"failed getting control ip and mac, deepflow-agent restart...".to_owned(),
426427
))
427428
}
429+
430+
/// Returns `(total, free)` for the filesystem that contains `directory`, as
/// reported by `fs2::total_space` and `fs2::free_space`.
///
/// # Errors
///
/// Returns an `io::ErrorKind::NotFound` error when `directory` does not exist,
/// and otherwise propagates any error from the two `fs2` queries.
pub fn get_disk_usage(directory: &str) -> io::Result<(u64, u64)> {
    let dir = std::path::Path::new(directory);

    if dir.exists() {
        // Total is queried first, so its error wins if both calls would fail.
        let capacity = total_space(dir)?;
        let unused = free_space(dir)?;
        return Ok((capacity, unused));
    }

    Err(io::Error::new(
        io::ErrorKind::NotFound,
        format!("path does not exist: {}", dir.display()),
    ))
}

0 commit comments

Comments
 (0)