Skip to content

Commit 74af05a

Browse files
committed
[catpowder] Feature: batching
1 parent a46684c commit 74af05a

File tree

2 files changed

+41
-42
lines changed

2 files changed

+41
-42
lines changed

src/catnap/win/transport.rs

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ use crate::{
2525
socket::option::{SocketOption, TcpSocketOptions},
2626
transport::NetworkTransport,
2727
},
28-
poll_yield, DemiRuntime, SharedDemiRuntime, SharedObject,
28+
poll_yield,
29+
types::DEMI_SGARRAY_MAXLEN,
30+
DemiRuntime, SharedDemiRuntime, SharedObject,
2931
},
3032
};
33+
use ::arrayvec::ArrayVec;
3134
use ::futures::FutureExt;
32-
use std::{
35+
use ::std::{
3336
net::{SocketAddr, SocketAddrV4},
3437
pin::Pin,
3538
};
@@ -244,7 +247,7 @@ impl NetworkTransport for SharedCatnapTransport {
244247
&mut self,
245248
socket: &mut Self::SocketDescriptor,
246249
size: usize,
247-
) -> Result<(Option<SocketAddr>, DemiBuffer), Fail> {
250+
) -> Result<(Option<SocketAddr>, ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>), Fail> {
248251
let mut buf: DemiBuffer = DemiBuffer::new(size as u16);
249252
unsafe {
250253
self.0.iocp.do_io(
@@ -265,7 +268,9 @@ impl NetworkTransport for SharedCatnapTransport {
265268
} else {
266269
trace!("not data received");
267270
}
268-
Ok((sockaddr, buf))
271+
let mut result: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = ArrayVec::new();
272+
result.push(buf);
273+
Ok((sockaddr, result))
269274
})
270275
}
271276

@@ -275,40 +280,40 @@ impl NetworkTransport for SharedCatnapTransport {
275280
async fn push(
276281
&mut self,
277282
socket: &mut Self::SocketDescriptor,
278-
buf: &mut DemiBuffer,
283+
bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>,
279284
addr: Option<SocketAddr>,
280285
) -> Result<(), Fail> {
281-
loop {
282-
let result: Result<usize, Fail> = unsafe {
283-
self.0.iocp.do_io(
284-
SocketOpState::Push(buf.clone()),
285-
|state: Pin<&mut SocketOpState>, overlapped: *mut OVERLAPPED| -> Result<(), Fail> {
286-
socket.start_push(state, addr, overlapped)
287-
},
288-
|_: Pin<&mut SocketOpState>, result: OverlappedResult| -> Result<usize, Fail> {
289-
socket.finish_push(result)
286+
for mut buf in bufs {
287+
while !buf.is_empty() {
288+
let result: Result<usize, Fail> = unsafe {
289+
self.0.iocp.do_io(
290+
SocketOpState::Push(buf.clone()),
291+
|state: Pin<&mut SocketOpState>, overlapped: *mut OVERLAPPED| -> Result<(), Fail> {
292+
socket.start_push(state, addr, overlapped)
293+
},
294+
|_: Pin<&mut SocketOpState>, result: OverlappedResult| -> Result<usize, Fail> {
295+
socket.finish_push(result)
296+
},
297+
)
298+
}
299+
.await;
300+
301+
match result {
302+
Ok(nbytes) => {
303+
trace!("data pushed ({:?}/{:?} bytes)", nbytes, buf.len());
304+
buf.adjust(nbytes)?;
290305
},
291-
)
292-
}
293-
.await;
294-
295-
match result {
296-
Ok(nbytes) => {
297-
trace!("data pushed ({:?}/{:?} bytes)", nbytes, buf.len());
298-
buf.adjust(nbytes)?;
299-
if buf.is_empty() {
300-
return Ok(());
301-
}
302-
},
303306

304-
Err(fail) => {
305-
if !DemiRuntime::should_retry(fail.errno) {
306-
let message: String = format!("push failed: {}", fail.cause);
307-
return Err(Fail::new(fail.errno, message.as_str()));
308-
}
309-
},
307+
Err(fail) => {
308+
if !DemiRuntime::should_retry(fail.errno) {
309+
let message: String = format!("push failed: {}", fail.cause);
310+
return Err(Fail::new(fail.errno, message.as_str()));
311+
}
312+
},
313+
}
310314
}
311315
}
316+
Ok(())
312317
}
313318

314319
fn get_runtime(&self) -> &SharedDemiRuntime {

src/catpowder/win/runtime.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::{
1616
},
1717
demikernel::config::Config,
1818
inetstack::{
19-
consts::MAX_HEADER_SIZE,
19+
consts::{MAX_BATCH_SIZE_NUM_PACKETS, MAX_HEADER_SIZE},
2020
protocols::{layer1::PhysicalLayer, layer4::ephemeral::EphemeralPorts},
2121
},
2222
runtime::{
@@ -29,12 +29,6 @@ use crate::{
2929
use arrayvec::ArrayVec;
3030
use std::{borrow::BorrowMut, num::NonZeroU32, rc::Rc};
3131

32-
//======================================================================================================================
33-
// Constants
34-
//======================================================================================================================
35-
36-
const MAX_BATCH_SIZE_NUM_PACKETS: usize = 16;
37-
3832
//======================================================================================================================
3933
// Structures
4034
//======================================================================================================================
@@ -124,7 +118,7 @@ impl SharedCatpowderRuntime {
124118
// Trait Implementations
125119
//======================================================================================================================
126120

127-
impl PhysicalLayer<MAX_BATCH_SIZE_NUM_PACKETS> for SharedCatpowderRuntime {
121+
impl PhysicalLayer for SharedCatpowderRuntime {
128122
/// Transmits a packet.
129123
fn transmit(&mut self, pkts: ArrayVec<DemiBuffer, MAX_BATCH_SIZE_NUM_PACKETS>) -> Result<(), Fail> {
130124
timer!("catpowder::win::runtime::transmit");
@@ -201,8 +195,8 @@ impl PhysicalLayer<MAX_BATCH_SIZE_NUM_PACKETS> for SharedCatpowderRuntime {
201195

202196
/// Memory runtime trait implementation for XDP Runtime.
203197
impl DemiMemoryAllocator for SharedCatpowderRuntime {
204-
fn get_max_seg_size_bytes(&self) -> usize {
205-
self.max_body_size
198+
fn get_max_buffer_size_bytes(&self) -> usize {
199+
self.0.max_body_size
206200
}
207201

208202
/// Allocates a scatter-gather array.

0 commit comments

Comments
 (0)