Skip to content

Commit 4499d0b

Browse files
Improve interceptors performances (#2053)
* Improve interceptors performances * Fix bugs$ * Fix reentrant deadlock * Uncomment tests
1 parent 01595a9 commit 4499d0b

File tree

10 files changed

+536
-544
lines changed

10 files changed

+536
-544
lines changed

zenoh/src/net/primitives/demux.rs

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,22 @@
1111
// Contributors:
1212
// ZettaScale Zenoh Team, <[email protected]>
1313
//
14-
use std::{any::Any, sync::Arc};
14+
use std::{any::Any, cell::OnceCell, sync::Arc};
1515

1616
use arc_swap::ArcSwap;
1717
use zenoh_link::Link;
1818
use zenoh_protocol::network::{
19-
ext, response, Declare, DeclareBody, DeclareFinal, NetworkBodyMut, NetworkMessageMut,
20-
ResponseFinal,
19+
ext, response, Declare, DeclareBody, DeclareFinal, NetworkBodyMut, NetworkMessageExt as _,
20+
NetworkMessageMut, ResponseFinal,
2121
};
2222
use zenoh_result::ZResult;
2323
use zenoh_transport::{unicast::TransportUnicast, TransportPeerEventHandler};
2424

2525
use super::Primitives;
2626
use crate::net::routing::{
2727
dispatcher::face::Face,
28-
interceptor::{InterceptorTrait, InterceptorsChain},
28+
interceptor::{InterceptorContext, InterceptorTrait, InterceptorsChain},
29+
router::{InterceptorCacheValueType, Resource},
2930
RoutingContext,
3031
};
3132

@@ -49,26 +50,73 @@ impl DeMux {
4950
}
5051
}
5152

53+
struct DeMuxContext<'a> {
54+
demux: &'a DeMux,
55+
cache: OnceCell<InterceptorCacheValueType>,
56+
expr: OnceCell<String>,
57+
}
58+
59+
impl DeMuxContext<'_> {
60+
fn prefix(&self, msg: &NetworkMessageMut) -> Option<Arc<Resource>> {
61+
if let Some(wire_expr) = msg.wire_expr() {
62+
let wire_expr = wire_expr.to_owned();
63+
if let Some(prefix) = zread!(self.demux.face.tables.tables)
64+
.get_mapping(&self.demux.face.state, &wire_expr.scope, wire_expr.mapping)
65+
.cloned()
66+
{
67+
return Some(prefix);
68+
}
69+
}
70+
None
71+
}
72+
}
73+
74+
impl InterceptorContext for DeMuxContext<'_> {
75+
fn face(&self) -> Option<Face> {
76+
Some(self.demux.face.clone())
77+
}
78+
79+
fn full_expr(&self, msg: &NetworkMessageMut) -> Option<&str> {
80+
if self.expr.get().is_none() {
81+
if let Some(wire_expr) = msg.wire_expr() {
82+
if let Some(prefix) = self.prefix(msg) {
83+
self.expr
84+
.set(prefix.expr().to_string() + wire_expr.suffix.as_ref())
85+
.ok();
86+
}
87+
}
88+
}
89+
self.expr.get().map(|x| x.as_str())
90+
}
91+
fn get_cache(&self, msg: &NetworkMessageMut) -> Option<&Box<dyn Any + Send + Sync>> {
92+
if self.cache.get().is_none() && msg.wire_expr().is_some_and(|we| !we.has_suffix()) {
93+
if let Some(prefix) = self.prefix(msg) {
94+
if let Some(cache) =
95+
prefix.get_ingress_cache(&self.demux.face, &self.demux.interceptor.load())
96+
{
97+
self.cache.set(cache).ok();
98+
}
99+
}
100+
}
101+
self.cache.get().and_then(|c| c.get_ref().as_ref())
102+
}
103+
}
104+
52105
impl TransportPeerEventHandler for DeMux {
53106
#[inline]
54107
fn handle_message(&self, mut msg: NetworkMessageMut) -> ZResult<()> {
55108
let interceptor = self.interceptor.load();
56109
if !interceptor.interceptors.is_empty() {
57-
let mut ctx = RoutingContext::new_in(msg.as_mut(), self.face.clone());
58-
let prefix = ctx
59-
.wire_expr()
60-
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
61-
.flatten()
62-
.cloned();
63-
let cache_guard = prefix
64-
.as_ref()
65-
.and_then(|p| p.get_ingress_cache(&self.face, &interceptor));
66-
let cache = cache_guard.as_ref().and_then(|c| c.get_ref().as_ref());
67-
68-
match &ctx.msg.body {
110+
let mut ctx = DeMuxContext {
111+
demux: self,
112+
cache: OnceCell::new(),
113+
expr: OnceCell::new(),
114+
};
115+
116+
match &msg.body {
69117
NetworkBodyMut::Request(request) => {
70118
let request_id = request.id;
71-
if !interceptor.intercept(&mut ctx, cache) {
119+
if !interceptor.intercept(&mut msg, &mut ctx as &mut dyn InterceptorContext) {
72120
// request was blocked by an interceptor, we need to send response final to avoid timeout error
73121
self.face
74122
.state
@@ -83,26 +131,22 @@ impl TransportPeerEventHandler for DeMux {
83131
}
84132
NetworkBodyMut::Interest(interest) => {
85133
let interest_id = interest.id;
86-
if !interceptor.intercept(&mut ctx, cache) {
134+
if !interceptor.intercept(&mut msg, &mut ctx as &mut dyn InterceptorContext) {
87135
// request was blocked by an interceptor, we need to send declare final to avoid timeout error
88-
self.face
89-
.state
90-
.primitives
91-
.send_declare(RoutingContext::new_in(
92-
&mut Declare {
93-
interest_id: Some(interest_id),
94-
ext_qos: ext::QoSType::DECLARE,
95-
ext_tstamp: None,
96-
ext_nodeid: ext::NodeIdType::DEFAULT,
97-
body: DeclareBody::DeclareFinal(DeclareFinal),
98-
},
99-
self.face.clone(),
100-
));
136+
self.face.state.primitives.send_declare(RoutingContext::new(
137+
&mut Declare {
138+
interest_id: Some(interest_id),
139+
ext_qos: ext::QoSType::DECLARE,
140+
ext_tstamp: None,
141+
ext_nodeid: ext::NodeIdType::DEFAULT,
142+
body: DeclareBody::DeclareFinal(DeclareFinal),
143+
},
144+
));
101145
return Ok(());
102146
}
103147
}
104148
_ => {
105-
if !interceptor.intercept(&mut ctx, cache) {
149+
if !interceptor.intercept(&mut msg, &mut ctx as &mut dyn InterceptorContext) {
106150
return Ok(());
107151
}
108152
}

0 commit comments

Comments
 (0)