From 975884ded983d534c169736a27d0c2a9d709fbf5 Mon Sep 17 00:00:00 2001 From: jobafr Date: Tue, 21 May 2024 12:33:39 +0200 Subject: [PATCH] add latching field to header --- rosrust/src/tcpros/publisher.rs | 43 +++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/rosrust/src/tcpros/publisher.rs b/rosrust/src/tcpros/publisher.rs index ef6c842..6dcfdd9 100644 --- a/rosrust/src/tcpros/publisher.rs +++ b/rosrust/src/tcpros/publisher.rs @@ -9,6 +9,7 @@ use error_chain::bail; use log::error; use std::collections::HashMap; use std::net::{TcpListener, TcpStream, ToSocketAddrs}; +use std::sync::atomic::AtomicBool; use std::sync::{atomic, Arc, Mutex}; pub struct Publisher { @@ -18,6 +19,7 @@ pub struct Publisher { last_message: Arc>>>, queue_size: usize, exists: Arc, + latching: Arc, } impl Drop for Publisher { @@ -57,6 +59,7 @@ fn write_response( caller_id: &str, topic: &str, message_description: &RawMessageDescription, + latching: bool, ) -> Result<()> { let mut fields = HashMap::::new(); fields.insert(String::from("md5sum"), message_description.md5sum.clone()); @@ -67,6 +70,10 @@ fn write_response( String::from("message_definition"), message_description.msg_definition.clone(), ); + fields.insert( + String::from("latching"), + String::from(if latching { "1" } else { "0" }), + ); header::encode(&mut stream, &fields)?; Ok(()) } @@ -76,12 +83,19 @@ fn exchange_headers( topic: &str, pub_caller_id: &str, message_description: &RawMessageDescription, + latching: bool, ) -> Result where U: std::io::Write + std::io::Read, { let caller_id = read_request(&mut stream, topic, message_description)?; - write_response(&mut stream, pub_caller_id, topic, message_description)?; + write_response( + &mut stream, + pub_caller_id, + topic, + message_description, + latching, + )?; Ok(caller_id) } @@ -92,12 +106,19 @@ fn process_subscriber( last_message: &Mutex>>, pub_caller_id: &str, message_description: &RawMessageDescription, + latching: bool, ) -> tcpconnection::Feedback where U: std::io::Read + std::io::Write + Send, { - let result = exchange_headers(&mut stream, topic, pub_caller_id, message_description) - .chain_err(|| ErrorKind::TopicConnectionFail(topic.into())); + let result = exchange_headers( + &mut stream, + topic, + pub_caller_id, + message_description, + latching, + ) + .chain_err(|| ErrorKind::TopicConnectionFail(topic.into())); let caller_id = match result { Ok(caller_id) => caller_id, Err(err) => { @@ -146,12 +167,15 @@ impl Publisher { let (targets, data) = fork(queue_size); let last_message = Arc::new(Mutex::new(Arc::new(Vec::new()))); + let latching = Arc::new(AtomicBool::new(false)); + let iterate_handler = { let publisher_exists = publisher_exists.clone(); let topic = String::from(topic); let last_message = Arc::clone(&last_message); let caller_id = String::from(caller_id); let message_description = message_description.clone(); + let latching = Arc::clone(&latching); move |stream: TcpStream| { if !publisher_exists.load(atomic::Ordering::SeqCst) { @@ -164,6 +188,7 @@ impl Publisher { &last_message, &caller_id, &message_description, + latching.load(atomic::Ordering::SeqCst), ) } }; @@ -183,6 +208,7 @@ impl Publisher { last_message, queue_size, exists: publisher_exists, + latching, }) } @@ -191,7 +217,7 @@ impl Publisher { queue_size: usize, message_description: RawMessageDescription, ) -> Result> { - let mut stream = PublisherStream::new(self, message_description)?; + let mut stream = PublisherStream::new(self, message_description, Arc::clone(&self.latching))?; stream.set_queue_size_max(queue_size); Ok(stream) } @@ -210,13 +236,14 @@ pub struct PublisherStream { stream: DataStream, last_message: Arc>>>, datatype: std::marker::PhantomData, - latching: bool, + latching: Arc, } impl PublisherStream { fn new( publisher: &Publisher, message_description: RawMessageDescription, + latching : Arc ) -> Result> { if publisher.topic.msg_type != message_description.msg_type { bail!(ErrorKind::MessageTypeMismatch( @@ -228,7 +255,7 @@ impl PublisherStream { stream: publisher.subscriptions.clone(), datatype: std::marker::PhantomData, last_message: Arc::clone(&publisher.last_message), - latching: false, + latching, }; stream.set_queue_size_max(publisher.queue_size); Ok(stream) @@ -246,7 +273,7 @@ impl PublisherStream { #[inline] pub fn set_latching(&mut self, latching: bool) { - self.latching = latching; + self.latching.store(latching, atomic::Ordering::SeqCst); } #[inline] @@ -262,7 +289,7 @@ impl PublisherStream { pub fn send(&self, message: &T) -> Result<()> { let bytes = Arc::new(message.encode_vec()?); - if self.latching { + if self.latching.load(atomic::Ordering::SeqCst) { *self.last_message.lock().expect(FAILED_TO_LOCK) = Arc::clone(&bytes); }