Skip to content

Commit 2696c9d

Browse files
committed
Stop publication cache task on disconnected channels
1 parent 3744075 commit 2696c9d

File tree

1 file changed

+41
-37
lines changed

1 file changed

+41
-37
lines changed

zenoh-ext/src/publication_cache.rs

+41-37
Original file line numberDiff line numberDiff line change
@@ -260,34 +260,53 @@ impl PublicationCache {
260260
tokio::select! {
261261
// on publication received by the local subscriber, store it
262262
sample = sub_recv.recv_async() => {
263-
if let Ok(sample) = sample {
264-
let queryable_key_expr: KeyExpr<'_> = if let Some(prefix) = &queryable_prefix {
265-
prefix.join(&sample.key_expr()).unwrap().into()
266-
} else {
267-
sample.key_expr().clone()
268-
};
269-
270-
if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) {
271-
if queue.len() >= history {
272-
queue.pop_front();
273-
}
274-
queue.push_back(sample);
275-
} else if cache.len() >= limit {
276-
tracing::error!("PublicationCache on {}: resource_limit exceeded - can't cache publication for a new resource",
277-
pub_key_expr);
278-
} else {
279-
let mut queue: VecDeque<Sample> = VecDeque::new();
280-
queue.push_back(sample);
281-
cache.insert(queryable_key_expr.into(), queue);
263+
let Ok(sample) = sample else {
264+
return;
265+
};
266+
267+
let queryable_key_expr: KeyExpr<'_> = if let Some(prefix) = &queryable_prefix {
268+
prefix.join(&sample.key_expr()).unwrap().into()
269+
} else {
270+
sample.key_expr().clone()
271+
};
272+
273+
if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) {
274+
if queue.len() >= history {
275+
queue.pop_front();
282276
}
277+
queue.push_back(sample);
278+
} else if cache.len() >= limit {
279+
tracing::error!("PublicationCache on {}: resource_limit exceeded - can't cache publication for a new resource",
280+
pub_key_expr);
281+
} else {
282+
let mut queue: VecDeque<Sample> = VecDeque::new();
283+
queue.push_back(sample);
284+
cache.insert(queryable_key_expr.into(), queue);
283285
}
284286
},
285287

286288
// on query, reply with cached content
287289
query = quer_recv.recv_async() => {
288-
if let Ok(query) = query {
289-
if !query.key_expr().as_str().contains('*') {
290-
if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) {
290+
let Ok(query) = query else {
291+
return
292+
};
293+
294+
if !query.key_expr().as_str().contains('*') {
295+
if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) {
296+
for sample in queue {
297+
if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
298+
if !time_range.contains(timestamp.get_time().to_system_time()){
299+
continue;
300+
}
301+
}
302+
if let Err(e) = query.reply_sample(sample.clone()).await {
303+
tracing::warn!("Error replying to query: {}", e);
304+
}
305+
}
306+
}
307+
} else {
308+
for (key_expr, queue) in cache.iter() {
309+
if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) {
291310
for sample in queue {
292311
if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
293312
if !time_range.contains(timestamp.get_time().to_system_time()){
@@ -299,21 +318,6 @@ impl PublicationCache {
299318
}
300319
}
301320
}
302-
} else {
303-
for (key_expr, queue) in cache.iter() {
304-
if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) {
305-
for sample in queue {
306-
if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
307-
if !time_range.contains(timestamp.get_time().to_system_time()){
308-
continue;
309-
}
310-
}
311-
if let Err(e) = query.reply_sample(sample.clone()).await {
312-
tracing::warn!("Error replying to query: {}", e);
313-
}
314-
}
315-
}
316-
}
317321
}
318322
}
319323
},

0 commit comments

Comments
 (0)