From 25e372998324b11063707d368e7d9d23c8d0cec6 Mon Sep 17 00:00:00 2001 From: yuanchao Date: Mon, 23 Jun 2025 16:48:04 +0800 Subject: [PATCH] fix: reduce the frequency of log printing --- agent/benches/labeler.rs | 6 +- agent/src/common/mod.rs | 2 + agent/src/debug/rpc.rs | 8 +- agent/src/dispatcher/mod.rs | 2 + agent/src/ebpf_dispatcher.rs | 2 + agent/src/policy/first_path.rs | 24 ++- agent/src/policy/labeler.rs | 39 ++-- agent/src/policy/policy.rs | 54 +++-- agent/src/rpc/synchronizer.rs | 254 ++++++++++++++++++----- agent/src/utils/npb_bandwidth_watcher.rs | 2 + 10 files changed, 303 insertions(+), 90 deletions(-) diff --git a/agent/benches/labeler.rs b/agent/benches/labeler.rs index f904839f567..9a9aec50f88 100644 --- a/agent/benches/labeler.rs +++ b/agent/benches/labeler.rs @@ -70,7 +70,7 @@ fn bench_labeler(c: &mut Criterion) { cidr_list.push(Arc::new(cidr)); } - labeler.update_cidr_table(&cidr_list); + labeler.update_cidr_table(&cidr_list, false, &mut false); labeler.update_interface_table(&iface_list); let key: LookupKey = LookupKey { @@ -118,7 +118,7 @@ fn bench_labeler(c: &mut Criterion) { cidr_list.push(Arc::new(cidr)); } - labeler.update_cidr_table(&cidr_list); + labeler.update_cidr_table(&cidr_list, false, &mut false); labeler.update_interface_table(&iface_list); let key: LookupKey = LookupKey { @@ -163,7 +163,7 @@ fn bench_policy(c: &mut Criterion) { Arc::new(IpGroupData::new(10, 2, "192.168.2.1/32")), Arc::new(IpGroupData::new(20, 20, "192.168.2.5/31")), ]); - let _ = first.update_acl(&vec![Arc::new(acl)], true); + let _ = first.update_acl(&vec![Arc::new(acl)], true, false, &mut false); first } diff --git a/agent/src/common/mod.rs b/agent/src/common/mod.rs index 901c7252dc2..69522dc6c69 100644 --- a/agent/src/common/mod.rs +++ b/agent/src/common/mod.rs @@ -100,6 +100,8 @@ pub trait FlowAclListener: Send + Sync { peers: &Vec>, cidrs: &Vec>, acls: &Vec>, + enabled_invalid_log: bool, + has_invalid_log: &mut bool, ) -> Result<(), String>; fn containers_change(&mut self, _: &Vec>) {} fn id(&self) -> usize; diff --git a/agent/src/debug/rpc.rs b/agent/src/debug/rpc.rs index 716bc93edb2..f0879998e03 100644 --- a/agent/src/debug/rpc.rs +++ b/agent/src/debug/rpc.rs @@ -156,7 +156,7 @@ impl RpcDebugger { } let mut sg = self.status.write(); - sg.get_platform_data(&resp); + sg.get_platform_data(&resp, false); let mut res = sg .cidrs .iter() @@ -181,7 +181,7 @@ impl RpcDebugger { } let mut sg = self.status.write(); - sg.get_platform_data(&resp); + sg.get_platform_data(&resp, false); let mut res = sg .interfaces .iter() @@ -211,7 +211,7 @@ impl RpcDebugger { } let mut sg = self.status.write(); - sg.get_ip_groups(&resp); + sg.get_ip_groups(&resp, false); let mut res = sg .ip_groups .iter() @@ -236,7 +236,7 @@ impl RpcDebugger { } let mut sg = self.status.write(); - sg.get_flow_acls(&resp); + sg.get_flow_acls(&resp, false); let mut res = sg .acls .iter() diff --git a/agent/src/dispatcher/mod.rs b/agent/src/dispatcher/mod.rs index d17830f8d2d..5446ee8c646 100644 --- a/agent/src/dispatcher/mod.rs +++ b/agent/src/dispatcher/mod.rs @@ -255,6 +255,8 @@ impl FlowAclListener for DispatcherListener { _: &Vec>, _: &Vec>, _: &Vec>, + _: bool, + _: &mut bool, ) -> Result<(), String> { match self { DispatcherListener::Local(a) => a.flow_acl_change(), diff --git a/agent/src/ebpf_dispatcher.rs b/agent/src/ebpf_dispatcher.rs index a312b9b4bbf..435a0116a72 100644 --- a/agent/src/ebpf_dispatcher.rs +++ b/agent/src/ebpf_dispatcher.rs @@ -496,6 +496,8 @@ impl FlowAclListener for SyncEbpfDispatcher { _: &Vec>, _: &Vec>, _: &Vec>, + _: bool, + _: &mut bool, ) -> Result<(), String> { self.pause.store(false, Ordering::Relaxed); Ok(()) diff --git a/agent/src/policy/first_path.rs b/agent/src/policy/first_path.rs index b5ef2d160ee..869cf69539f 100644 --- a/agent/src/policy/first_path.rs +++ b/agent/src/policy/first_path.rs @@ -340,7 +340,6 @@ impl FirstPath { } if self.group_ip_map.is_none() { - warn!("IpGroup is nil, invalid acl: {}", acl); return false; } @@ -352,7 +351,6 @@ impl FirstPath { .get(&(*group as u16)) .is_none() { - warn!("Invalid acl by src group({}): {}", group, acl); return true; } } @@ -365,7 +363,6 @@ impl FirstPath { .get(&(*group as u16)) .is_none() { - warn!("Invalid acl by dst group({}): {}", group, acl); return true; } } @@ -593,12 +590,22 @@ impl FirstPath { Ok(()) } - pub fn update_acl(&mut self, acls: &Vec>, check: bool) -> PResult<()> { + pub fn update_acl( + &mut self, + acls: &Vec>, + check: bool, + enabled_invalid_log: bool, + has_invalid_log: &mut bool, + ) -> PResult<()> { if !NOT_SUPPORT { let mut valid_acls = Vec::new(); + let mut invalid_acls = vec![]; for acl in acls { if self.is_invalid_acl(acl, check) { + if enabled_invalid_log { + invalid_acls.push(acl.id); + } continue; } let mut valid_acl = (**acl).clone(); @@ -606,6 +613,15 @@ impl FirstPath { valid_acl.reset(); valid_acls.push(valid_acl); } + + if enabled_invalid_log && !invalid_acls.is_empty() { + warn!( + "Invalid acls: {:?}, maybe the IP resource group doesn't have an IP address.", + invalid_acls + ); + *has_invalid_log = true; + } + self.generate_first_table(&mut valid_acls)?; } diff --git a/agent/src/policy/labeler.rs b/agent/src/policy/labeler.rs index c34458f93c6..62484e75bb5 100644 --- a/agent/src/policy/labeler.rs +++ b/agent/src/policy/labeler.rs @@ -259,10 +259,16 @@ impl Labeler { return (0, 0); } - pub fn update_cidr_table(&mut self, cidrs: &Vec>) { + pub fn update_cidr_table( + &mut self, + cidrs: &Vec>, + enabled_invalid_log: bool, + has_invalid_log: &mut bool, + ) { let mut masklen_table: AHashMap = AHashMap::new(); let mut epc_table: AHashMap> = AHashMap::new(); let mut tunnel_table: AHashMap>> = AHashMap::new(); + let mut invalid_cidr = Vec::new(); for item in cidrs { let mut epc_id = item.epc_id; @@ -272,13 +278,11 @@ impl Labeler { let key = EpcNetIpKey::new(&item.ip.network(), item.ip.prefix_len(), epc_id); if let Some(old) = epc_table.insert(key, item.clone()) { - if (item.cidr_type == CidrType::Wan && item.epc_id != old.epc_id) - || item.is_vip != old.is_vip + if enabled_invalid_log + && ((item.cidr_type == CidrType::Wan && item.epc_id != old.epc_id) + || item.is_vip != old.is_vip) { - warn!( - "Found the same cidr, please check {:?} and {:?}.", - item, old - ); + invalid_cidr.push(item.ip) } } masklen_table @@ -301,6 +305,11 @@ impl Labeler { } } + if enabled_invalid_log && !invalid_cidr.is_empty() { + warn!("Invalid same cidr: {:?}", invalid_cidr); + *has_invalid_log = true; + } + // 排序使用降序是为了CIDR的最长前缀匹配 for (_k, v) in &mut tunnel_table.iter_mut() { v.sort_by(|a, b| { @@ -1095,7 +1104,7 @@ mod tests { let cidrs = vec![Arc::new(cidr1), Arc::new(cidr2), Arc::new(cidr3)]; let mut endpoint: EndpointInfo = Default::default(); - labeler.update_cidr_table(&cidrs); + labeler.update_cidr_table(&cidrs, false, &mut false); labeler.set_epc_by_cidr("192.168.10.100".parse().unwrap(), 10, &mut endpoint); assert_eq!(endpoint.is_vip, true); @@ -1117,7 +1126,7 @@ mod tests { ..Default::default() }; let cidrs = vec![Arc::new(cidr1), Arc::new(cidr2)]; - labeler.update_cidr_table(&cidrs); + labeler.update_cidr_table(&cidrs, false, &mut false); let mut endpoint: EndpointInfo = Default::default(); labeler.set_epc_by_cidr("10.1.2.3".parse().unwrap(), 10, &mut endpoint); @@ -1136,7 +1145,7 @@ mod tests { let mut endpoint: EndpointInfo = Default::default(); - labeler.update_cidr_table(&vec![Arc::new(cidr1)]); + labeler.update_cidr_table(&vec![Arc::new(cidr1)], false, &mut false); labeler.set_epc_by_cidr("192.168.10.100".parse().unwrap(), 10, &mut endpoint); assert_eq!(endpoint.l3_epc_id, 0); @@ -1160,7 +1169,7 @@ mod tests { let mut endpoint: EndpointInfo = Default::default(); - labeler.update_cidr_table(&vec![Arc::new(cidr1)]); + labeler.update_cidr_table(&vec![Arc::new(cidr1)], false, &mut false); labeler.set_epc_vip_by_tunnel("192.168.10.100".parse().unwrap(), 10, &mut endpoint); assert_eq!(endpoint.l3_epc_id, 10); @@ -1178,7 +1187,7 @@ mod tests { let mut endpoint: EndpointInfo = Default::default(); - labeler.update_cidr_table(&vec![Arc::new(cidr1)]); + labeler.update_cidr_table(&vec![Arc::new(cidr1)], false, &mut false); labeler.set_vip_by_cidr("192.168.10.100".parse().unwrap(), 10, &mut endpoint); assert_eq!(endpoint.is_vip, true); @@ -1220,7 +1229,7 @@ mod tests { labeler.update_mac_table(&list); labeler.update_epc_ip_table(&list); - labeler.update_cidr_table(&vec![Arc::new(cidr1)]); + labeler.update_cidr_table(&vec![Arc::new(cidr1)], false, &mut false); let key: LookupKey = LookupKey { src_mac: MacAddr::from_str("11:22:33:44:55:66").unwrap(), @@ -1260,7 +1269,7 @@ mod tests { ..Default::default() }; labeler.update_mac_table(&vec![Arc::new(interface)]); - labeler.update_cidr_table(&vec![Arc::new(cidr)]); + labeler.update_cidr_table(&vec![Arc::new(cidr)], false, &mut false); let mut endpoints: EndpointData = Default::default(); endpoints.src_info.l3_epc_id = 1; @@ -1315,7 +1324,7 @@ mod tests { is_vip: true, ..Default::default() }; - labeler.update_cidr_table(&vec![Arc::new(cidr)]); + labeler.update_cidr_table(&vec![Arc::new(cidr)], false, &mut false); let mut endpoints: EndpointData = Default::default(); endpoints.dst_info.l3_epc_id = 1; diff --git a/agent/src/policy/policy.rs b/agent/src/policy/policy.rs index c8a57e712c5..95310e9a489 100644 --- a/agent/src/policy/policy.rs +++ b/agent/src/policy/policy.rs @@ -21,7 +21,7 @@ use std::sync::{ }; use ahash::AHashMap; -use log::{debug, info, warn}; +use log::{debug, info}; use pnet::datalink; use public::enums::IpProtocol; @@ -476,17 +476,30 @@ impl Policy { self.labeler.update_peer_table(peers); } - pub fn update_cidr(&mut self, cidrs: &Vec>) { + pub fn update_cidr( + &mut self, + cidrs: &Vec>, + enabled_invalid_log: bool, + has_invalid_log: &mut bool, + ) { self.table.update_cidr(cidrs); - self.labeler.update_cidr_table(cidrs); + self.labeler + .update_cidr_table(cidrs, enabled_invalid_log, has_invalid_log); } pub fn update_container(&mut self, cidrs: &Vec>) { self.labeler.update_container(cidrs); } - pub fn update_acl(&mut self, acls: &Vec>, check: bool) -> PResult<()> { - self.table.update_acl(acls, check)?; + pub fn update_acl( + &mut self, + acls: &Vec>, + check: bool, + enabled_invalid_log: bool, + has_invalid_log: &mut bool, + ) -> PResult<()> { + self.table + .update_acl(acls, check, enabled_invalid_log, has_invalid_log)?; self.acls = acls.clone(); @@ -553,7 +566,7 @@ impl Policy { for gpid_entry in gpid_entries.iter() { let protocol = u8::from(gpid_entry.protocol) as usize; if protocol >= table.len() { - warn!("Invalid protocol {:?} in {:?}", protocol, &gpid_entry); + debug!("Invalid protocol {:?} in {:?}", protocol, &gpid_entry); continue; } @@ -678,6 +691,8 @@ impl FlowAclListener for PolicySetter { peers: &Vec>, cidrs: &Vec>, acls: &Vec>, + enabled_invalid_log: bool, + has_invalid_log: &mut bool, ) -> Result<(), String> { self.update_local_epc( local_epc, @@ -686,8 +701,8 @@ impl FlowAclListener for PolicySetter { self.update_interfaces(agent_type, platform_data); self.update_ip_group(ip_groups); self.update_peer_connections(peers); - self.update_cidr(cidrs); - if let Err(e) = self.update_acl(acls, true) { + self.update_cidr(cidrs, enabled_invalid_log, has_invalid_log); + if let Err(e) = self.update_acl(acls, true, enabled_invalid_log, has_invalid_log) { return Err(format!("{}", e)); } @@ -727,16 +742,29 @@ impl PolicySetter { self.policy().update_peer_connections(peers); } - pub fn update_cidr(&mut self, cidrs: &Vec>) { - self.policy().update_cidr(cidrs); + pub fn update_cidr( + &mut self, + cidrs: &Vec>, + enabled_invalid_log: bool, + has_invalid_log: &mut bool, + ) { + self.policy() + .update_cidr(cidrs, enabled_invalid_log, has_invalid_log); } pub fn update_container(&mut self, containers: &Vec>) { self.policy().update_container(containers); } - pub fn update_acl(&mut self, acls: &Vec>, check: bool) -> PResult<()> { - self.policy().update_acl(acls, check)?; + pub fn update_acl( + &mut self, + acls: &Vec>, + check: bool, + enabled_invalid_log: bool, + has_invalid_log: &mut bool, + ) -> PResult<()> { + self.policy() + .update_acl(acls, check, enabled_invalid_log, has_invalid_log)?; Ok(()) } @@ -805,7 +833,7 @@ mod test { ..Default::default() }; setter.update_interfaces(AgentType::TtHostPod, &vec![Arc::new(interface)]); - setter.update_cidr(&vec![Arc::new(cidr)]); + setter.update_cidr(&vec![Arc::new(cidr)], false, &mut false); setter.flush(); let mut key = LookupKey { diff --git a/agent/src/rpc/synchronizer.rs b/agent/src/rpc/synchronizer.rs index 0154c5042d0..ce30725e281 100644 --- a/agent/src/rpc/synchronizer.rs +++ b/agent/src/rpc/synchronizer.rs @@ -157,6 +157,7 @@ pub struct Status { // GRPC数据 pub local_epc: i32, + pub last_invalid_log: Duration, pub version_platform_data: u64, pub version_acls: u64, pub version_groups: u64, @@ -187,6 +188,7 @@ impl Default for Status { ntp_max_interval: Duration::from_secs(300), local_epc: EPC_INTERNET, + last_invalid_log: Duration::ZERO, version_platform_data: 0, version_acls: 0, version_groups: 0, @@ -200,6 +202,22 @@ impl Default for Status { } impl Status { + const INVALID_LOG_INTERVAL: Duration = Duration::from_secs(50); + + pub fn enabled_invalid_log(&mut self) -> bool { + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + + if self.last_invalid_log > now { + self.last_invalid_log = now; + } + + now - self.last_invalid_log >= Self::INVALID_LOG_INTERVAL + } + + pub fn update_last_invalid_log(&mut self) { + self.last_invalid_log = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + } + fn update_platform_data( &mut self, version: u64, @@ -243,7 +261,11 @@ impl Status { self.acls = acls; } - pub fn get_platform_data(&mut self, resp: &pb::SyncResponse) -> bool { + pub fn get_platform_data( + &mut self, + resp: &pb::SyncResponse, + enabled_invalid_log: bool, + ) -> (bool, bool) { let current_version = self.version_platform_data; let version = resp.version_platform_data.unwrap_or(0); debug!( @@ -252,13 +274,14 @@ impl Status { ); if version == 0 { debug!("platform data in preparation."); - return false; + return (false, false); } if version == current_version { debug!("platform data same version."); - return false; + return (false, false); } + let mut has_invalid_log = false; if let Some(platform_compressed) = &resp.platform_data { let platform = pb::PlatformData::decode(platform_compressed.as_slice()); if platform.is_ok() { @@ -266,12 +289,16 @@ impl Status { let mut interfaces = Vec::new(); let mut peers = Vec::new(); let mut cidrs = Vec::new(); + let mut invalid_interfaces = Vec::new(); + let mut invalid_cidrs = Vec::new(); for item in &platform.interfaces { let result = VInterface::try_from(item); if result.is_ok() { interfaces.push(Arc::new(result.unwrap())); } else { - warn!("{:?}: {}", item, result.unwrap_err()); + if enabled_invalid_log { + invalid_interfaces.push(item.id()); + } } } for item in &platform.peer_connections { @@ -282,9 +309,27 @@ impl Status { if result.is_ok() { cidrs.push(Arc::new(result.unwrap())); } else { - warn!("{:?}: {}", item, result.unwrap_err()); + if enabled_invalid_log { + invalid_cidrs.push(item.prefix()); + } + } + } + + if enabled_invalid_log { + if !invalid_interfaces.is_empty() { + warn!("Invalid interfaces: {:?}, maybe it's caused by the wrong mac, ip_resource, if_type.", invalid_interfaces); + has_invalid_log = true; + } + + if !invalid_cidrs.is_empty() { + warn!( + "Invalid cidrs: {:?}, maybe it's caused by the wrong prefix.", + invalid_cidrs + ); + has_invalid_log = true; } } + self.update_platform_data(version, interfaces, peers, cidrs); } else { error!("Invalid platform data."); @@ -293,7 +338,7 @@ impl Status { } else { self.update_platform_data(version, vec![], vec![], vec![]); } - return true; + return (true, has_invalid_log); } fn modify_platform( @@ -333,7 +378,11 @@ impl Status { // TODO:bridge fdb } - pub fn get_flow_acls(&mut self, resp: &pb::SyncResponse) -> bool { + pub fn get_flow_acls( + &mut self, + resp: &pb::SyncResponse, + enabled_invalid_log: bool, + ) -> (bool, bool) { let version = resp.version_acls.unwrap_or(0); debug!( "get grpc FlowAcls version: {} vs current version: {}.", @@ -341,27 +390,38 @@ impl Status { ); if version == 0 { debug!("FlowAcls data in preparation."); - return false; + return (false, false); } if version == self.version_acls { debug!("FlowAcls data same version."); - return false; + return (false, false); } + let mut has_invalid_log = false; if let Some(acls_commpressed) = &resp.flow_acls { let acls = pb::FlowAcls::decode(acls_commpressed.as_slice()); if let Ok(acls) = acls { + let mut invalid_flow_acl = Vec::new(); let flow_acls = acls .flow_acl .into_iter() - .filter_map(|a| match a.try_into() { - Err(e) => { - warn!("{}", e); - None + .filter_map(|a| { + let id = a.id(); + match a.try_into() { + Err(_) => { + if enabled_invalid_log { + invalid_flow_acl.push(id); + } + None + } + t => t.ok(), } - t => t.ok(), }) .collect::>(); + if enabled_invalid_log && !invalid_flow_acl.is_empty() { + warn!("Invalid flow acl: {:?}, maybe it's with the wrong port or capture_network_type.", invalid_flow_acl); + has_invalid_log = true; + } self.update_flow_acl(version, flow_acls); } else { error!("Invalid acls."); @@ -370,10 +430,14 @@ impl Status { } else { self.update_flow_acl(version, vec![]); } - return true; + return (true, has_invalid_log); } - pub fn get_ip_groups(&mut self, resp: &pb::SyncResponse) -> bool { + pub fn get_ip_groups( + &mut self, + resp: &pb::SyncResponse, + enabled_invalid_log: bool, + ) -> (bool, bool) { let version = resp.version_groups.unwrap_or(0); debug!( "get grpc Groups version: {} vs current version: {}.", @@ -381,26 +445,39 @@ impl Status { ); if version == 0 { debug!("Groups data in preparation."); - return false; + return (false, false); } if self.version_groups == version { debug!("Groups data same version."); - return false; + return (false, false); } + let mut has_invalid_log = false; if let Some(groups_compressed) = &resp.groups { let groups = pb::Groups::decode(groups_compressed.as_slice()); if groups.is_ok() { let groups = groups.unwrap(); let mut ip_groups = Vec::new(); + let mut invalid_ip_groups = Vec::new(); for item in &groups.groups { let result = IpGroupData::try_from(item); if result.is_ok() { ip_groups.push(Arc::new(result.unwrap())); } else { - warn!("{}", result.unwrap_err()); + if enabled_invalid_log { + invalid_ip_groups.push(item.id()) + } } } + + if enabled_invalid_log && !invalid_ip_groups.is_empty() { + warn!( + "Invalid ip groups: {:?}, maybe it doesn't come with a valid IP address", + invalid_ip_groups + ); + has_invalid_log = true; + } + self.update_ip_groups(version, ip_groups); } else { error!("Invalid ip groups."); @@ -409,7 +486,7 @@ impl Status { } else { self.update_ip_groups(version, vec![]); } - return true; + return (true, has_invalid_log); } pub fn get_blacklist(&mut self, resp: &pb::SyncResponse) -> Vec { @@ -428,6 +505,8 @@ impl Status { &self, agent_type: AgentType, listener: &mut Box, + enabled_invalid_log: bool, + has_invalid_log: &mut bool, ) -> Result<(), String> { listener.flow_acl_change( agent_type, @@ -437,6 +516,8 @@ impl Status { &self.peers, &self.cidrs, &self.acls, + enabled_invalid_log, + has_invalid_log, ) } @@ -446,7 +527,10 @@ impl Status { static_config: &StaticConfig, resp: &pb::SyncResponse, macs: &Vec, - ) -> (bool, bool) { + enabled_invalid_log: bool, + ) -> (bool, bool, bool) { + let mut has_invalid_log = false; + self.proxy_ip = if user_config.global.communication.proxy_controller_ip.len() > 0 { Some(user_config.global.communication.proxy_controller_ip.clone()) } else { @@ -457,7 +541,7 @@ impl Status { self.ntp_enabled = user_config.global.ntp.enabled; self.ntp_max_interval = user_config.global.ntp.max_drift; self.ntp_min_interval = user_config.global.ntp.min_drift; - let updated_platform = self.get_platform_data(resp); + let (updated_platform, invalid_log) = self.get_platform_data(resp, enabled_invalid_log); if updated_platform { self.modify_platform( macs, @@ -465,8 +549,16 @@ impl Status { &resp.dynamic_config.clone().unwrap_or_default(), ); } - let mut updated = self.get_ip_groups(resp) || updated_platform; - updated = self.get_flow_acls(resp) || updated; + has_invalid_log |= invalid_log; + + let (mut updated, invalid_log) = self.get_ip_groups(resp, enabled_invalid_log); + updated |= updated_platform; + has_invalid_log |= invalid_log; + + let (updated_acl, invalid_log) = self.get_flow_acls(resp, enabled_invalid_log); + updated |= updated_acl; + has_invalid_log |= invalid_log; + updated = self.get_local_epc(&resp.dynamic_config.clone().unwrap_or(DynamicConfig { kubernetes_api_enabled: None, region_id: None, @@ -483,7 +575,7 @@ impl Status { })) || updated; let wait_ntp = self.ntp_enabled && self.first; - (updated, wait_ntp) + (updated, wait_ntp, has_invalid_log) } } @@ -724,53 +816,82 @@ impl Synchronizer { fn parse_segment( capture_mode: PacketCaptureType, resp: &pb::SyncResponse, - ) -> (Vec, Vec, Vec) { + enabled_invalid_log: bool, + ) -> (Vec, Vec, Vec, bool) { let segments = if capture_mode == PacketCaptureType::Analyzer { resp.remote_segments.clone() } else { resp.local_segments.clone() }; - if segments.len() == 0 && capture_mode != PacketCaptureType::Local { - warn!("Segment is empty, in {:?} mode.", capture_mode); - } let mut macs = Vec::new(); let mut gateway_vmacs = Vec::new(); + let mut invalid_segment = Vec::new(); + let mut invalid_mac = Vec::new(); + let mut invalid_vmac = Vec::new(); for segment in &segments { let vm_macs = &segment.mac; let vmacs = &segment.vmac; if vm_macs.len() != vmacs.len() { - warn!( - "Invalid segment the length of vmMacs and vMacs is inconsistent: {:?}", - segment - ); + if enabled_invalid_log { + invalid_segment.push(segment.id()); + } continue; } for (mac_str, vmac_str) in vm_macs.iter().zip(vmacs) { let mac = MacAddr::from_str(mac_str.as_str()); if mac.is_err() { - warn!( - "Malformed VM mac {}, response rejected: {}", - mac_str, - mac.unwrap_err() - ); + if enabled_invalid_log { + invalid_mac.push(mac_str.as_str()); + } continue; } let vmac = MacAddr::from_str(vmac_str.as_str()); if vmac.is_err() { - warn!( - "Malformed VM vmac {}, response rejected: {}", - vmac_str, - vmac.unwrap_err() - ); + if enabled_invalid_log { + invalid_vmac.push(vmac_str.as_str()); + } continue; } macs.push(mac.unwrap()); gateway_vmacs.push(vmac.unwrap()); } } - return (segments, macs, gateway_vmacs); + + let mut has_invalid_log = false; + if enabled_invalid_log { + if segments.len() == 0 && capture_mode != PacketCaptureType::Local { + info!("Segment is empty, in {:?} mode.", capture_mode); + has_invalid_log = true; + } + + if !invalid_segment.is_empty() { + warn!( + "Invalid segment {:?}, the length of vmMacs and vMacs is inconsistent.", + invalid_segment + ); + has_invalid_log = true; + } + if !invalid_mac.is_empty() { + warn!( + "Invalid mac {:?}, The mac address is invalid and cannot be resolved to MacAddr + .", + invalid_mac + ); + has_invalid_log = true; + } + if !invalid_vmac.is_empty() { + warn!( + "Invalid vmac {:?}, The vmac address is invalid and cannot be resolved to MacAddr + .", + invalid_vmac + ); + has_invalid_log = true; + } + } + + return (segments, macs, gateway_vmacs, has_invalid_log); } // Note that both 'status' and 'flow_acl_listener' will be locked here, and other places where 'status' @@ -842,12 +963,32 @@ impl Synchronizer { for listener in flow_acl_listener.lock().unwrap().iter_mut() { listener.containers_change(&containers); } - let (_, macs, gateway_vmac_addrs) = - Self::parse_segment(user_config.inputs.cbpf.common.capture_mode, &resp); - let (updated, wait_ntp) = { + let (updated, wait_ntp, macs, gateway_vmac_addrs, enabled_invalid_log, mut has_invalid_log) = { let mut status_guard = status.write(); - status_guard.update(&user_config, static_config, &resp, &macs) + let enabled_invalid_log = status_guard.enabled_invalid_log(); + + let (_, macs, gateway_vmac_addrs, has_invalid_log) = Self::parse_segment( + user_config.inputs.cbpf.common.capture_mode, + &resp, + enabled_invalid_log, + ); + let (updated, wait_ntp, invalid_log) = status_guard.update( + &user_config, + static_config, + &resp, + &macs, + enabled_invalid_log, + ); + + ( + updated, + wait_ntp, + macs, + gateway_vmac_addrs, + enabled_invalid_log, + has_invalid_log || invalid_log, + ) }; if wait_ntp { // Here, it is necessary to wait for the NTP synchronization timestamp to start @@ -863,9 +1004,12 @@ impl Synchronizer { status_guard.version_groups, status_guard.version_platform_data, status_guard.version_acls); let mut policy_error = false; for listener in flow_acl_listener.lock().unwrap().iter_mut() { - if let Err(e) = - status_guard.trigger_flow_acl(user_config.global.common.agent_type, listener) - { + if let Err(e) = status_guard.trigger_flow_acl( + user_config.global.common.agent_type, + listener, + enabled_invalid_log, + &mut has_invalid_log, + ) { warn!("OnPolicyChange: {}.", e); policy_error = true; } @@ -887,6 +1031,12 @@ impl Synchronizer { status_guard.acls.len(), ); } + + if has_invalid_log { + let mut status_guard = status.write(); + status_guard.update_last_invalid_log(); + } + let mut status_guard = status.write(); let blacklist = status_guard.get_blacklist(&resp); status_guard.first = false; @@ -1441,6 +1591,8 @@ impl Synchronizer { &vec![], &vec![], &vec![], + false, + &mut false, ); } diff --git a/agent/src/utils/npb_bandwidth_watcher.rs b/agent/src/utils/npb_bandwidth_watcher.rs index 02c3fb938c2..410658a001f 100644 --- a/agent/src/utils/npb_bandwidth_watcher.rs +++ b/agent/src/utils/npb_bandwidth_watcher.rs @@ -330,6 +330,8 @@ impl FlowAclListener for Arc { _peers: &Vec>, _cidrs: &Vec>, acls: &Vec>, + _: bool, + _: &mut bool, ) -> Result<(), String> { if NOT_SUPPORT { return Ok(());