Skip to content

Commit fc32d02

Browse files
committed
API: Implement multi segment DemiBuffer push and pop
1 parent 103aa35 commit fc32d02

File tree

35 files changed

+641
-431
lines changed

35 files changed

+641
-431
lines changed

network_simulator/input/tcp/pop/pop-reordering.pkt

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
+.0 wait(500, ...) = 0
1818

1919
// Read data.
20-
+.1 read(501, ..., 1000) = 1000
20+
+.1 read(501, ..., 2000) = 2000
2121

2222
// Receive out of order data packet.
2323
+.1 TCP < P. seq 11001(1000) ack 12346 win 65535 <nop>
@@ -32,10 +32,4 @@
3232
+.0 wait(501, ...) = 0
3333

3434
// Send ACK packet
35-
+.6 TCP > . seq 12346(0) ack 12001 win 63535 <nop>
36-
37-
// Read data.
38-
+.1 read(501, ..., 1000) = 1000
39-
40-
// Data read.
41-
+.0 wait(501, ...) = 0
35+
+.6 TCP > . seq 12346(0) ack 12001 win 63535 <nop>

src/catnap/linux/active_socket.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,13 +161,18 @@ impl ActiveSocketData {
161161
// Trim the buffer and leave for next read if we got more than expected.
162162
if let Ok(remainder) = incoming.split_back(bytes_read) {
163163
if !remainder.is_empty() {
164-
self.recv_queue.push_front(Ok((addr.clone(), remainder)));
164+
self.push_front(remainder, addr.clone());
165165
}
166166
}
167167

168168
Ok((addr, incoming))
169169
}
170170

171+
/// Puts data back into the socket queue.
172+
pub fn push_front(&mut self, buf: DemiBuffer, addr: Option<SocketAddr>) {
173+
self.recv_queue.push_front(Ok((addr, buf)));
174+
}
175+
171176
pub fn get_socket(&self) -> &Socket {
172177
&self.socket
173178
}

src/catnap/linux/socket.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,15 @@ impl SharedSocketData {
123123
}
124124
}
125125

126+
/// Put some data back into an active socket.
127+
pub fn push_front(&mut self, buf: DemiBuffer, addr: Option<SocketAddr>) {
128+
match self.deref_mut() {
129+
SocketData::Inactive(_) => unreachable!("Cannot read on an inactive socket"),
130+
SocketData::Active(data) => data.push_front(buf, addr),
131+
SocketData::Passive(_) => unreachable!("Cannot read on a passive socket"),
132+
}
133+
}
134+
126135
/// Handle incoming data event.
127136
pub fn poll_in(&mut self) {
128137
match self.deref_mut() {

src/catnap/linux/transport.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,20 @@ mod socket;
1616
use crate::{
1717
catnap::transport::socket::{SharedSocketData, SocketData},
1818
demikernel::config::Config,
19-
expect_ok, expect_some,
19+
expect_some,
2020
runtime::{
2121
fail::Fail,
2222
memory::{DemiBuffer, DemiMemoryAllocator},
2323
network::{
2424
socket::option::{SocketOption, TcpSocketOptions},
2525
transport::NetworkTransport,
2626
},
27-
poll_yield, DemiRuntime, SharedDemiRuntime, SharedObject,
27+
poll_yield,
28+
types::DEMI_SGARRAY_MAXLEN,
29+
DemiRuntime, SharedDemiRuntime, SharedObject,
2830
},
2931
};
32+
use ::arrayvec::ArrayVec;
3033
use ::futures::FutureExt;
3134
use ::slab::Slab;
3235
use ::socket2::{Domain, Protocol, Socket, Type};
@@ -558,12 +561,12 @@ impl NetworkTransport for SharedCatnapTransport {
558561
async fn push(
559562
&mut self,
560563
sd: &mut Self::SocketDescriptor,
561-
buf: &mut DemiBuffer,
564+
bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>,
562565
addr: Option<SocketAddr>,
563566
) -> Result<(), Fail> {
564-
self.data_from_sd(sd).push(addr, buf.clone()).await?;
565-
// Clear out the original buffer.
566-
expect_ok!(buf.trim(buf.len()), "Should be able to empty the buffer");
567+
for buf in bufs {
568+
self.data_from_sd(sd).push(addr, buf).await?;
569+
}
567570
Ok(())
568571
}
569572

@@ -574,8 +577,23 @@ impl NetworkTransport for SharedCatnapTransport {
574577
&mut self,
575578
sd: &mut Self::SocketDescriptor,
576579
size: usize,
577-
) -> Result<(Option<SocketAddr>, DemiBuffer), Fail> {
578-
self.data_from_sd(sd).pop(size).await
580+
) -> Result<(Option<SocketAddr>, ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>), Fail> {
581+
let total_size: usize = 0;
582+
let mut bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = ArrayVec::new();
583+
let mut addr: Option<SocketAddr> = None;
584+
while total_size < size {
585+
let (src_addr, buf) = self.data_from_sd(sd).pop(size - total_size).await?;
586+
match src_addr {
587+
src_addr if src_addr == addr => (),
588+
src_addr if addr.is_none() => addr = src_addr,
589+
src_addr => {
590+
self.data_from_sd(sd).push_front(buf, src_addr);
591+
return Ok((addr, bufs));
592+
},
593+
}
594+
bufs.push(buf)
595+
}
596+
Ok((addr, bufs))
579597
}
580598

581599
/// Close the socket on the underlying transport. Also unregisters the socket with epoll.

src/catnap/win/transport.rs

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ use crate::{
2525
socket::option::{SocketOption, TcpSocketOptions},
2626
transport::NetworkTransport,
2727
},
28-
poll_yield, DemiRuntime, SharedDemiRuntime, SharedObject,
28+
poll_yield,
29+
types::DEMI_SGARRAY_MAXLEN,
30+
DemiRuntime, SharedDemiRuntime, SharedObject,
2931
},
3032
};
33+
use ::arrayvec::ArrayVec;
3134
use ::futures::FutureExt;
32-
use std::{
35+
use ::std::{
3336
net::{SocketAddr, SocketAddrV4},
3437
pin::Pin,
3538
};
@@ -244,7 +247,7 @@ impl NetworkTransport for SharedCatnapTransport {
244247
&mut self,
245248
socket: &mut Self::SocketDescriptor,
246249
size: usize,
247-
) -> Result<(Option<SocketAddr>, DemiBuffer), Fail> {
250+
) -> Result<(Option<SocketAddr>, ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>), Fail> {
248251
let mut buf: DemiBuffer = DemiBuffer::new(size as u16);
249252
unsafe {
250253
self.0.iocp.do_io(
@@ -265,7 +268,9 @@ impl NetworkTransport for SharedCatnapTransport {
265268
} else {
266269
trace!("not data received");
267270
}
268-
Ok((sockaddr, buf))
271+
let mut result: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = ArrayVec::new();
272+
result.push(buf);
273+
Ok((sockaddr, result))
269274
})
270275
}
271276

@@ -275,40 +280,40 @@ impl NetworkTransport for SharedCatnapTransport {
275280
async fn push(
276281
&mut self,
277282
socket: &mut Self::SocketDescriptor,
278-
buf: &mut DemiBuffer,
283+
bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>,
279284
addr: Option<SocketAddr>,
280285
) -> Result<(), Fail> {
281-
loop {
282-
let result: Result<usize, Fail> = unsafe {
283-
self.0.iocp.do_io(
284-
SocketOpState::Push(buf.clone()),
285-
|state: Pin<&mut SocketOpState>, overlapped: *mut OVERLAPPED| -> Result<(), Fail> {
286-
socket.start_push(state, addr, overlapped)
287-
},
288-
|_: Pin<&mut SocketOpState>, result: OverlappedResult| -> Result<usize, Fail> {
289-
socket.finish_push(result)
286+
for mut buf in bufs {
287+
while !buf.is_empty() {
288+
let result: Result<usize, Fail> = unsafe {
289+
self.0.iocp.do_io(
290+
SocketOpState::Push(buf.clone()),
291+
|state: Pin<&mut SocketOpState>, overlapped: *mut OVERLAPPED| -> Result<(), Fail> {
292+
socket.start_push(state, addr, overlapped)
293+
},
294+
|_: Pin<&mut SocketOpState>, result: OverlappedResult| -> Result<usize, Fail> {
295+
socket.finish_push(result)
296+
},
297+
)
298+
}
299+
.await;
300+
301+
match result {
302+
Ok(nbytes) => {
303+
trace!("data pushed ({:?}/{:?} bytes)", nbytes, buf.len());
304+
buf.adjust(nbytes)?;
290305
},
291-
)
292-
}
293-
.await;
294-
295-
match result {
296-
Ok(nbytes) => {
297-
trace!("data pushed ({:?}/{:?} bytes)", nbytes, buf.len());
298-
buf.adjust(nbytes)?;
299-
if buf.is_empty() {
300-
return Ok(());
301-
}
302-
},
303306

304-
Err(fail) => {
305-
if !DemiRuntime::should_retry(fail.errno) {
306-
let message: String = format!("push failed: {}", fail.cause);
307-
return Err(Fail::new(fail.errno, message.as_str()));
308-
}
309-
},
307+
Err(fail) => {
308+
if !DemiRuntime::should_retry(fail.errno) {
309+
let message: String = format!("push failed: {}", fail.cause);
310+
return Err(Fail::new(fail.errno, message.as_str()));
311+
}
312+
},
313+
}
310314
}
311315
}
316+
Ok(())
312317
}
313318

314319
fn get_runtime(&self) -> &SharedDemiRuntime {

src/catnip/runtime/mod.rs

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,7 @@ use crate::{
1818
mempool::MemoryPool,
1919
},
2020
demikernel::config::Config,
21-
expect_some,
22-
inetstack::{
23-
consts::{MAX_HEADER_SIZE, RECEIVE_BATCH_SIZE},
24-
protocols::layer1::PhysicalLayer,
25-
},
21+
inetstack::{consts::MAX_BATCH_SIZE_NUM_PACKETS, protocols::layer1::PhysicalLayer},
2622
runtime::{
2723
fail::Fail,
2824
libdpdk::{
@@ -320,43 +316,48 @@ impl DerefMut for SharedDPDKRuntime {
320316
}
321317

322318
impl PhysicalLayer for SharedDPDKRuntime {
323-
fn transmit(&mut self, pkt: DemiBuffer) -> Result<(), Fail> {
319+
fn transmit(&mut self, pkts: ArrayVec<DemiBuffer, MAX_BATCH_SIZE_NUM_PACKETS>) -> Result<(), Fail> {
324320
timer!("catnip::runtime::transmit");
321+
325322
// Grab the packet and copy it if necessary. In general, this copy will happen for small packets without
326323
// payloads because we allocate actual data-carrying application buffers from the DPDK pool.
327-
let outgoing_pkt: DemiBuffer = match pkt {
328-
buf if buf.is_dpdk_allocated() => buf,
329-
buf if buf.len() <= self.max_body_size => {
330-
let mut mbuf: DemiBuffer = self.dpdk_allocate_mbuf(buf.len())?;
331-
debug_assert_eq!(buf.len(), mbuf.len());
332-
mbuf.copy_from_slice(&buf);
333-
334-
mbuf
335-
},
336-
buf => {
337-
let cause: String = format!(
338-
"Cannot allocate a DPDK buffer that is large enough. Max size={:?} request size={:?}",
339-
self.max_body_size,
340-
buf.len()
341-
);
342-
warn!("{}", cause);
343-
return Err(Fail::new(libc::EINVAL, &cause));
344-
},
345-
};
324+
let len: usize = pkts.len();
325+
let mut mbufs: [*mut rte_mbuf; MAX_BATCH_SIZE_NUM_PACKETS] = unsafe { mem::zeroed() };
326+
for (i, pkt) in pkts.into_iter().enumerate() {
327+
mbufs[i] = match pkt {
328+
buf if buf.is_dpdk_allocated() => buf
329+
.into_mbuf()
330+
.ok_or(Fail::new(libc::EINVAL, "should be able to convert into mbuf"))?,
331+
buf if buf.len() <= self.max_body_size => {
332+
let mut mbuf: DemiBuffer = self.dpdk_allocate_mbuf(buf.len())?;
333+
debug_assert_eq!(buf.len(), mbuf.len());
334+
mbuf.copy_from_slice(&buf);
335+
336+
mbuf.into_mbuf()
337+
.ok_or(Fail::new(libc::EINVAL, "should be able to convert into mbuf"))?
338+
},
339+
_ => {
340+
return Err(Fail::new(
341+
libc::EINVAL,
342+
"cannot allocate DPDK buffer that is big enough",
343+
))
344+
},
345+
};
346+
}
346347

347-
let mut mbuf_ptr: *mut rte_mbuf = expect_some!(outgoing_pkt.into_mbuf(), "mbuf cannot be empty");
348-
let num_sent: u16 = unsafe { rte_eth_tx_burst(self.port_id, 0, &mut mbuf_ptr, 1) };
348+
let num_sent: u16 = unsafe { rte_eth_tx_burst(self.port_id, 0, mbufs.as_mut_ptr(), len as u16) };
349349
debug_assert_eq!(num_sent, 1);
350350
Ok(())
351351
}
352352

353-
fn receive(&mut self) -> Result<ArrayVec<DemiBuffer, RECEIVE_BATCH_SIZE>, Fail> {
353+
fn receive(&mut self) -> Result<ArrayVec<DemiBuffer, MAX_BATCH_SIZE_NUM_PACKETS>, Fail> {
354354
timer!("catnip::runtime::receive");
355355

356356
let mut out = ArrayVec::new();
357-
let mut packets: [*mut rte_mbuf; RECEIVE_BATCH_SIZE] = unsafe { mem::zeroed() };
358-
let nb_rx = unsafe { rte_eth_rx_burst(self.port_id, 0, packets.as_mut_ptr(), RECEIVE_BATCH_SIZE as u16) };
359-
assert!(nb_rx as usize <= RECEIVE_BATCH_SIZE);
357+
let mut packets: [*mut rte_mbuf; MAX_BATCH_SIZE_NUM_PACKETS] = unsafe { mem::zeroed() };
358+
let nb_rx =
359+
unsafe { rte_eth_rx_burst(self.port_id, 0, packets.as_mut_ptr(), MAX_BATCH_SIZE_NUM_PACKETS as u16) };
360+
assert!(nb_rx as usize <= MAX_BATCH_SIZE_NUM_PACKETS);
360361

361362
{
362363
for &packet in &packets[..nb_rx as usize] {
@@ -371,13 +372,12 @@ impl PhysicalLayer for SharedDPDKRuntime {
371372
}
372373

373374
impl DemiMemoryAllocator for SharedDPDKRuntime {
375+
fn get_max_buffer_size_bytes(&self) -> usize {
376+
self.max_body_size
377+
}
378+
374379
fn allocate_demi_buffer(&self, size: usize) -> Result<DemiBuffer, Fail> {
375-
// First allocate the underlying DemiBuffer.
376-
if size <= self.max_body_size {
377-
self.dpdk_allocate_mbuf(size)
378-
} else {
379-
// Allocate a heap-managed buffer.
380-
Ok(DemiBuffer::new_with_headroom(size as u16, MAX_HEADER_SIZE as u16))
381-
}
380+
debug_assert!(size < self.max_body_size);
381+
self.dpdk_allocate_mbuf(size)
382382
}
383383
}

0 commit comments

Comments
 (0)