Skip to content

[ZEN-587] Make Reply::replier_id() return an EntityGlobalId #2052

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions zenoh/src/api/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use std::{collections::HashMap, error::Error, fmt::Display};
#[cfg(feature = "unstable")]
use serde::Deserialize;
#[cfg(feature = "unstable")]
use zenoh_config::ZenohId;
use zenoh_config::wrappers::EntityGlobalId;
use zenoh_keyexpr::OwnedKeyExpr;
use zenoh_protocol::core::Parameters;
#[cfg(feature = "unstable")]
use zenoh_protocol::core::ZenohIdProto;
use zenoh_protocol::core::EntityGlobalIdProto;
use zenoh_protocol::core::Parameters;
/// The [`Queryable`](crate::query::Queryable)s that should be target of a [`get`](crate::Session::get).
pub use zenoh_protocol::network::request::ext::QueryTarget;
#[doc(inline)]
Expand Down Expand Up @@ -129,7 +129,7 @@ impl Error for ReplyError {}
pub struct Reply {
pub(crate) result: Result<Sample, ReplyError>,
#[cfg(feature = "unstable")]
pub(crate) replier_id: Option<ZenohIdProto>,
pub(crate) replier_id: Option<EntityGlobalIdProto>,
}

impl Reply {
Expand All @@ -149,10 +149,8 @@ impl Reply {
}

#[zenoh_macros::unstable]
// @TODO: maybe return an `Option<EntityGlobalId>`?
//
/// Gets the id of the zenoh instance that answered this Reply.
pub fn replier_id(&self) -> Option<ZenohId> {
pub fn replier_id(&self) -> Option<EntityGlobalId> {
self.replier_id.map(Into::into)
}

Expand Down
18 changes: 14 additions & 4 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2328,7 +2328,7 @@ impl SessionInner {
query.callback.call(Reply {
result: Err(ReplyError::new("Timeout", Encoding::ZENOH_STRING)),
#[cfg(feature = "unstable")]
replier_id: Some(session.zid().into()),
replier_id: None
});
}
}
Expand Down Expand Up @@ -2427,7 +2427,7 @@ impl SessionInner {
query.callback.call(Reply {
result: Err(ReplyError::new("Timeout", Encoding::ZENOH_STRING)),
#[cfg(feature = "unstable")]
replier_id: Some(session.zid().into()),
replier_id: None
});
}
}
Expand Down Expand Up @@ -2875,7 +2875,12 @@ impl Primitives for WeakSession {
encoding: mem::take(&mut e.encoding).into(),
}),
#[cfg(feature = "unstable")]
replier_id: mem::take(&mut msg.ext_respid).map(|rid| rid.zid),
replier_id: mem::take(&mut msg.ext_respid).map(|rid| {
zenoh_protocol::core::EntityGlobalIdProto {
zid: rid.zid,
eid: rid.eid,
}
}),
};
callback.call(new_reply);
}
Expand Down Expand Up @@ -2967,7 +2972,12 @@ impl Primitives for WeakSession {
let new_reply = Reply {
result: Ok(sample),
#[cfg(feature = "unstable")]
replier_id: mem::take(&mut msg.ext_respid).map(|rid| rid.zid),
replier_id: mem::take(&mut msg.ext_respid).map(|rid| {
zenoh_protocol::core::EntityGlobalIdProto {
zid: rid.zid,
eid: rid.eid,
}
}),
};
let callback =
match query.reception_mode {
Expand Down
6 changes: 3 additions & 3 deletions zenoh/tests/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ async fn test_session_query_reply_internal<Getter: HasGet>(
let rs = getter.get("ok_put").await;
while let Ok(s) = ztimeout!(rs.recv_async()) {
#[cfg(feature = "unstable")]
assert_eq!(s.replier_id(), Some(qbl.id().zid()));
assert_eq!(s.replier_id(), Some(qbl.id()));
let s = s.result().unwrap();
assert_eq!(s.kind(), SampleKind::Put);
assert_eq!(s.payload().len(), size);
Expand All @@ -270,7 +270,7 @@ async fn test_session_query_reply_internal<Getter: HasGet>(
let rs = getter.get("ok_del").await;
while let Ok(s) = ztimeout!(rs.recv_async()) {
#[cfg(feature = "unstable")]
assert_eq!(s.replier_id(), Some(qbl.id().zid()));
assert_eq!(s.replier_id(), Some(qbl.id()));
let s = s.result().unwrap();
assert_eq!(s.kind(), SampleKind::Delete);
assert_eq!(s.payload().len(), 0);
Expand All @@ -289,7 +289,7 @@ async fn test_session_query_reply_internal<Getter: HasGet>(
let rs = getter.get("err").await;
while let Ok(s) = ztimeout!(rs.recv_async()) {
#[cfg(feature = "unstable")]
assert_eq!(s.replier_id(), Some(qbl.id().zid()));
assert_eq!(s.replier_id(), Some(qbl.id()));
let e = s.result().unwrap_err();
assert_eq!(e.payload().len(), size);
cnt += 1;
Expand Down
Loading