From a279cfa423f929f1afabdc556b4bd810314f3732 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 21 Jul 2025 16:00:59 +0200 Subject: [PATCH 1/3] Make replied_id an EntityGlobalId --- zenoh/src/api/query.rs | 12 ++++-------- zenoh/src/api/session.rs | 26 ++++++++++++++++++++++---- zenoh/tests/session.rs | 6 +++--- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index aa05f5b6fa..c413c423b2 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -17,11 +17,9 @@ use std::{collections::HashMap, error::Error, fmt::Display}; #[cfg(feature = "unstable")] use serde::Deserialize; #[cfg(feature = "unstable")] -use zenoh_config::ZenohId; +use zenoh_config::wrappers::EntityGlobalId; use zenoh_keyexpr::OwnedKeyExpr; use zenoh_protocol::core::Parameters; -#[cfg(feature = "unstable")] -use zenoh_protocol::core::ZenohIdProto; /// The [`Queryable`](crate::query::Queryable)s that should be target of a [`get`](crate::Session::get). pub use zenoh_protocol::network::request::ext::QueryTarget; #[doc(inline)] @@ -129,7 +127,7 @@ impl Error for ReplyError {} pub struct Reply { pub(crate) result: Result, #[cfg(feature = "unstable")] - pub(crate) replier_id: Option, + pub(crate) replier_id: Option, } impl Reply { @@ -149,11 +147,9 @@ impl Reply { } #[zenoh_macros::unstable] - // @TODO: maybe return an `Option`? - // /// Gets the id of the zenoh instance that answered this Reply. - pub fn replier_id(&self) -> Option { - self.replier_id.map(Into::into) + pub fn replier_id(&self) -> Option { + self.replier_id } /// Constructs an uninitialized empty Reply. diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index bb12361ea3..c01a70a377 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -2328,7 +2328,10 @@ impl SessionInner { query.callback.call(Reply { result: Err(ReplyError::new("Timeout", Encoding::ZENOH_STRING)), #[cfg(feature = "unstable")] - replier_id: Some(session.zid().into()), + replier_id: Some(zenoh_protocol::core::EntityGlobalIdProto { + zid: session.zid().into(), + eid: session.id.into(), + }.into()), }); } } @@ -2427,7 +2430,10 @@ impl SessionInner { query.callback.call(Reply { result: Err(ReplyError::new("Timeout", Encoding::ZENOH_STRING)), #[cfg(feature = "unstable")] - replier_id: Some(session.zid().into()), + replier_id: Some(zenoh_protocol::core::EntityGlobalIdProto { + zid: session.zid().into(), + eid: session.id.into(), + }.into()), }); } } @@ -2875,7 +2881,13 @@ impl Primitives for WeakSession { encoding: mem::take(&mut e.encoding).into(), }), #[cfg(feature = "unstable")] - replier_id: mem::take(&mut msg.ext_respid).map(|rid| rid.zid), + replier_id: mem::take(&mut msg.ext_respid).map(|rid| { + zenoh_protocol::core::EntityGlobalIdProto { + zid: rid.zid, + eid: rid.eid, + } + .into() + }), }; callback.call(new_reply); } @@ -2967,7 +2979,13 @@ impl Primitives for WeakSession { let new_reply = Reply { result: Ok(sample), #[cfg(feature = "unstable")] - replier_id: mem::take(&mut msg.ext_respid).map(|rid| rid.zid), + replier_id: mem::take(&mut msg.ext_respid).map(|rid| { + zenoh_protocol::core::EntityGlobalIdProto { + zid: rid.zid, + eid: rid.eid, + } + .into() + }), }; let callback = match query.reception_mode { diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index 24b196d79a..61dbbe3fc5 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -251,7 +251,7 @@ async fn test_session_query_reply_internal( let rs = getter.get("ok_put").await; while let Ok(s) = ztimeout!(rs.recv_async()) { #[cfg(feature = "unstable")] - assert_eq!(s.replier_id(), Some(qbl.id().zid())); + assert_eq!(s.replier_id(), Some(qbl.id())); let s = s.result().unwrap(); assert_eq!(s.kind(), SampleKind::Put); assert_eq!(s.payload().len(), size); @@ -270,7 +270,7 @@ async fn test_session_query_reply_internal( let rs = getter.get("ok_del").await; while let Ok(s) = ztimeout!(rs.recv_async()) { #[cfg(feature = "unstable")] - assert_eq!(s.replier_id(), Some(qbl.id().zid())); + assert_eq!(s.replier_id(), Some(qbl.id())); let s = s.result().unwrap(); assert_eq!(s.kind(), SampleKind::Delete); assert_eq!(s.payload().len(), 0); @@ -289,7 +289,7 @@ async fn test_session_query_reply_internal( let rs = getter.get("err").await; while let Ok(s) = ztimeout!(rs.recv_async()) { #[cfg(feature = "unstable")] - assert_eq!(s.replier_id(), Some(qbl.id().zid())); + assert_eq!(s.replier_id(), Some(qbl.id())); let e = s.result().unwrap_err(); assert_eq!(e.payload().len(), size); cnt += 1; From e7a391e1651130a3c51d443ec21e3c8ba65242cd Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 21 Jul 2025 17:56:22 +0200 Subject: [PATCH 2/3] Use EntityGlobalIdProto internally --- zenoh/src/api/query.rs | 6 ++++-- zenoh/src/api/session.rs | 6 ++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index c413c423b2..96fd820ae4 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -19,6 +19,8 @@ use serde::Deserialize; #[cfg(feature = "unstable")] use zenoh_config::wrappers::EntityGlobalId; use zenoh_keyexpr::OwnedKeyExpr; +#[cfg(feature = "unstable")] +use zenoh_protocol::core::EntityGlobalIdProto; use zenoh_protocol::core::Parameters; /// The [`Queryable`](crate::query::Queryable)s that should be target of a [`get`](crate::Session::get). pub use zenoh_protocol::network::request::ext::QueryTarget; @@ -127,7 +129,7 @@ impl Error for ReplyError {} pub struct Reply { pub(crate) result: Result, #[cfg(feature = "unstable")] - pub(crate) replier_id: Option, + pub(crate) replier_id: Option, } impl Reply { @@ -149,7 +151,7 @@ impl Reply { #[zenoh_macros::unstable] /// Gets the id of the zenoh instance that answered this Reply. pub fn replier_id(&self) -> Option { - self.replier_id + self.replier_id.map(Into::into) } /// Constructs an uninitialized empty Reply. diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index c01a70a377..01c89acae7 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -2331,7 +2331,7 @@ impl SessionInner { replier_id: Some(zenoh_protocol::core::EntityGlobalIdProto { zid: session.zid().into(), eid: session.id.into(), - }.into()), + }), }); } } @@ -2433,7 +2433,7 @@ impl SessionInner { replier_id: Some(zenoh_protocol::core::EntityGlobalIdProto { zid: session.zid().into(), eid: session.id.into(), - }.into()), + }), }); } } @@ -2886,7 +2886,6 @@ impl Primitives for WeakSession { zid: rid.zid, eid: rid.eid, } - .into() }), }; callback.call(new_reply); @@ -2984,7 +2983,6 @@ impl Primitives for WeakSession { zid: rid.zid, eid: rid.eid, } - .into() }), }; let callback = From 35efedb0e60065a8359adb78a899146643781821 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Tue, 22 Jul 2025 16:08:30 +0200 Subject: [PATCH 3/3] Change replier_id of timeout errors to None --- zenoh/src/api/session.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 01c89acae7..45ed83ca4e 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -2328,10 +2328,7 @@ impl SessionInner { query.callback.call(Reply { result: Err(ReplyError::new("Timeout", Encoding::ZENOH_STRING)), #[cfg(feature = "unstable")] - replier_id: Some(zenoh_protocol::core::EntityGlobalIdProto { - zid: session.zid().into(), - eid: session.id.into(), - }), + replier_id: None }); } } @@ -2430,10 +2427,7 @@ impl SessionInner { query.callback.call(Reply { result: Err(ReplyError::new("Timeout", Encoding::ZENOH_STRING)), #[cfg(feature = "unstable")] - replier_id: Some(zenoh_protocol::core::EntityGlobalIdProto { - zid: session.zid().into(), - eid: session.id.into(), - }), + replier_id: None }); } }