Skip to content

Commit c596554

Browse files
committed
fix: l7 throttle is not accurate
1 parent ed5b72e commit c596554

File tree

6 files changed

+146
-117
lines changed

6 files changed

+146
-117
lines changed

agent/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agent/crates/public/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ num_enum = "0.5.6"
2020
parking_lot = "0.11"
2121
pnet = "^0.29"
2222
prost.workspace = true
23+
rand = "0.8.5"
2324
regex.workspace = true
2425
serde = { version = "1.0", features = ["derive"] }
2526
serde_json = "1.0.72"

agent/crates/public/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@ pub mod pwd;
3131
pub mod queue;
3232
pub mod rpc;
3333
pub mod sender;
34+
pub mod throttle;
3435
pub mod utils;
3536

3637
#[cfg(target_os = "linux")]
3738
pub mod netns;
3839

3940
pub use leaky_bucket::LeakyBucket;
41+
pub use throttle::Throttle;

agent/crates/public/src/throttle.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (c) 2025 Yunshan Networks
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
use std::fmt::Debug;
18+
19+
use log::warn;
20+
use rand::prelude::{Rng, SeedableRng, SmallRng};
21+
22+
use crate::queue::DebugSender;
23+
use crate::LeakyBucket;
24+
25+
const BUFFER_SIZE: usize = 1024;
26+
pub struct Throttle<T: Debug> {
27+
leaky_bucket: LeakyBucket,
28+
period_count: u32,
29+
buffer: Vec<T>,
30+
output_queue: DebugSender<T>,
31+
small_rng: SmallRng,
32+
}
33+
34+
impl<T: Debug> Throttle<T> {
35+
pub fn new(rate: u64, output_queue: DebugSender<T>) -> Self {
36+
Throttle {
37+
leaky_bucket: LeakyBucket::new(Some(rate)),
38+
buffer: Vec::with_capacity(BUFFER_SIZE),
39+
output_queue,
40+
small_rng: SmallRng::from_entropy(),
41+
period_count: 0,
42+
}
43+
}
44+
45+
// return false, indicates that the throttle has been reached
46+
// and the item or cached items will be discarded
47+
pub fn send(&mut self, item: T) -> bool {
48+
if self.buffer.len() > BUFFER_SIZE {
49+
self.flush();
50+
self.period_count = 0;
51+
}
52+
53+
self.period_count += 1;
54+
if self.leaky_bucket.acquire(1) {
55+
self.buffer.push(item);
56+
} else {
57+
let index = self.small_rng.gen_range(0..self.period_count) as usize;
58+
if index < self.buffer.len() {
59+
self.buffer[index] = item;
60+
}
61+
return false;
62+
}
63+
true
64+
}
65+
66+
pub fn flush(&mut self) {
67+
if !self.buffer.is_empty() {
68+
if let Err(e) = self.output_queue.send_all(&mut self.buffer) {
69+
warn!(
70+
"throttle push {} items to queue failed, because {:?}",
71+
self.buffer.len(),
72+
e
73+
);
74+
self.buffer.clear();
75+
}
76+
}
77+
}
78+
79+
pub fn set_rate(&mut self, rate: u64) {
80+
self.leaky_bucket.set_rate(Some(rate));
81+
}
82+
}

agent/src/flow_generator/protocol_logs.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,15 @@ pub struct BoxAppProtoLogsData {
388388
pub override_resp_status: Option<L7ResponseStatus>,
389389
}
390390

391+
impl BoxAppProtoLogsData {
392+
pub fn new(data: Box<MetaAppProto>, override_resp_status: Option<L7ResponseStatus>) -> Self {
393+
Self {
394+
data,
395+
override_resp_status,
396+
}
397+
}
398+
}
399+
391400
impl Sendable for BoxAppProtoLogsData {
392401
fn encode(self, buf: &mut Vec<u8>) -> Result<usize, prost::EncodeError> {
393402
let mut pb_proto_logs_data = flow_log::AppProtoLogsData {

0 commit comments

Comments
 (0)