From 52c6103d9d414e5a54212b44e07c321671c15636 Mon Sep 17 00:00:00 2001 From: iyzhang Date: Fri, 27 Jun 2025 11:03:12 -0700 Subject: [PATCH] API: adding multi-segment sgas --- examples/c/tcp-ping-pong.c | 14 +- examples/c/tcp-push-pop.c | 17 ++- examples/c/udp-ping-pong.c | 8 +- examples/c/udp-push-pop.c | 8 +- examples/rust/tcp-dump.rs | 2 +- examples/rust/tcp-ping-pong.rs | 12 +- examples/rust/tcp-pktgen.rs | 10 +- examples/rust/tcp-push-pop.rs | 10 +- examples/rust/udp-dump.rs | 2 +- examples/rust/udp-echo.rs | 25 +-- examples/rust/udp-ping-pong.rs | 16 +- examples/rust/udp-pktgen.rs | 10 +- examples/rust/udp-push-pop.rs | 12 +- examples/rust/udp-relay.rs | 2 +- examples/tcp-close/client.rs | 4 +- examples/tcp-close/server.rs | 2 +- examples/tcp-echo/client.rs | 102 +++++++------ examples/tcp-echo/server.rs | 18 ++- examples/tcp-wait/client.rs | 18 +-- include/demi/types.h | 15 +- man/demi_sgaalloc.md | 6 +- .../input/tcp/pop/pop-reordering.pkt | 10 +- src/catnap/linux/active_socket.rs | 19 ++- src/catnap/linux/socket.rs | 18 ++- src/catnap/linux/transport.rs | 46 ++++-- src/catnap/win/transport.rs | 69 +++++---- src/catnip/runtime/mod.rs | 76 +++++----- src/catpowder/linux/mod.rs | 61 +++++--- src/catpowder/win/runtime.rs | 50 +++--- src/demikernel/bindings.rs | 14 +- src/demikernel/libos/network/libos.rs | 45 ++++-- src/demikernel/libos/network/queue.rs | 21 ++- src/inetstack/consts.rs | 10 +- src/inetstack/mod.rs | 22 +-- src/inetstack/protocols/layer1/mod.rs | 24 +-- src/inetstack/protocols/layer2/mod.rs | 22 ++- src/inetstack/protocols/layer3/mod.rs | 6 +- src/inetstack/protocols/layer4/mod.rs | 33 +++- .../protocols/layer4/tcp/established/mod.rs | 32 +++- .../layer4/tcp/established/receiver.rs | 72 +++++---- .../layer4/tcp/established/sender.rs | 42 +++--- src/inetstack/protocols/layer4/tcp/peer.rs | 16 +- src/inetstack/protocols/layer4/tcp/socket.rs | 9 +- src/inetstack/protocols/layer4/udp/peer.rs | 6 +- src/inetstack/protocols/layer4/udp/tests.rs | 80 +++++----- src/inetstack/test_helpers/engine.rs | 16 +- src/inetstack/test_helpers/physical_layer.rs | 30 ++-- src/runtime/memory/mod.rs | 142 ++++++++++-------- src/runtime/network/transport.rs | 6 +- src/runtime/queue/operation_result.rs | 6 +- src/runtime/types/memory.rs | 68 ++++++--- src/runtime/types/ops.rs | 24 +-- tests/c/sizes.c | 12 +- tests/c/syscalls.c | 13 +- tests/rust/common/libos.rs | 6 +- tests/rust/common/runtime.rs | 30 ++-- tests/rust/tcp.rs | 26 ++-- tests/rust/udp.rs | 8 +- 58 files changed, 877 insertions(+), 626 deletions(-) diff --git a/examples/c/tcp-ping-pong.c b/examples/c/tcp-ping-pong.c index 42501314f..2dd5b29f3 100644 --- a/examples/c/tcp-ping-pong.c +++ b/examples/c/tcp-ping-pong.c @@ -138,7 +138,7 @@ static void pop_wait(int qd, demi_qresult_t *qr) /* Parse operation result. */ assert(qr->qr_opcode == DEMI_OPC_POP); - assert(qr->qr_value.sga.sga_segs != 0); + assert(qr->qr_value.sga.segments != 0); } /*====================================================================================================================* @@ -189,7 +189,7 @@ static void server(int argc, char *const argv[], struct sockaddr_in *local, size /* Extract received scatter-gather array. */ memcpy(&sga, &qr.qr_value.sga, sizeof(demi_sgarray_t)); - nbytes += sga.sga_segs[0].sgaseg_len; + nbytes += sga.segments[0].data_len_bytes; /* Push scatter-gather array. */ push_wait(qd, &sga, &qr); @@ -242,10 +242,10 @@ static void client(int argc, char *const argv[], const struct sockaddr_in *remot /* Allocate scatter-gather array. */ sga = demi_sgaalloc(data_size); - assert(sga.sga_segs != 0); + assert(sga.segments != 0); /* Prepare data. */ - memset(sga.sga_segs[0].sgaseg_buf, 1, data_size); + memset(sga.segments[0].data_buf_ptr, 1, data_size); /* Push scatter-gather array. */ push_wait(sockqd, &sga, &qr); @@ -258,9 +258,9 @@ static void client(int argc, char *const argv[], const struct sockaddr_in *remot pop_wait(sockqd, &qr); /* Check payload. */ - for (uint32_t i = 0; i < qr.qr_value.sga.sga_segs[0].sgaseg_len; i++) - assert(((char *)qr.qr_value.sga.sga_segs[0].sgaseg_buf)[i] == 1); - nbytes += qr.qr_value.sga.sga_segs[0].sgaseg_len; + for (uint32_t i = 0; i < qr.qr_value.sga.segments[0].data_len_bytes; i++) + assert(((char *)qr.qr_value.sga.segments[0].data_buf_ptr)[i] == 1); + nbytes += qr.qr_value.sga.segments[0].data_len_bytes; /* Release received scatter-gather array. */ assert(demi_sgafree(&qr.qr_value.sga) == 0); diff --git a/examples/c/tcp-push-pop.c b/examples/c/tcp-push-pop.c index 40de9c2d4..f4b657a69 100644 --- a/examples/c/tcp-push-pop.c +++ b/examples/c/tcp-push-pop.c @@ -52,8 +52,8 @@ static int pop_get_received_nbytes(int sockqd) assert(demi_pop(&tok, sockqd) == 0); assert(demi_wait(&res, tok, NULL) == 0); assert(res.qr_opcode == DEMI_OPC_POP); - assert(res.qr_value.sga.sga_segs != 0); - recv_bytes = res.qr_value.sga.sga_segs[0].sgaseg_len; + assert(res.qr_value.sga.segments != 0); + recv_bytes = res.qr_value.sga.segments[0].data_len_bytes; assert(demi_sgafree(&res.qr_value.sga) == 0); return recv_bytes; } @@ -91,14 +91,14 @@ static int push_get_sent_nbytes(const int sockqd, size_t data_size) demi_sgarray_t sga = demi_sgaalloc(data_size); int sent_bytes = 0; - assert(sga.sga_segs != 0); - memset(sga.sga_segs[0].sgaseg_buf, 1, data_size); + assert(sga.segments != 0); + memset(sga.segments[0].data_buf_ptr, 1, data_size); // ToDo: demi_pushto() also must work for TCP on all LibOSes. // FIXME: https://github.com/microsoft/demikernel/issues/137 assert(demi_push(&tok, sockqd, &sga) == 0); assert(demi_wait(&res, tok, NULL) == 0); assert(res.qr_opcode == DEMI_OPC_PUSH); - sent_bytes = sga.sga_segs[0].sgaseg_len; + sent_bytes = sga.segments[0].data_len_bytes; assert(demi_sgafree(&sga) == 0); return sent_bytes; } @@ -161,9 +161,12 @@ int main(int argc, char *const argv[]) }; assert(demi_init(&args) == 0); - if (!strcmp(argv[1], "--server")) { + if (!strcmp(argv[1], "--server")) + { run_server(&addr, data_size, max_msgs); - } else if (!strcmp(argv[1], "--client")) { + } + else if (!strcmp(argv[1], "--client")) + { run_client(&addr, data_size, max_msgs); } diff --git a/examples/c/udp-ping-pong.c b/examples/c/udp-ping-pong.c index b98b819e2..e790ff1f0 100644 --- a/examples/c/udp-ping-pong.c +++ b/examples/c/udp-ping-pong.c @@ -86,7 +86,7 @@ static void pop_wait(int qd, demi_qresult_t *qr) /* Parse operation result. */ assert(qr->qr_opcode == DEMI_OPC_POP); - assert(qr->qr_value.sga.sga_segs != 0); + assert(qr->qr_value.sga.segments != 0); } /*====================================================================================================================* @@ -195,11 +195,11 @@ static void client(int argc, /* Allocate scatter-gather array. */ sga = demi_sgaalloc(data_size); - assert(sga.sga_segs != 0); + assert(sga.segments != 0); /* Prepare data. */ memset(expected_buf, it % 256, data_size); - memcpy(sga.sga_segs[0].sgaseg_buf, expected_buf, data_size); + memcpy(sga.segments[0].data_buf_ptr, expected_buf, data_size); /* Push scatter-gather array. */ pushto_wait(sockqd, &sga, &qr, (const struct sockaddr *)remote); @@ -211,7 +211,7 @@ static void client(int argc, pop_wait(sockqd, &qr); /* Parse operation result. */ - assert(!memcmp(qr.qr_value.sga.sga_segs[0].sgaseg_buf, expected_buf, data_size)); + assert(!memcmp(qr.qr_value.sga.segments[0].data_buf_ptr, expected_buf, data_size)); /* Release received scatter-gather array. */ assert(demi_sgafree(&qr.qr_value.sga) == 0); diff --git a/examples/c/udp-push-pop.c b/examples/c/udp-push-pop.c index 55fa1d612..97ebf2b93 100644 --- a/examples/c/udp-push-pop.c +++ b/examples/c/udp-push-pop.c @@ -95,8 +95,8 @@ static void server(int argc, /* Parse operation result. */ assert(qr.qr_opcode == DEMI_OPC_POP); - assert(qr.qr_value.sga.sga_segs != 0); - assert(!memcmp(qr.qr_value.sga.sga_segs[0].sgaseg_buf, expected_buf, data_size)); + assert(qr.qr_value.sga.segments != 0); + assert(!memcmp(qr.qr_value.sga.segments[0].data_buf_ptr, expected_buf, data_size)); /* Release scatter-gather array. */ assert(demi_sgafree(&qr.qr_value.sga) == 0); @@ -149,10 +149,10 @@ static void client(int argc, /* Allocate scatter-gather array. */ sga = demi_sgaalloc(data_size); - assert(sga.sga_segs != 0); + assert(sga.segments != 0); /* Prepare data. */ - memset(sga.sga_segs[0].sgaseg_buf, 1, data_size); + memset(sga.segments[0].data_buf_ptr, 1, data_size); /* Push data. */ assert(demi_pushto(&qt, sockqd, &sga, (const struct sockaddr *)remote, sizeof(struct sockaddr_in)) == 0); diff --git a/examples/rust/tcp-dump.rs b/examples/rust/tcp-dump.rs index 89d8e5581..5002ffdb7 100644 --- a/examples/rust/tcp-dump.rs +++ b/examples/rust/tcp-dump.rs @@ -176,7 +176,7 @@ impl Application { let sockqd: QDesc = qr.qr_qd.into(); let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - num_bytes += sga.sga_segs[0].sgaseg_len as usize; + num_bytes += sga.segments[0].data_len_bytes as usize; if let Err(e) = self.libos.sgafree(sga) { println!("ERROR: sgafree() failed (error={:?})", e); diff --git a/examples/rust/tcp-ping-pong.rs b/examples/rust/tcp-ping-pong.rs index 7c2415f94..4fbc80724 100644 --- a/examples/rust/tcp-ping-pong.rs +++ b/examples/rust/tcp-ping-pong.rs @@ -39,9 +39,9 @@ fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result { }; // Ensure that allocated the array has the requested size. - if sga.sga_segs[0].sgaseg_len as usize != size { + if sga.segments[0].data_len_bytes as usize != size { freesga(libos, sga); - let seglen: usize = sga.sga_segs[0].sgaseg_len as usize; + let seglen: usize = sga.segments[0].data_len_bytes as usize; anyhow::bail!( "failed to allocate scatter-gather array: expected size={:?} allocated size={:?}", size, @@ -50,8 +50,8 @@ fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result { } // Fill in the array. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; let mut fill: u8 = value; for x in slice { @@ -124,8 +124,8 @@ fn pop_and_wait(libos: &mut LibOS, sockqd: QDesc, recvbuf: &mut [u8]) -> Result< }; // Copy data. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; for x in slice { recvbuf[index] = *x; diff --git a/examples/rust/tcp-pktgen.rs b/examples/rust/tcp-pktgen.rs index 06383b168..95fa934f0 100644 --- a/examples/rust/tcp-pktgen.rs +++ b/examples/rust/tcp-pktgen.rs @@ -239,7 +239,7 @@ impl Application { }, }; - num_bytes += sga.sga_segs[0].sgaseg_len as usize; + num_bytes += sga.segments[0].data_len_bytes as usize; if let Err(e) = self.libos.sgafree(sga) { println!("ERROR: sgafree() failed (error={:?})", e); println!("WARN: leaking sga"); @@ -257,12 +257,12 @@ impl Application { }; // Ensure that allocated array has the requested size. - if sga.sga_segs[0].sgaseg_len as usize != size { + if sga.segments[0].data_len_bytes as usize != size { if let Err(e) = self.libos.sgafree(sga) { println!("ERROR: sgafree() failed (error={:?})", e); println!("WARN: leaking sga"); } - let seglen: usize = sga.sga_segs[0].sgaseg_len as usize; + let seglen: usize = sga.segments[0].data_len_bytes as usize; anyhow::bail!( "failed to allocate scatter-gather array: expected size={:?} allocated size={:?}", size, @@ -271,8 +271,8 @@ impl Application { } // Fill in the array. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; slice.fill(value); diff --git a/examples/rust/tcp-push-pop.rs b/examples/rust/tcp-push-pop.rs index b54bd061a..9f63b33aa 100644 --- a/examples/rust/tcp-push-pop.rs +++ b/examples/rust/tcp-push-pop.rs @@ -40,11 +40,11 @@ fn mksga(libos: &mut LibOS, value: u8) -> Result { }; // Create pointer for filling the array. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; // Ensure that allocated array has the requested size. - if sga.sga_segs[0].sgaseg_len as usize != BUF_SIZE_BYTES || ptr.is_null() { + if sga.segments[0].data_len_bytes as usize != BUF_SIZE_BYTES || ptr.is_null() { freesga(libos, sga); - let seglen: usize = sga.sga_segs[0].sgaseg_len as usize; + let seglen: usize = sga.segments[0].data_len_bytes as usize; anyhow::bail!( "failed to allocate scatter-gather array: expected size={:?} allocated size={:?}", BUF_SIZE_BYTES, @@ -119,8 +119,8 @@ impl TcpServer { }; // Sanity check received data. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let bytes: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let bytes: usize = sga.segments[0].data_len_bytes as usize; debug_assert_eq!(bytes, BUF_SIZE_BYTES); debug_assert!(ptr.is_aligned()); debug_assert_eq!(ptr.is_null(), false); diff --git a/examples/rust/udp-dump.rs b/examples/rust/udp-dump.rs index 6dd0cfa31..e1f30db5f 100644 --- a/examples/rust/udp-dump.rs +++ b/examples/rust/udp-dump.rs @@ -122,7 +122,7 @@ impl Application { match self.libos.wait(qt, None) { Ok(qr) if qr.qr_opcode == demi_opcode_t::DEMI_OPC_POP => { let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - num_bytes += sga.sga_segs[0].sgaseg_len as usize; + num_bytes += sga.segments[0].data_len_bytes as usize; if let Err(e) = self.libos.sgafree(sga) { println!("ERROR: sgafree() failed (error={:?})", e); println!("WARN: leaking sga"); diff --git a/examples/rust/udp-echo.rs b/examples/rust/udp-echo.rs index e99965d11..908531ce5 100644 --- a/examples/rust/udp-echo.rs +++ b/examples/rust/udp-echo.rs @@ -148,18 +148,19 @@ impl Application { demi_opcode_t::DEMI_OPC_POP => { let sockqd: QDesc = qr.qr_qd.into(); let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - let saddr: SocketAddr = match Self::sockaddr_to_socketaddrv4(&unsafe { qr.qr_value.sga.sga_addr }) { - Ok(saddr) => SocketAddr::from(saddr), - Err(e) => { - // If error, free scatter-gather array. - if let Err(e) = self.libos.sgafree(sga) { - println!("ERROR: sgafree() failed (error={:?})", e); - println!("WARN: leaking sga"); - }; - anyhow::bail!("could not parse sockaddr: {}", e) - }, - }; - num_bytes += sga.sga_segs[0].sgaseg_len as usize; + let saddr: SocketAddr = + match Self::sockaddr_to_socketaddrv4(&unsafe { qr.qr_value.sga.sockaddr_src }) { + Ok(saddr) => SocketAddr::from(saddr), + Err(e) => { + // If error, free scatter-gather array. + if let Err(e) = self.libos.sgafree(sga) { + println!("ERROR: sgafree() failed (error={:?})", e); + println!("WARN: leaking sga"); + }; + anyhow::bail!("could not parse sockaddr: {}", e) + }, + }; + num_bytes += sga.segments[0].data_len_bytes as usize; // Push packet back. let qt: QToken = match self.libos.pushto(sockqd, &sga, saddr) { Ok(qt) => qt, diff --git a/examples/rust/udp-ping-pong.rs b/examples/rust/udp-ping-pong.rs index fca0ef88c..6f09b4cbe 100644 --- a/examples/rust/udp-ping-pong.rs +++ b/examples/rust/udp-ping-pong.rs @@ -38,9 +38,9 @@ fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result { }; // Ensure that allocated array has the requested size. - if sga.sga_segs[0].sgaseg_len as usize != size { + if sga.segments[0].data_len_bytes as usize != size { freesga(libos, sga); - let seglen: usize = sga.sga_segs[0].sgaseg_len as usize; + let seglen: usize = sga.segments[0].data_len_bytes as usize; anyhow::bail!( "failed to allocate scatter-gather array: expected size={:?} allocated size={:?}", size, @@ -48,8 +48,8 @@ fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result { ); } // Fill in the array. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; slice.fill(value); @@ -119,8 +119,8 @@ impl UdpServer { }; // Sanity check received data. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; for x in slice { if *x != fill_char { @@ -197,8 +197,8 @@ impl UdpClient { // Free the sent sga. self.libos.sgafree(sga)?; // Sanity check received data. - let ptr: *mut u8 = returned_sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = returned_sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = returned_sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = returned_sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; for x in slice { if *x != fill_char { diff --git a/examples/rust/udp-pktgen.rs b/examples/rust/udp-pktgen.rs index 7afb48187..9d5c8a1c2 100644 --- a/examples/rust/udp-pktgen.rs +++ b/examples/rust/udp-pktgen.rs @@ -247,7 +247,7 @@ impl Application { }, }; - num_bytes += sga.sga_segs[0].sgaseg_len as usize; + num_bytes += sga.segments[0].data_len_bytes as usize; if let Err(e) = self.libos.sgafree(sga) { println!("ERROR: sgafree() failed (error={:?})", e); @@ -266,12 +266,12 @@ impl Application { }; // Ensure that allocated array has the requested size. - if sga.sga_segs[0].sgaseg_len as usize != size { + if sga.segments[0].data_len_bytes as usize != size { if let Err(e) = self.libos.sgafree(sga) { println!("ERROR: sgafree() failed (error={:?})", e); println!("WARN: leaking sga"); }; - let seglen: usize = sga.sga_segs[0].sgaseg_len as usize; + let seglen: usize = sga.segments[0].data_len_bytes as usize; anyhow::bail!( "failed to allocate scatter-gather array: expected size={:?} allocated size={:?}", size, @@ -279,8 +279,8 @@ impl Application { ); } // Fill in the array. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; slice.fill(value); diff --git a/examples/rust/udp-push-pop.rs b/examples/rust/udp-push-pop.rs index f4e4ce4d7..4c6e069dd 100644 --- a/examples/rust/udp-push-pop.rs +++ b/examples/rust/udp-push-pop.rs @@ -38,9 +38,9 @@ fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result { }; // Ensure that allocated array has the requested size. - if sga.sga_segs[0].sgaseg_len as usize != size { + if sga.segments[0].data_len_bytes as usize != size { freesga(libos, sga); - let seglen: usize = sga.sga_segs[0].sgaseg_len as usize; + let seglen: usize = sga.segments[0].data_len_bytes as usize; anyhow::bail!( "failed to allocate scatter-gather array: expected size={:?} allocated size={:?}", size, @@ -49,8 +49,8 @@ fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result { } // Fill in the array. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; slice.fill(value); @@ -112,8 +112,8 @@ impl UdpServer { if let Some(sga) = self.sga { // Sanity check received data. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; for x in slice { demikernel::ensure_eq!(*x, fill_char); diff --git a/examples/rust/udp-relay.rs b/examples/rust/udp-relay.rs index 73a6b8217..20d0f7f10 100644 --- a/examples/rust/udp-relay.rs +++ b/examples/rust/udp-relay.rs @@ -166,7 +166,7 @@ impl Application { demi_opcode_t::DEMI_OPC_POP => { let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - num_bytes += sga.sga_segs[0].sgaseg_len as usize; + num_bytes += sga.segments[0].data_len_bytes as usize; let qt: QToken = match self.libos.pushto(self.sockqd, &sga, self.remote_socket_addr) { Ok(qt) => qt, diff --git a/examples/tcp-close/client.rs b/examples/tcp-close/client.rs index ca934c684..d5ca357a2 100644 --- a/examples/tcp-close/client.rs +++ b/examples/tcp-close/client.rs @@ -148,7 +148,7 @@ impl TcpClient { match pop_qr.qr_opcode { demi_opcode_t::DEMI_OPC_POP => { let sga: demi_sgarray_t = unsafe { pop_qr.qr_value.sga }; - let received_len: u32 = sga.sga_segs[0].sgaseg_len; + let received_len: u32 = sga.segments[0].data_len_bytes; self.libos.sgafree(sga)?; // 0 len pop represents socket closed from other side. demikernel::ensure_eq!( @@ -216,7 +216,7 @@ impl TcpClient { }, demi_opcode_t::DEMI_OPC_POP => { let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - let received_len: u32 = sga.sga_segs[0].sgaseg_len; + let received_len: u32 = sga.segments[0].data_len_bytes; self.libos.sgafree(sga)?; // 0 len pop represents socket closed from other side. diff --git a/examples/tcp-close/server.rs b/examples/tcp-close/server.rs index cdec8a6d1..8a8837540 100644 --- a/examples/tcp-close/server.rs +++ b/examples/tcp-close/server.rs @@ -113,7 +113,7 @@ impl TcpServer { demi_opcode_t::DEMI_OPC_POP => { let qd: QDesc = qr.qr_qd.into(); let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - let seglen: usize = sga.sga_segs[0].sgaseg_len as usize; + let seglen: usize = sga.segments[0].data_len_bytes as usize; // Ensure that client has closed the connection. assert_eq!(seglen, 0, "client must have had closed the connection, but it has not"); diff --git a/examples/tcp-echo/client.rs b/examples/tcp-echo/client.rs index cc0dc4831..000045195 100644 --- a/examples/tcp-echo/client.rs +++ b/examples/tcp-echo/client.rs @@ -235,8 +235,8 @@ impl TcpEchoClient { fn mksga(&mut self, size: usize) -> Result { debug_assert!(size > std::mem::size_of::()); let sga: demi_sgarray_t = self.libos.sgaalloc(size)?; - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; let now: u64 = Instant::now().duration_since(self.start).as_nanos() as u64; slice[0..8].copy_from_slice(&now.to_le_bytes()); @@ -258,56 +258,64 @@ impl TcpEchoClient { fn handle_pop(&mut self, qr: &demi_qresult_t) -> Result<()> { let qd: QDesc = qr.qr_qd.into(); let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - if sga.sga_segs[0].sgaseg_len == 0 { - println!("INFO: server closed connection"); - self.handle_close(qd)?; - } else { - // Retrieve client buffer. - let (recvbuf, index): &mut (Vec, usize) = self - .clients - .get_mut(&qd) - .ok_or(anyhow::anyhow!("unregistered socket"))?; - - // Copy data. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; - let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; - recvbuf[*index..(*index + len)].copy_from_slice(slice); - *index += len; - - // TODO: Sanity check packet. - - // Check if there are more bytes to read from this packet. - if *index < recvbuf.capacity() { - // Free scatter-gather-array. - self.libos.sgafree(sga)?; + // Retrieve current index into client buffer. + let mut index = self.clients.get(&qd).ok_or(anyhow::anyhow!("unregistered socket"))?.1; + + for i in 0..sga.num_segments as usize { + let seg = sga.segments[i]; + if seg.data_len_bytes == 0 { + println!("INFO: server closed connection"); + return self.handle_close(qd); + } else { + // Copy data. + let ptr: *mut u8 = seg.data_buf_ptr as *mut u8; + let len: usize = seg.data_len_bytes as usize; + let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; + let recvbuf = &mut self + .clients + .get_mut(&qd) + .ok_or(anyhow::anyhow!("unregistered socket"))? + .0; + recvbuf[index..(index + len)].copy_from_slice(slice); + + index += len; self.nbytes += len; - // There are, thus issue a partial pop. - let size: usize = recvbuf.capacity() - *index; - self.issue_pop(qd, Some(size))?; + // TODO: Sanity check packet. + + // If received a whole message. + if index == self.bufsize { + // Read timestamp from recvbuf. + let timestamp: u64 = u64::from_le_bytes([ + recvbuf[0], recvbuf[1], recvbuf[2], recvbuf[3], recvbuf[4], recvbuf[5], recvbuf[6], recvbuf[7], + ]); + let now: u64 = Instant::now().duration_since(self.start).as_nanos() as u64; + let elapsed: u64 = now - timestamp; + self.stats.increment(elapsed)?; + + index = 0; + self.nechoed += 1; + self.issue_push(qd)?; + } } - // Push another packet. - else { - // Read timestamp from recvbuf. - let timestamp: u64 = u64::from_le_bytes([ - recvbuf[0], recvbuf[1], recvbuf[2], recvbuf[3], recvbuf[4], recvbuf[5], recvbuf[6], recvbuf[7], - ]); - let now: u64 = Instant::now().duration_since(self.start).as_nanos() as u64; - let elapsed: u64 = now - timestamp; - self.stats.increment(elapsed)?; - - // Free scatter-gather-array. - self.libos.sgafree(sga)?; - self.nbytes += len; + } - // There aren't, so push another packet. - *index = 0; - self.nechoed += 1; - self.issue_push(qd)?; - } + self.libos.sgafree(sga)?; + + // Check if there are more bytes to read from this packet. + if index != 0 { + // There are, thus issue a partial pop. + let size: usize = self.bufsize - index; + let client_index = &mut self + .clients + .get_mut(&qd) + .ok_or(anyhow::anyhow!("unregistered socket"))? + .1; + *client_index = index; + self.issue_pop(qd, Some(size))?; } + Ok(()) } @@ -317,7 +325,7 @@ impl TcpEchoClient { self.npushed += 1; // Pop another packet. - self.issue_pop(qd, None)?; + self.issue_pop(qd, Some(self.bufsize))?; Ok(()) } diff --git a/examples/tcp-echo/server.rs b/examples/tcp-echo/server.rs index 6efe23c8c..987e4555d 100644 --- a/examples/tcp-echo/server.rs +++ b/examples/tcp-echo/server.rs @@ -179,17 +179,25 @@ impl TcpEchoServer { fn handle_pop(&mut self, qr: &demi_qresult_t) -> Result<()> { let qd: QDesc = qr.qr_qd.into(); - let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; + let mut sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - // Check if we received any data. - if sga.sga_segs[0].sgaseg_len == 0 { + // Check if we received any data. If last sga element is 0 length, treat as EoF. + let closing = if sga.segments[sga.num_segments as usize - 1].data_len_bytes == 0 { println!("INFO: client closed connection (qd={:?})", qd); - self.handle_close(qd)?; + sga.num_segments -= 1; + true } else { + false + }; + if sga.num_segments > 0 { self.issue_push(qd, &sga)?; + } + if closing { + self.handle_close(qd)?; + } else { // Pop more data. self.issue_pop(qd)?; - } + }; self.libos.sgafree(sga)?; Ok(()) diff --git a/examples/tcp-wait/client.rs b/examples/tcp-wait/client.rs index 2aa96531b..2d7ae6f01 100644 --- a/examples/tcp-wait/client.rs +++ b/examples/tcp-wait/client.rs @@ -104,9 +104,9 @@ impl TcpClient { match self.libos.wait(pop_qt, Some(TIMEOUT_SECONDS)) { Ok(qr) if qr.qr_opcode == demi_opcode_t::DEMI_OPC_POP && qr.qr_ret == 0 => { let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - let sgaseg_len: u32 = sga.sga_segs[0].sgaseg_len; + let data_len_bytes: u32 = sga.segments[0].data_len_bytes; self.libos.sgafree(sga)?; - if sgaseg_len == 0 { + if data_len_bytes == 0 { // In this testing program, the server does not terminate the connection before the client does. // Therefore, pop() cannot successfully receive a zero-length scatter-gather array. anyhow::bail!("pop() should not sucessfully terminate"); @@ -169,9 +169,9 @@ impl TcpClient { match self.libos.wait(pop_qt, Some(TIMEOUT_SECONDS)) { Ok(qr) if qr.qr_opcode == demi_opcode_t::DEMI_OPC_POP && qr.qr_ret == 0 => { let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - let sgaseg_len: u32 = sga.sga_segs[0].sgaseg_len; + let data_len_bytes: u32 = sga.segments[0].data_len_bytes; self.libos.sgafree(sga)?; - if sgaseg_len == 0 { + if data_len_bytes == 0 { // In this testing program, the server does not terminate the connection before the client does. // Therefore, pop() cannot successfully receive a zero-length scatter-gather array. anyhow::bail!("pop() should not sucessfully terminate"); @@ -230,9 +230,9 @@ impl TcpClient { match self.libos.wait(pop_qt, Some(TIMEOUT_SECONDS)) { Ok(qr) if qr.qr_opcode == demi_opcode_t::DEMI_OPC_POP && qr.qr_ret == 0 => { let sga: demi_sgarray_t = unsafe { qr.qr_value.sga }; - let sgaseg_len: u32 = sga.sga_segs[0].sgaseg_len; + let data_len_bytes: u32 = sga.segments[0].data_len_bytes; self.libos.sgafree(sga)?; - if sgaseg_len == 0 { + if data_len_bytes == 0 { // In this testing program, the server does not terminate the connection before the client does. // Therefore, pop() cannot successfully receive a zero-length scatter-gather array. anyhow::bail!("pop() should not sucessfully terminate"); @@ -267,11 +267,11 @@ impl TcpClient { }; // Ensure that scatter-gather array has the requested size. - assert!(sga.sga_segs[0].sgaseg_len as usize == size); + assert!(sga.segments[0].data_len_bytes as usize == size); // Fill in scatter-gather array. - let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8; - let len: usize = sga.sga_segs[0].sgaseg_len as usize; + let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8; + let len: usize = sga.segments[0].data_len_bytes as usize; let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) }; slice.fill(value); diff --git a/include/demi/types.h b/include/demi/types.h index 86d1f76d2..db0558ec6 100644 --- a/include/demi/types.h +++ b/include/demi/types.h @@ -29,7 +29,7 @@ extern "C" /** * @brief Maximum number of segments in a scatter-gather array. */ -#define DEMI_SGARRAY_MAXSIZE 1 +#define DEMI_SGARRAY_MAXSIZE 20 /** * @brief An I/O queue token. @@ -47,8 +47,9 @@ extern "C" typedef struct __attribute__((__packed__)) demi_sgaseg #endif { - void *sgaseg_buf; /**< Underlying data. */ - uint32_t sgaseg_len; /**< Size in bytes of data. */ + void *sgaseg_md; /**< Reserved for Demikernel metadata. */ + void *data_buf_ptr; /**< Underlying data. */ + uint32_t data_len_bytes; /**< Size in bytes of data. */ } demi_sgaseg_t; #ifdef _WIN32 #pragma pack(pop) @@ -65,9 +66,8 @@ extern "C" typedef struct __attribute__((__packed__)) demi_sgarray #endif { - void *sga_buf; /**< Reserved. */ uint32_t sga_numsegs; /**< Number of segments in the scatter-gather array. */ - demi_sgaseg_t sga_segs[DEMI_SGARRAY_MAXSIZE]; /**< Scatter-gather array segments. */ + demi_sgaseg_t segments[DEMI_SGARRAY_MAXSIZE]; /**< Scatter-gather array segments. */ struct sockaddr_in sga_addr; /**< Source address of scatter-gather array. */ } demi_sgarray_t; #ifdef _WIN32 @@ -148,8 +148,9 @@ extern "C" DemiLogLevel_Trace = 5, } demi_log_level_t; - // Logging callback. Arguments are: level, module name, module length, file name, file name length, line number, message, message length, - typedef void (*demi_log_callback_t)(demi_log_level_t, const char*, uint32_t, const char*, uint32_t, uint32_t, const char*, uint32_t); + // Logging callback. Demikernel will call an external C function on each logged stat for inclusion into a + // performance tracking system. + typedef void (*demi_log_callback_t)(demi_log_level_t log_level, const char *module_name, uint32_t module_name_len_bytes, const char *file_name, uint32_t file_name_len_bytes, uint32_t line_number, const char *message, uint32_t message_len_bytes); /** * @brief Arguments for Demikernel. diff --git a/man/demi_sgaalloc.md b/man/demi_sgaalloc.md index 52836340b..e655ce698 100644 --- a/man/demi_sgaalloc.md +++ b/man/demi_sgaalloc.md @@ -30,7 +30,7 @@ typedef struct demi_sgarray // Number of segments in the scatter-gather array. uint32_t sga_numsegs; // Scatter-gather array segments. - demi_sgaseg_t sga_segs[DEMI_SGARRAY_MAXSIZE]; + demi_sgaseg_t segments[DEMI_SGARRAY_MAXSIZE]; // Source address of scatter-gather array. struct sockaddr_in sga_addr; } demi_sgarray_t; @@ -42,9 +42,9 @@ The `demi_sgaseg_t` is defined as follows: typedef struct demi_sgaseg { // Underlying data. - void *sgaseg_buf; + void *data_buf_ptr; // Size in bytes of data. - uint32_t sgaseg_len; + uint32_t data_len_bytes; } demi_sgaseg_t; ``` diff --git a/network_simulator/input/tcp/pop/pop-reordering.pkt b/network_simulator/input/tcp/pop/pop-reordering.pkt index e10cfe6b1..83126db20 100644 --- a/network_simulator/input/tcp/pop/pop-reordering.pkt +++ b/network_simulator/input/tcp/pop/pop-reordering.pkt @@ -17,7 +17,7 @@ +.0 wait(500, ...) = 0 // Read data. -+.1 read(501, ..., 1000) = 1000 ++.1 read(501, ..., 2000) = 2000 // Receive out of order data packet. +.1 TCP < P. seq 11001(1000) ack 12346 win 65535 @@ -32,10 +32,4 @@ +.0 wait(501, ...) = 0 // Send ACK packet -+.6 TCP > . seq 12346(0) ack 12001 win 63535 - -// Read data. -+.1 read(501, ..., 1000) = 1000 - -// Data read. -+.0 wait(501, ...) = 0 ++.6 TCP > . seq 12346(0) ack 12001 win 63535 \ No newline at end of file diff --git a/src/catnap/linux/active_socket.rs b/src/catnap/linux/active_socket.rs index 4a949007a..50870dc0a 100644 --- a/src/catnap/linux/active_socket.rs +++ b/src/catnap/linux/active_socket.rs @@ -12,7 +12,7 @@ use crate::{ runtime::{fail::Fail, limits, memory::DemiBuffer, DemiRuntime}, }; use ::socket2::Socket; -use ::std::{cmp::min, mem::MaybeUninit, net::SocketAddr, slice}; +use ::std::{cmp::min, mem::MaybeUninit, net::SocketAddr, slice, time::Duration}; //====================================================================================================================== // Structures @@ -153,20 +153,29 @@ impl ActiveSocketData { } } - /// Pops data from the socket. Blocks until some data is found but does not wait until the buffer has reached [size]. - pub async fn pop(&mut self, size: usize) -> Result<(Option, DemiBuffer), Fail> { - let (addr, mut buffer) = self.recv_queue.pop(None).await??; + /// Pops data from the socket. Blocks until some data is found but does not wait until the buf has reached [size]. + pub async fn pop( + &mut self, + size: usize, + timeout: Option, + ) -> Result<(Option, DemiBuffer), Fail> { + let (addr, mut buffer) = self.recv_queue.pop(timeout).await??; // Figure out how much data we got. let bytes_read = min(buffer.len(), size); // Trim the buffer and leave for next read if we got more than expected. if let Ok(remainder) = buffer.split_back(bytes_read) { if !remainder.is_empty() { - self.recv_queue.push_front(Ok((addr, remainder))); + self.push_front(remainder, addr.clone()); } } Ok((addr, buffer)) } + /// Puts data back into the socket queue. + pub fn push_front(&mut self, buf: DemiBuffer, addr: Option) { + self.recv_queue.push_front(Ok((addr, buf))); + } + pub fn get_socket(&self) -> &Socket { &self.socket } diff --git a/src/catnap/linux/socket.rs b/src/catnap/linux/socket.rs index c76201464..004a82688 100644 --- a/src/catnap/linux/socket.rs +++ b/src/catnap/linux/socket.rs @@ -15,6 +15,7 @@ use ::std::{ net::SocketAddr, ops::{Deref, DerefMut}, os::fd::{AsRawFd, RawFd}, + time::Duration, }; //====================================================================================================================== @@ -115,10 +116,23 @@ impl SharedSocketData { } /// Pop some data on an active established connection. - pub async fn pop(&mut self, size: usize) -> Result<(Option, DemiBuffer), Fail> { + pub async fn pop( + &mut self, + size: usize, + timeout: Option, + ) -> Result<(Option, DemiBuffer), Fail> { match self.deref_mut() { SocketData::Inactive(_) => unreachable!("Cannot read on an inactive socket"), - SocketData::Active(data) => data.pop(size).await, + SocketData::Active(data) => data.pop(size, timeout).await, + SocketData::Passive(_) => unreachable!("Cannot read on a passive socket"), + } + } + + /// Put some data back into an active socket. + pub fn push_front(&mut self, buf: DemiBuffer, addr: Option) { + match self.deref_mut() { + SocketData::Inactive(_) => unreachable!("Cannot read on an inactive socket"), + SocketData::Active(data) => data.push_front(buf, addr), SocketData::Passive(_) => unreachable!("Cannot read on a passive socket"), } } diff --git a/src/catnap/linux/transport.rs b/src/catnap/linux/transport.rs index 795224534..e2344d6ae 100644 --- a/src/catnap/linux/transport.rs +++ b/src/catnap/linux/transport.rs @@ -16,7 +16,7 @@ mod socket; use crate::{ catnap::transport::socket::{SharedSocketData, SocketData}, demikernel::config::Config, - expect_ok, expect_some, + expect_some, runtime::{ fail::Fail, memory::{DemiBuffer, DemiMemoryAllocator}, @@ -24,9 +24,12 @@ use crate::{ socket::option::{SocketOption, TcpSocketOptions}, transport::NetworkTransport, }, - poll_yield, DemiRuntime, SharedDemiRuntime, SharedObject, + poll_yield, + types::DEMI_SGARRAY_MAXLEN, + DemiRuntime, SharedDemiRuntime, SharedObject, }, }; +use ::arrayvec::ArrayVec; use ::futures::FutureExt; use ::slab::Slab; use ::socket2::{Domain, Protocol, Socket, Type}; @@ -35,6 +38,7 @@ use ::std::{ net::{Shutdown, SocketAddr, SocketAddrV4}, ops::{Deref, DerefMut}, os::fd::{AsRawFd, RawFd}, + time::Duration, }; //====================================================================================================================== @@ -530,7 +534,7 @@ impl NetworkTransport for SharedCatnapTransport { libc::ENOTCONN => break, errno if DemiRuntime::should_retry(errno) => { // Wait for a new incoming event. - data.pop(0).await?; + data.pop(0, None).await?; continue; }, errno => return Err(Fail::new(errno, "operation failed")), @@ -553,12 +557,12 @@ impl NetworkTransport for SharedCatnapTransport { async fn push( &mut self, sd: &mut Self::SocketDescriptor, - buffer: &mut DemiBuffer, + bufs: ArrayVec, addr: Option, ) -> Result<(), Fail> { - self.data_from_sd(sd).push(addr, buffer.clone()).await?; - // Clear out the original buffer. - expect_ok!(buffer.trim(buffer.len()), "Should be able to empty the buffer"); + for buf in bufs { + self.data_from_sd(sd).push(addr, buf).await?; + } Ok(()) } @@ -569,8 +573,32 @@ impl NetworkTransport for SharedCatnapTransport { &mut self, sd: &mut Self::SocketDescriptor, size: usize, - ) -> Result<(Option, DemiBuffer), Fail> { - self.data_from_sd(sd).pop(size).await + ) -> Result<(Option, ArrayVec), Fail> { + let mut total_size = 0; + let mut bufs: ArrayVec = ArrayVec::new(); + let mut addr: Option = None; + while total_size < size && bufs.len() < DEMI_SGARRAY_MAXLEN - 1 { + let (src_addr, buf) = if !bufs.is_empty() { + match self.data_from_sd(sd).pop(size - total_size, Some(Duration::ZERO)).await { + Ok(result) => result, + Err(e) if e.errno == libc::ETIMEDOUT => break, + Err(e) => return Err(e), + } + } else { + self.data_from_sd(sd).pop(size - total_size, None).await? + }; + match addr { + None => addr = src_addr, + addr if src_addr != addr => { + self.data_from_sd(sd).push_front(buf, src_addr); + return Ok((addr, bufs)); + }, + _ => (), + } + total_size += buf.len(); + bufs.push(buf) + } + Ok((addr, bufs)) } /// Close the socket on the underlying transport. Also unregisters the socket with epoll. diff --git a/src/catnap/win/transport.rs b/src/catnap/win/transport.rs index 703ffea48..729585851 100644 --- a/src/catnap/win/transport.rs +++ b/src/catnap/win/transport.rs @@ -25,11 +25,14 @@ use crate::{ socket::option::{SocketOption, TcpSocketOptions}, transport::NetworkTransport, }, - poll_yield, DemiRuntime, SharedDemiRuntime, SharedObject, + poll_yield, + types::DEMI_SGARRAY_MAXLEN, + DemiRuntime, SharedDemiRuntime, SharedObject, }, }; +use ::arrayvec::ArrayVec; use ::futures::FutureExt; -use std::{ +use ::std::{ net::{SocketAddr, SocketAddrV4}, pin::Pin, }; @@ -244,7 +247,7 @@ impl NetworkTransport for SharedCatnapTransport { &mut self, socket: &mut Self::SocketDescriptor, size: usize, - ) -> Result<(Option, DemiBuffer), Fail> { + ) -> Result<(Option, ArrayVec), Fail> { let mut buffer = DemiBuffer::new(size as u16); unsafe { self.0.iocp.do_io( @@ -265,7 +268,9 @@ impl NetworkTransport for SharedCatnapTransport { } else { trace!("not data received"); } - Ok((sockaddr, buffer)) + let mut result = ArrayVec::new(); + result.push(buffer); + Ok((sockaddr, result)) }) } @@ -275,40 +280,40 @@ impl NetworkTransport for SharedCatnapTransport { async fn push( &mut self, socket: &mut Self::SocketDescriptor, - buf: &mut DemiBuffer, + bufs: ArrayVec, addr: Option, ) -> Result<(), Fail> { - loop { - let result = unsafe { - self.0.iocp.do_io( - SocketOpState::Push(buf.clone()), - |state: Pin<&mut SocketOpState>, overlapped: *mut OVERLAPPED| -> Result<(), Fail> { - socket.start_push(state, addr, overlapped) - }, - |_: Pin<&mut SocketOpState>, result: OverlappedResult| -> Result { - socket.finish_push(result) + for mut buf in bufs { + while !buf.is_empty() { + let result: Result = unsafe { + self.0.iocp.do_io( + SocketOpState::Push(buf.clone()), + |state: Pin<&mut SocketOpState>, overlapped: *mut OVERLAPPED| -> Result<(), Fail> { + socket.start_push(state, addr, overlapped) + }, + |_: Pin<&mut SocketOpState>, result: OverlappedResult| -> Result { + socket.finish_push(result) + }, + ) + } + .await; + + match result { + Ok(nbytes) => { + trace!("data pushed ({:?}/{:?} bytes)", nbytes, buf.len()); + buf.adjust(nbytes)?; }, - ) - } - .await; - - match result { - Ok(nbytes) => { - trace!("data pushed ({:?}/{:?} bytes)", nbytes, buf.len()); - buf.adjust(nbytes)?; - if buf.is_empty() { - return Ok(()); - } - }, - Err(fail) => { - if !DemiRuntime::should_retry(fail.errno) { - let msg = format!("push failed: {}", fail.cause); - return Err(Fail::new(fail.errno, msg.as_str())); - } - }, + Err(fail) => { + if !DemiRuntime::should_retry(fail.errno) { + let message: String = format!("push failed: {}", fail.cause); + return Err(Fail::new(fail.errno, message.as_str())); + } + }, + } } } + Ok(()) } fn get_runtime(&self) -> &SharedDemiRuntime { diff --git a/src/catnip/runtime/mod.rs b/src/catnip/runtime/mod.rs index b5e8c0d78..7cac387f0 100644 --- a/src/catnip/runtime/mod.rs +++ b/src/catnip/runtime/mod.rs @@ -18,11 +18,7 @@ use crate::{ mempool::MemoryPool, }, demikernel::config::Config, - expect_some, - inetstack::{ - consts::{MAX_HEADER_SIZE, RECEIVE_BATCH_SIZE}, - protocols::layer1::PhysicalLayer, - }, + inetstack::{consts::MAX_BATCH_SIZE_NUM_PACKETS, protocols::layer1::PhysicalLayer}, runtime::{ fail::Fail, libdpdk::{ @@ -320,43 +316,48 @@ impl DerefMut for SharedDPDKRuntime { } impl PhysicalLayer for SharedDPDKRuntime { - fn transmit(&mut self, pkt: DemiBuffer) -> Result<(), Fail> { + fn transmit(&mut self, pkts: ArrayVec) -> Result<(), Fail> { timer!("catnip::runtime::transmit"); + // Grab the packet and copy it if necessary. In general, this copy will happen for small packets without // payloads because we allocate actual data-carrying application buffers from the DPDK pool. - let outgoing_pkt: DemiBuffer = match pkt { - buf if buf.is_dpdk_allocated() => buf, - buf if buf.len() <= self.max_body_size => { - let mut mbuf: DemiBuffer = self.dpdk_allocate_mbuf(buf.len())?; - debug_assert_eq!(buf.len(), mbuf.len()); - mbuf.copy_from_slice(&buf); - - mbuf - }, - buf => { - let cause: String = format!( - "Cannot allocate a DPDK buffer that is large enough. Max size={:?} request size={:?}", - self.max_body_size, - buf.len() - ); - warn!("{}", cause); - return Err(Fail::new(libc::EINVAL, &cause)); - }, - }; + let len: usize = pkts.len(); + let mut mbufs: [*mut rte_mbuf; MAX_BATCH_SIZE_NUM_PACKETS] = unsafe { mem::zeroed() }; + for (i, pkt) in pkts.into_iter().enumerate() { + mbufs[i] = match pkt { + buf if buf.is_dpdk_allocated() => buf + .into_mbuf() + .ok_or(Fail::new(libc::EINVAL, "should be able to convert into mbuf"))?, + buf if buf.len() <= self.max_body_size => { + let mut mbuf: DemiBuffer = self.dpdk_allocate_mbuf(buf.len())?; + debug_assert_eq!(buf.len(), mbuf.len()); + mbuf.copy_from_slice(&buf); + + mbuf.into_mbuf() + .ok_or(Fail::new(libc::EINVAL, "should be able to convert into mbuf"))? + }, + _ => { + return Err(Fail::new( + libc::EINVAL, + "cannot allocate DPDK buffer that is big enough", + )) + }, + }; + } - let mut mbuf_ptr: *mut rte_mbuf = expect_some!(outgoing_pkt.into_mbuf(), "mbuf cannot be empty"); - let num_sent: u16 = unsafe { rte_eth_tx_burst(self.port_id, 0, &mut mbuf_ptr, 1) }; + let num_sent: u16 = unsafe { rte_eth_tx_burst(self.port_id, 0, mbufs.as_mut_ptr(), len as u16) }; debug_assert_eq!(num_sent, 1); Ok(()) } - fn receive(&mut self) -> Result, Fail> { + fn receive(&mut self) -> Result, Fail> { timer!("catnip::runtime::receive"); let mut out = ArrayVec::new(); - let mut packets: [*mut rte_mbuf; RECEIVE_BATCH_SIZE] = unsafe { mem::zeroed() }; - let nb_rx = unsafe { rte_eth_rx_burst(self.port_id, 0, packets.as_mut_ptr(), RECEIVE_BATCH_SIZE as u16) }; - assert!(nb_rx as usize <= RECEIVE_BATCH_SIZE); + let mut packets: [*mut rte_mbuf; MAX_BATCH_SIZE_NUM_PACKETS] = unsafe { mem::zeroed() }; + let nb_rx = + unsafe { rte_eth_rx_burst(self.port_id, 0, packets.as_mut_ptr(), MAX_BATCH_SIZE_NUM_PACKETS as u16) }; + assert!(nb_rx as usize <= MAX_BATCH_SIZE_NUM_PACKETS); { for &packet in &packets[..nb_rx as usize] { @@ -371,13 +372,12 @@ impl PhysicalLayer for SharedDPDKRuntime { } impl DemiMemoryAllocator for SharedDPDKRuntime { + fn get_max_buffer_size_bytes(&self) -> usize { + self.max_body_size + } + fn allocate_demi_buffer(&self, size: usize) -> Result { - // First allocate the underlying DemiBuffer. - if size <= self.max_body_size { - self.dpdk_allocate_mbuf(size) - } else { - // Allocate a heap-managed buffer. - Ok(DemiBuffer::new_with_headroom(size as u16, MAX_HEADER_SIZE as u16)) - } + debug_assert!(size < self.max_body_size); + self.dpdk_allocate_mbuf(size) } } diff --git a/src/catpowder/linux/mod.rs b/src/catpowder/linux/mod.rs index 5b621ef18..cf38a177a 100644 --- a/src/catpowder/linux/mod.rs +++ b/src/catpowder/linux/mod.rs @@ -11,7 +11,7 @@ use crate::{ catpowder::linux::rawsocket::{RawSocket, RawSocketAddr}, demikernel::config::Config, expect_ok, - inetstack::consts::{MAX_HEADER_SIZE, RECEIVE_BATCH_SIZE}, + inetstack::consts::{MAX_BATCH_SIZE_NUM_PACKETS, MAX_HEADER_SIZE}, inetstack::protocols::{layer1::PhysicalLayer, layer2::Ethernet2Header}, runtime::{ fail::Fail, @@ -35,6 +35,7 @@ use ::std::{ pub struct LinuxRuntime { ifindex: i32, socket: SharedObject, + max_body_size: usize, } //====================================================================================================================== @@ -52,9 +53,12 @@ impl LinuxRuntime { let sockaddr: RawSocketAddr = RawSocketAddr::new(ifindex, &mac_addr); socket.bind(&sockaddr)?; + let max_body_size = config.mtu()? as usize - MAX_HEADER_SIZE; + Ok(Self { ifindex, socket: SharedObject::::new(socket), + max_body_size, }) } @@ -71,6 +75,10 @@ impl LinuxRuntime { //====================================================================================================================== impl DemiMemoryAllocator for LinuxRuntime { + fn get_max_buffer_size_bytes(&self) -> usize { + self.max_body_size + } + fn allocate_demi_buffer(&self, size: usize) -> Result { Ok(DemiBuffer::new_with_headroom(size as u16, MAX_HEADER_SIZE as u16)) } @@ -79,33 +87,36 @@ impl DemiMemoryAllocator for LinuxRuntime { impl Runtime for LinuxRuntime {} impl PhysicalLayer for LinuxRuntime { - fn transmit(&mut self, pkt: DemiBuffer) -> Result<(), Fail> { - // We clone the packet so as to not remove the ethernet header from the outgoing message. - let header = Ethernet2Header::parse_and_strip(&mut pkt.clone()).unwrap(); - let dest_addr_arr: [u8; 6] = header.dst_addr().to_array(); - let dest_sockaddr: RawSocketAddr = RawSocketAddr::new(self.ifindex, &dest_addr_arr); - - match self.socket.sendto(&pkt, &dest_sockaddr) { - Ok(size) if size == pkt.len() => Ok(()), - Ok(size) => { - let cause = format!( - "Incorrect number of bytes sent: packet_size={:?} sent={:?}", - pkt.len(), - size - ); - warn!("{}", cause); - Err(Fail::new(libc::EAGAIN, &cause)) - }, - Err(e) => { - let cause = "send failed"; - warn!("transmit(): {} {:?}", cause, e); - Err(Fail::new(libc::EIO, cause)) - }, + fn transmit(&mut self, pkts: ArrayVec) -> Result<(), Fail> { + for pkt in pkts { + // We clone the packet so as to not remove the ethernet header from the outgoing message. + let header = Ethernet2Header::parse_and_strip(&mut pkt.clone()).unwrap(); + let dest_addr_arr: [u8; 6] = header.dst_addr().to_array(); + let dest_sockaddr: RawSocketAddr = RawSocketAddr::new(self.ifindex, &dest_addr_arr); + + match self.socket.sendto(&pkt, &dest_sockaddr) { + Ok(size) if size == pkt.len() => (), + Ok(size) => { + let cause = format!( + "Incorrect number of bytes sent: packet_size={:?} sent={:?}", + pkt.len(), + size + ); + warn!("{}", cause); + return Err(Fail::new(libc::EAGAIN, &cause)); + }, + Err(e) => { + let cause = "send failed"; + warn!("transmit(): {} {:?}", cause, e); + return Err(Fail::new(libc::EIO, &cause)); + }, + } } + Ok(()) } // TODO: This routine currently only tries to receive a single packet buffer, not a batch of them. - fn receive(&mut self) -> Result, Fail> { + fn receive(&mut self) -> Result, Fail> { // TODO: This routine contains an extra copy of the entire incoming packet that could potentially be removed. // TODO: change this function to operate directly on DemiBuffer rather than on MaybeUninit. @@ -114,7 +125,7 @@ impl PhysicalLayer for LinuxRuntime { let mut out: [MaybeUninit; limits::RECVBUF_SIZE_MAX] = [unsafe { MaybeUninit::uninit().assume_init() }; limits::RECVBUF_SIZE_MAX]; if let Ok((nbytes, _origin_addr)) = self.socket.recvfrom(&mut out[..]) { - let mut ret: ArrayVec = ArrayVec::new(); + let mut ret: ArrayVec = ArrayVec::new(); unsafe { let bytes: [u8; limits::RECVBUF_SIZE_MAX] = mem::transmute::<[MaybeUninit; limits::RECVBUF_SIZE_MAX], [u8; limits::RECVBUF_SIZE_MAX]>(out); diff --git a/src/catpowder/win/runtime.rs b/src/catpowder/win/runtime.rs index dc8fc2f09..527e34b14 100644 --- a/src/catpowder/win/runtime.rs +++ b/src/catpowder/win/runtime.rs @@ -16,7 +16,7 @@ use crate::{ }, demikernel::config::Config, inetstack::{ - consts::{MAX_HEADER_SIZE, RECEIVE_BATCH_SIZE}, + consts::{MAX_BATCH_SIZE_NUM_PACKETS, MAX_HEADER_SIZE}, protocols::{layer1::PhysicalLayer, layer4::ephemeral::EphemeralPorts}, }, runtime::{ @@ -56,6 +56,9 @@ struct CatpowderRuntime { /// Statistics for the runtime. stats: CatpowderStats, + + /// Max body size, usually the MTU. + max_body_size: usize, } //====================================================================================================================== @@ -85,6 +88,8 @@ impl SharedCatpowderRuntime { let stats: CatpowderStats = CatpowderStats::new(&interface, vf_interface.as_ref())?; let always_send_on_vf: bool = config.xdp_always_send_on_vf()? && vf_interface.is_some(); + let max_body_size: usize = config.mss()? as usize - MAX_HEADER_SIZE; + Ok(Self(SharedObject::new(CatpowderRuntime { api, interface, @@ -92,6 +97,7 @@ impl SharedCatpowderRuntime { always_send_on_vf, cohosting_mode, stats, + max_body_size, }))) } @@ -114,38 +120,40 @@ impl SharedCatpowderRuntime { impl PhysicalLayer for SharedCatpowderRuntime { /// Transmits a packet. - fn transmit(&mut self, pkt: DemiBuffer) -> Result<(), Fail> { + fn transmit(&mut self, pkts: ArrayVec) -> Result<(), Fail> { timer!("catpowder::win::runtime::transmit"); - let pkt_size: usize = pkt.len(); - if pkt_size >= u16::MAX as usize { - let cause = format!("packet is too large: {:?}", pkt_size); - warn!("{}", cause); - return Err(Fail::new(libc::ENOTSUP, &cause)); - } + for pkt in pkts { + let pkt_size: usize = pkt.len(); + if pkt_size >= u16::MAX as usize { + let cause = format!("packet is too large: {:?}", pkt_size); + warn!("{}", cause); + return Err(Fail::new(libc::ENOTSUP, &cause)); + } - let me: &mut CatpowderRuntime = &mut self.0.borrow_mut(); - me.interface.return_tx_buffers(); + let me: &mut CatpowderRuntime = &mut self.0.borrow_mut(); + me.interface.return_tx_buffers(); - if let Some(vf_interface) = me.vf_interface.as_mut() { - vf_interface.return_tx_buffers(); + if let Some(vf_interface) = me.vf_interface.as_mut() { + vf_interface.return_tx_buffers(); - if me.always_send_on_vf { - vf_interface.tx_ring.transmit_buffer(&mut me.api, pkt)?; - return Ok(()); + if me.always_send_on_vf { + vf_interface.tx_ring.transmit_buffer(&mut me.api, pkt)?; + return Ok(()); + } } - } - me.interface.tx_ring.transmit_buffer(&mut me.api, pkt)?; + me.interface.tx_ring.transmit_buffer(&mut me.api, pkt)?; + } Ok(()) } /// Polls for received packets. - fn receive(&mut self) -> Result, Fail> { + fn receive(&mut self) -> Result, Fail> { timer!("catpowder::win::runtime::receive"); self.0.stats.update_poll_time(); - let mut ret: ArrayVec = ArrayVec::new(); + let mut ret: ArrayVec = ArrayVec::new(); let me: &mut CatpowderRuntime = &mut self.0.borrow_mut(); me.interface.provide_rx_buffers(); @@ -187,6 +195,10 @@ impl PhysicalLayer for SharedCatpowderRuntime { /// Memory runtime trait implementation for XDP Runtime. impl DemiMemoryAllocator for SharedCatpowderRuntime { + fn get_max_buffer_size_bytes(&self) -> usize { + self.0.max_body_size + } + /// Allocates a scatter-gather array. fn allocate_demi_buffer(&self, size: usize) -> Result { timer!("catpowder::win::runtime::sgaalloc"); diff --git a/src/demikernel/bindings.rs b/src/demikernel/bindings.rs index b11697901..bd1a1bb69 100644 --- a/src/demikernel/bindings.rs +++ b/src/demikernel/bindings.rs @@ -14,7 +14,7 @@ use crate::{ runtime::{ fail::Fail, logging::{self, CallbackLogWriter}, - types::{demi_args_t, demi_qresult_t, demi_qtoken_t, demi_sgarray_t, demi_sgaseg_t}, + types::{demi_args_t, demi_qresult_t, demi_qtoken_t, demi_sgarray_t}, QToken, }, SocketOption, @@ -599,17 +599,7 @@ pub unsafe extern "C" fn demi_wait_next_n( pub unsafe extern "C" fn demi_sgaalloc(size: libc::size_t) -> demi_sgarray_t { trace!("demi_sgaalloc()"); - let null_sga = { - demi_sgarray_t { - sga_buf: ptr::null_mut() as *mut _, - sga_numsegs: 0, - sga_segs: [demi_sgaseg_t { - sgaseg_buf: ptr::null_mut(), - sgaseg_len: 0, - }; 1], - sga_addr: unsafe { mem::zeroed() }, - } - }; + let null_sga = demi_sgarray_t::default(); let ret = do_syscall(|libos| -> demi_sgarray_t { match libos.sgaalloc(size) { diff --git a/src/demikernel/libos/network/libos.rs b/src/demikernel/libos/network/libos.rs index 60a39f387..138ef5e21 100644 --- a/src/demikernel/libos/network/libos.rs +++ b/src/demikernel/libos/network/libos.rs @@ -19,11 +19,14 @@ use crate::{ unwrap_socketaddr, }, queue::{downcast_queue, IoQueue, OperationResult}, - types::{demi_accept_result_t, demi_opcode_t, demi_qr_value_t, demi_qresult_t, demi_sgarray_t}, + types::{ + demi_accept_result_t, demi_opcode_t, demi_qr_value_t, demi_qresult_t, demi_sgarray_t, DEMI_SGARRAY_MAXLEN, + }, QDesc, QToken, SharedDemiRuntime, SharedObject, }, QType, }; +use ::arrayvec::ArrayVec; use ::futures::FutureExt; use ::socket2::{Domain, Protocol, Type}; use ::std::{ @@ -300,16 +303,23 @@ impl SharedNetworkLibOS { /// coroutine that asynchronously runs the push and any synchronous multi-queue functionality before the push /// begins. pub fn push(&mut self, qd: QDesc, sga: &demi_sgarray_t) -> Result { - let buf: DemiBuffer = clone_sgarray(sga)?; - if buf.is_empty() { - let cause: &'static str = "zero-length buffer"; + let bufs = clone_sgarray(sga)?; + if bufs.is_empty() { + let cause = "zero-length list of buffers"; warn!("push(): {}", cause); - return Err(Fail::new(libc::EINVAL, cause)); - }; + return Err(Fail::new(libc::EINVAL, &cause)); + } + for buf in bufs.iter() { + if buf.is_empty() { + let cause = "zero-length buffer"; + warn!("push(): {}", cause); + return Err(Fail::new(libc::EINVAL, &cause)); + }; + } let mut queue: SharedNetworkQueue = self.get_shared_queue(&qd)?; let coroutine_constructor = || -> Result { - let coroutine = Box::pin(self.clone().push_coroutine(qd, buf, None).fuse()); + let coroutine = Box::pin(self.clone().push_coroutine(qd, bufs, None).fuse()); self.runtime .clone() .insert_nonpolling_coroutine("ioc::network::libos::push", coroutine) @@ -321,7 +331,12 @@ impl SharedNetworkLibOS { /// Asynchronous code to push [buf] to a SharedNetworkQueue and its underlying POSIX socket. This function returns a /// coroutine that runs asynchronously to push a queue and its underlying POSIX socket and performs any necessary /// multi-queue operations at the LibOS-level after the push succeeds or fails. - async fn push_coroutine(self, qd: QDesc, buf: DemiBuffer, addr: Option) -> (QDesc, OperationResult) { + async fn push_coroutine( + self, + qd: QDesc, + bufs: ArrayVec, + addr: Option, + ) -> (QDesc, OperationResult) { // Grab the queue, make sure it hasn't been closed in the meantime. // This will bump the Rc refcount so the coroutine can have it's own reference to the shared queue data // structure and the SharedNetworkQueue will not be freed until this coroutine finishes. @@ -330,7 +345,7 @@ impl SharedNetworkLibOS { Err(e) => return (qd, OperationResult::Failed(e)), }; // Wait for push to complete. - match queue.push_coroutine(buf, addr).await { + match queue.push_coroutine(bufs, addr).await { Ok(()) => (qd, OperationResult::Push), Err(e) => { warn!("push() qd={:?}: {:?}", qd, &e); @@ -345,14 +360,14 @@ impl SharedNetworkLibOS { pub fn pushto(&mut self, qd: QDesc, sga: &demi_sgarray_t, remote: SocketAddr) -> Result { trace!("pushto() qd={:?}", qd); - let buf: DemiBuffer = clone_sgarray(sga)?; - if buf.is_empty() { - return Err(Fail::new(libc::EINVAL, "zero-length buffer")); + let bufs: ArrayVec = clone_sgarray(sga)?; + if bufs.is_empty() { + return Err(Fail::new(libc::EINVAL, "zero buffers to send")); } let mut queue: SharedNetworkQueue = self.get_shared_queue(&qd)?; let coroutine_constructor = || -> Result { - let coroutine = Box::pin(self.clone().push_coroutine(qd, buf, Some(remote)).fuse()); + let coroutine = Box::pin(self.clone().push_coroutine(qd, bufs, Some(remote)).fuse()); self.runtime .clone() .insert_nonpolling_coroutine("ioc::network::libos::pushto", coroutine) @@ -400,7 +415,7 @@ impl SharedNetworkLibOS { qd, OperationResult::Pop(Some(expect_ok!(unwrap_socketaddr(addr), "we only support IPv4")), buf), ), - Ok((None, buf)) => (qd, OperationResult::Pop(None, buf)), + Ok((None, bufs)) => (qd, OperationResult::Pop(None, bufs)), Err(e) => { warn!("pop() qd={:?}: {:?}", qd, &e); (qd, OperationResult::Failed(e)) @@ -477,7 +492,7 @@ impl SharedNetworkLibOS { OperationResult::Pop(addr, bytes) => match into_sgarray(bytes) { Ok(mut sga) => { if let Some(addr) = addr { - sga.sga_addr = socketaddrv4_to_sockaddr(&addr); + sga.sockaddr_src = socketaddrv4_to_sockaddr(&addr); } let qr_value: demi_qr_value_t = demi_qr_value_t { sga }; demi_qresult_t { diff --git a/src/demikernel/libos/network/queue.rs b/src/demikernel/libos/network/queue.rs index e7f36699d..e045f9f70 100644 --- a/src/demikernel/libos/network/queue.rs +++ b/src/demikernel/libos/network/queue.rs @@ -14,8 +14,10 @@ use crate::runtime::{ transport::NetworkTransport, }, queue::{IoQueue, QType}, + types::DEMI_SGARRAY_MAXLEN, QToken, SharedObject, }; +use ::arrayvec::ArrayVec; use ::futures::{pin_mut, select_biased, FutureExt}; use ::socket2::{Domain, Type}; use ::std::{ @@ -260,12 +262,18 @@ impl SharedNetworkQueue { /// Asynchronously push data to the queue. This function contains all of the single-queue, asynchronous code /// necessary to push to the queue and any single-queue functionality after the push completes. - pub async fn push_coroutine(&mut self, mut buf: DemiBuffer, addr: Option) -> Result<(), Fail> { + pub async fn push_coroutine( + &mut self, + bufs: ArrayVec, + addr: Option, + ) -> Result<(), Fail> { + self.state_machine.may_push()?; + let result = { let mut state_machine: SocketStateMachine = self.state_machine.clone(); let mut transport: T = self.transport.clone(); let state_tracker = state_machine.while_open().fuse(); - let operation = transport.push(&mut self.socket, &mut buf, addr).fuse(); + let operation = transport.push(&mut self.socket, bufs, addr).fuse(); pin_mut!(state_tracker); pin_mut!(operation); @@ -274,9 +282,6 @@ impl SharedNetworkQueue { result = operation => result, } }; - if result.is_ok() { - debug_assert_eq!(buf.len(), 0); - } result } @@ -293,7 +298,11 @@ impl SharedNetworkQueue { /// Asynchronously pops data from the queue. This function contains all of the single-queue, asynchronous code /// necessary to pop from a queue and any single-queue functionality after the pop completes. - pub async fn pop_coroutine(&mut self, size: Option) -> Result<(Option, DemiBuffer), Fail> { + pub async fn pop_coroutine( + &mut self, + size: Option, + ) -> Result<(Option, ArrayVec), Fail> { + self.state_machine.may_pop()?; let size: usize = size.unwrap_or(limits::RECVBUF_SIZE_MAX); let mut state_machine: SocketStateMachine = self.state_machine.clone(); diff --git a/src/inetstack/consts.rs b/src/inetstack/consts.rs index 47ef81c1a..e702bccdb 100644 --- a/src/inetstack/consts.rs +++ b/src/inetstack/consts.rs @@ -5,7 +5,7 @@ // Imports //====================================================================================================================== -use crate::inetstack::protocols::*; +use crate::{inetstack::protocols::*, runtime::types::DEMI_SGARRAY_MAXLEN}; use ::std::time::Duration; //====================================================================================================================== @@ -37,10 +37,10 @@ pub const TCP_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(3); /// TODO: Auto-Discovery MTU Size pub const DEFAULT_MSS: usize = 1450; -/// Length of a [crate::memory::DemiBuffer] batch. -/// -/// TODO: This Should be Generic -pub const RECEIVE_BATCH_SIZE: usize = 4; +/// Max batch size of packets for both transmit and receive up and down the stack. This is based on the +/// DEMI_SGARRAY_MAXLEN and should always be bigger than that to receiving an entire sga worth of packets at once. +pub const MAX_BATCH_SIZE_NUM_PACKETS: usize = 20; +const _: () = debug_assert!(DEMI_SGARRAY_MAXLEN <= MAX_BATCH_SIZE_NUM_PACKETS); /// Maximum local and remote window scaling factor. /// See: RFC 1323, Section 2.3. diff --git a/src/inetstack/mod.rs b/src/inetstack/mod.rs index b0dcab6b4..0110d3ed1 100644 --- a/src/inetstack/mod.rs +++ b/src/inetstack/mod.rs @@ -9,23 +9,25 @@ use crate::inetstack::types::MacAddress; use crate::{ demikernel::config::Config, - inetstack::{ - consts::MAX_RECV_ITERS, - protocols::layer4::{Peer, Socket}, - }, runtime::{ fail::Fail, memory::{DemiBuffer, DemiMemoryAllocator}, network::{socket::option::SocketOption, transport::NetworkTransport}, - poll_yield, SharedDemiRuntime, SharedObject, + poll_yield, + types::DEMI_SGARRAY_MAXLEN, + SharedDemiRuntime, SharedObject, }, }; +use ::arrayvec::ArrayVec; use ::socket2::{Domain, Type}; #[cfg(test)] use ::std::{collections::HashMap, hash::RandomState, net::Ipv4Addr, time::Duration}; +use consts::MAX_RECV_ITERS; use protocols::{ - layer1::PhysicalLayer, layer2::SharedLayer2Endpoint, layer3::SharedLayer3Endpoint, - layer4::ephemeral::EphemeralPorts, + layer1::PhysicalLayer, + layer2::SharedLayer2Endpoint, + layer3::SharedLayer3Endpoint, + layer4::{ephemeral::EphemeralPorts, Peer, Socket}, }; use ::futures::FutureExt; @@ -268,10 +270,10 @@ impl NetworkTransport for SharedInetStack { async fn push( &mut self, sd: &mut Self::SocketDescriptor, - buf: &mut DemiBuffer, + bufs: ArrayVec, addr: Option, ) -> Result<(), Fail> { - self.layer4_endpoint.push(sd, buf, addr).await + self.layer4_endpoint.push(sd, bufs, addr).await } /// Create a pop request to write data from IO connection represented by `qd` into a buffer @@ -280,7 +282,7 @@ impl NetworkTransport for SharedInetStack { &mut self, sd: &mut Self::SocketDescriptor, size: usize, - ) -> Result<(Option, DemiBuffer), Fail> { + ) -> Result<(Option, ArrayVec), Fail> { self.layer4_endpoint.pop(sd, size).await } diff --git a/src/inetstack/protocols/layer1/mod.rs b/src/inetstack/protocols/layer1/mod.rs index 0d8631fc0..8597d0614 100644 --- a/src/inetstack/protocols/layer1/mod.rs +++ b/src/inetstack/protocols/layer1/mod.rs @@ -5,12 +5,17 @@ // Imports //====================================================================================================================== -use crate::runtime::{fail::Fail, memory::DemiBuffer}; -use crate::{inetstack::consts::RECEIVE_BATCH_SIZE, runtime::memory::DemiMemoryAllocator}; -pub use ::std::any::Any; -use arrayvec::ArrayVec; +use ::arrayvec::ArrayVec; -use super::layer4::ephemeral::EphemeralPorts; +use crate::{ + inetstack::consts::MAX_BATCH_SIZE_NUM_PACKETS, + inetstack::protocols::layer4::ephemeral::EphemeralPorts, + runtime::{ + fail::Fail, + memory::{DemiBuffer, DemiMemoryAllocator}, + }, +}; +pub use ::std::any::Any; //====================================================================================================================== // Traits @@ -19,13 +24,14 @@ use super::layer4::ephemeral::EphemeralPorts; /// API for the Physical Layer for any underlying hardware that implements a raw NIC interface (e.g., DPDK, raw /// sockets). It must implement [DemiMemoryAllocator] to specify how to allocate DemiBuffers for the physical layer. pub trait PhysicalLayer: 'static + DemiMemoryAllocator { - /// Transmits a single [Demibuffer]. - fn transmit(&mut self, pkt: DemiBuffer) -> Result<(), Fail>; + /// Transmits a batch of [DemiBuffer]. + fn transmit(&mut self, pkts: ArrayVec) -> Result<(), Fail>; /// Receives a batch of [DemiBuffer]. - fn receive(&mut self) -> Result, Fail>; + fn receive(&mut self) -> Result, Fail>; - /// Returns the ephemeral ports on which this physical layer may operate. If none, any valid ephemeral port may be used. + /// Returns the ephemeral ports on which this physical layer may operate. If none, any valid ephemeral port may be + /// used. fn ephemeral_ports(&self) -> EphemeralPorts { EphemeralPorts::default() } diff --git a/src/inetstack/protocols/layer2/mod.rs b/src/inetstack/protocols/layer2/mod.rs index dbb6874e2..324a48c68 100644 --- a/src/inetstack/protocols/layer2/mod.rs +++ b/src/inetstack/protocols/layer2/mod.rs @@ -17,7 +17,7 @@ pub use self::ethernet2::{ use crate::{ demikernel::config::Config, - inetstack::{consts::RECEIVE_BATCH_SIZE, protocols::layer1::PhysicalLayer, types::MacAddress}, + inetstack::{consts::MAX_BATCH_SIZE_NUM_PACKETS, protocols::layer1::PhysicalLayer, types::MacAddress}, runtime::{ fail::Fail, memory::{DemiBuffer, DemiMemoryAllocator}, @@ -51,8 +51,8 @@ impl SharedLayer2Endpoint { }))) } - pub fn receive(&mut self) -> Result, Fail> { - let mut batch: ArrayVec<(EtherType2, DemiBuffer), RECEIVE_BATCH_SIZE> = ArrayVec::new(); + pub fn receive(&mut self) -> Result, Fail> { + let mut batch: ArrayVec<(EtherType2, DemiBuffer), MAX_BATCH_SIZE_NUM_PACKETS> = ArrayVec::new(); for mut pkt in self.layer1_endpoint.receive()? { let header: Ethernet2Header = match Ethernet2Header::parse_and_strip(&mut pkt) { Ok(result) => result, @@ -77,23 +77,29 @@ impl SharedLayer2Endpoint { } pub fn transmit_arp_packet(&mut self, remote_link_addr: MacAddress, pkt: DemiBuffer) -> Result<(), Fail> { - self.transmit(remote_link_addr, EtherType2::Arp, pkt) + let mut pkts: ArrayVec = ArrayVec::new(); + pkts.push(pkt); + self.transmit(remote_link_addr, EtherType2::Arp, pkts) } pub fn transmit_ipv4_packet(&mut self, remote_link_addr: MacAddress, pkt: DemiBuffer) -> Result<(), Fail> { - self.transmit(remote_link_addr, EtherType2::Ipv4, pkt) + let mut pkts: ArrayVec = ArrayVec::new(); + pkts.push(pkt); + self.transmit(remote_link_addr, EtherType2::Ipv4, pkts) } fn transmit( &mut self, remote_link_addr: MacAddress, eth2_type: EtherType2, - mut pkt: DemiBuffer, + mut pkts: ArrayVec, ) -> Result<(), Fail> { let eth2_header: Ethernet2Header = Ethernet2Header::new(remote_link_addr, self.local_link_addr, eth2_type); debug!("L2 OUTGOING {:?}", eth2_header); - eth2_header.serialize_and_attach(&mut pkt); - self.layer1_endpoint.transmit(pkt) + for pkt in pkts.iter_mut() { + eth2_header.serialize_and_attach(pkt); + } + self.layer1_endpoint.transmit(pkts) } pub fn get_local_link_addr(&self) -> MacAddress { diff --git a/src/inetstack/protocols/layer3/mod.rs b/src/inetstack/protocols/layer3/mod.rs index 8a47d1d7c..5859416e6 100644 --- a/src/inetstack/protocols/layer3/mod.rs +++ b/src/inetstack/protocols/layer3/mod.rs @@ -13,7 +13,7 @@ pub use self::{arp::SharedArpPeer, icmpv4::SharedIcmpv4Peer, ip::IpProtocol, ipv use crate::{ demikernel::config::Config, inetstack::{ - consts::RECEIVE_BATCH_SIZE, + consts::MAX_BATCH_SIZE_NUM_PACKETS, protocols::layer2::{EtherType2, SharedLayer2Endpoint}, }, runtime::{ @@ -66,7 +66,9 @@ impl SharedLayer3Endpoint { }))) } - pub fn receive(&mut self) -> Result, Fail> { + pub fn receive( + &mut self, + ) -> Result, Fail> { let mut batch = ArrayVec::new(); for (eth2_type, mut packet) in self.layer2_endpoint.receive()? { match eth2_type { diff --git a/src/inetstack/protocols/layer4/mod.rs b/src/inetstack/protocols/layer4/mod.rs index f7c6c58a2..044ac4d0e 100644 --- a/src/inetstack/protocols/layer4/mod.rs +++ b/src/inetstack/protocols/layer4/mod.rs @@ -19,7 +19,7 @@ use crate::{ demikernel::config::Config, expect_some, inetstack::{ - consts::RECEIVE_BATCH_SIZE, + consts::MAX_BATCH_SIZE_NUM_PACKETS, protocols::{ layer3::{ip::IpProtocol, SharedLayer3Endpoint}, layer4::{ @@ -97,7 +97,7 @@ impl Peer { } } - fn receive_batch(&mut self, batch: ArrayVec<(Ipv4Addr, IpProtocol, DemiBuffer), RECEIVE_BATCH_SIZE>) { + fn receive_batch(&mut self, batch: ArrayVec<(Ipv4Addr, IpProtocol, DemiBuffer), MAX_BATCH_SIZE_NUM_PACKETS>) { timer!("inetstack::layer4::receive_batch"); trace!("found packets: {:?}", batch.len()); for (src_ipv4_addr, ip_type, payload) in batch { @@ -338,19 +338,38 @@ impl Peer { } /// Pushes a buffer to a TCP socket. - pub async fn push(&mut self, sd: &mut Socket, buf: &mut DemiBuffer, addr: Option) -> Result<(), Fail> { + pub async fn push( + &mut self, + sd: &mut Socket, + bufs: ArrayVec, + addr: Option, + ) -> Result<(), Fail> { match sd { - Socket::Tcp(socket) => self.tcp.push(socket, buf).await, - Socket::Udp(socket) => self.udp.push(socket, buf, addr).await, + Socket::Tcp(socket) => self.tcp.push(socket, bufs).await, + Socket::Udp(socket) => { + for buf in bufs { + self.udp.push(socket, buf, addr).await?; + } + Ok(()) + }, } } /// Create a pop request to write data from IO connection represented by `qd` into a buffer /// allocated by the application. - pub async fn pop(&mut self, sd: &mut Socket, size: usize) -> Result<(Option, DemiBuffer), Fail> { + pub async fn pop( + &mut self, + sd: &mut Socket, + size: usize, + ) -> Result<(Option, ArrayVec), Fail> { match sd { Socket::Tcp(socket) => self.tcp.pop(socket, size).await, - Socket::Udp(socket) => self.udp.pop(socket, size).await, + Socket::Udp(socket) => { + let (addr, buf) = self.udp.pop(socket, size).await?; + let mut bufs: ArrayVec = ArrayVec::new(); + bufs.push(buf); + Ok((addr, bufs)) + }, } } } diff --git a/src/inetstack/protocols/layer4/tcp/established/mod.rs b/src/inetstack/protocols/layer4/tcp/established/mod.rs index 4923deadd..9641aa117 100644 --- a/src/inetstack/protocols/layer4/tcp/established/mod.rs +++ b/src/inetstack/protocols/layer4/tcp/established/mod.rs @@ -19,12 +19,16 @@ use crate::{ async_timer, inetstack::{ config::TcpConfig, - consts::MSL, + consts::{MAX_BATCH_SIZE_NUM_PACKETS, MSL}, protocols::{ layer3::SharedLayer3Endpoint, layer4::tcp::{ congestion_control::CongestionControlConstructor, - established::{ctrlblk::ControlBlock, ctrlblk::State, receiver::Receiver, sender::Sender}, + established::{ + ctrlblk::{ControlBlock, State}, + receiver::Receiver, + sender::Sender, + }, header::TcpHeader, SeqNumber, }, @@ -35,6 +39,7 @@ use crate::{ SharedDemiRuntime, SharedObject, }, }; +use ::arrayvec::ArrayVec; use ::futures::{join, pin_mut, FutureExt}; use ::std::{ net::SocketAddrV4, @@ -212,8 +217,13 @@ impl SharedEstablishedSocket { let wait_for_fin = pin!(me3.control_block.receiver.wait_for_fin().fuse()); let mut runtime = self.runtime.clone(); let mut layer3_endpoint = self.layer3_endpoint.clone(); - let push_fin_and_wait_for_ack = - pin!(Sender::push(&mut me2.control_block, &mut layer3_endpoint, &mut runtime, None).fuse()); + let push_fin_and_wait_for_ack = pin!(Sender::push( + &mut me2.control_block, + &mut layer3_endpoint, + &mut runtime, + ArrayVec::new() + ) + .fuse()); let (result1, result2) = join!(wait_for_fin, push_fin_and_wait_for_ack); result1?; result2?; @@ -233,19 +243,25 @@ impl SharedEstablishedSocket { // 1. Send FIN and wait for ack before closing. let mut runtime = self.runtime.clone(); let mut layer3_endpoint = self.layer3_endpoint.clone(); - Sender::push(&mut self.control_block, &mut layer3_endpoint, &mut runtime, None).await?; + Sender::push( + &mut self.control_block, + &mut layer3_endpoint, + &mut runtime, + ArrayVec::new(), + ) + .await?; debug_assert_eq!(self.control_block.state, State::Closed); Ok(()) } - pub async fn push(&mut self, buf: DemiBuffer) -> Result<(), Fail> { + pub async fn push(&mut self, bufs: ArrayVec) -> Result<(), Fail> { let mut runtime = self.runtime.clone(); let mut layer3_endpoint = self.layer3_endpoint.clone(); - Sender::push(&mut self.control_block, &mut layer3_endpoint, &mut runtime, Some(buf)).await + Sender::push(&mut self.control_block, &mut layer3_endpoint, &mut runtime, bufs).await } - pub async fn pop(&mut self, size: Option) -> Result { + pub async fn pop(&mut self, size: Option) -> Result, Fail> { self.control_block.receiver.pop(size).await } diff --git a/src/inetstack/protocols/layer4/tcp/established/receiver.rs b/src/inetstack/protocols/layer4/tcp/established/receiver.rs index 23575cf26..4347316f1 100644 --- a/src/inetstack/protocols/layer4/tcp/established/receiver.rs +++ b/src/inetstack/protocols/layer4/tcp/established/receiver.rs @@ -13,19 +13,23 @@ use ::std::{ use crate::{ collections::{async_queue::AsyncQueue, async_value::SharedAsyncValue}, expect_ok, - inetstack::protocols::{ - layer3::SharedLayer3Endpoint, - layer4::tcp::{ - established::{ - ctrlblk::State, ControlBlock, Sender, MAX_WINDOW_SIZE_WITHOUT_SCALING, MAX_WINDOW_SIZE_WITH_SCALING, + inetstack::{ + consts::MAX_BATCH_SIZE_NUM_PACKETS, + protocols::{ + layer3::SharedLayer3Endpoint, + layer4::tcp::{ + established::{ + ctrlblk::State, ControlBlock, Sender, MAX_WINDOW_SIZE_WITHOUT_SCALING, MAX_WINDOW_SIZE_WITH_SCALING, + }, + header::TcpHeader, + SeqNumber, }, - header::TcpHeader, - SeqNumber, }, }, runtime::{fail::Fail, memory::DemiBuffer}, }; +use ::arrayvec::ArrayVec; use ::futures::never::Never; //====================================================================================================================== @@ -112,31 +116,43 @@ impl Receiver { } // Block until some data is received, up to an optional size. - pub async fn pop(&mut self, size: Option) -> Result { - debug!("waiting on pop {:?}", size); - let buffer = if let Some(size) = size { - let mut buffer = self.pop_queue.pop(None).await?; - // Split the buffer if it's too big. - if buffer.len() > size { - buffer.split_front(size)? - } else { - buffer + pub async fn pop( + &mut self, + mut size: Option, + ) -> Result, Fail> { + let mut bufs = ArrayVec::new(); + let mut buf = self.pop_queue.pop(None).await?; + loop { + if let Some(size) = size.as_mut() { + if buf.len() > *size { + let remaining_buf = buf.split_front(*size)?; + self.pop_queue.push_front(remaining_buf); + } + *size -= buf.len(); } - } else { - self.pop_queue.pop(None).await? - }; + match buf.len() { + len if len > 0 => { + self.reader_next_seq_no = self.reader_next_seq_no + SeqNumber::from(buf.len() as u32); + }, + _ => { + debug!("found FIN"); + self.reader_next_seq_no = self.reader_next_seq_no + 1.into(); + bufs.push(buf); - match buffer.len() { - len if len > 0 => { - self.reader_next_seq_no = self.reader_next_seq_no + SeqNumber::from(buffer.len() as u32); - }, - _ => { - debug!("found FIN"); - self.reader_next_seq_no = self.reader_next_seq_no + 1.into(); - }, + break; + }, + } + bufs.push(buf); + match size { + Some(0) => break, + _ => match self.pop_queue.try_pop() { + Some(next_buf) => buf = next_buf, + None => break, + }, + } } - Ok(buffer) + Ok(bufs) } // Receive a single incoming packet from layer3. diff --git a/src/inetstack/protocols/layer4/tcp/established/sender.rs b/src/inetstack/protocols/layer4/tcp/established/sender.rs index 0c03f039c..874d320ea 100644 --- a/src/inetstack/protocols/layer4/tcp/established/sender.rs +++ b/src/inetstack/protocols/layer4/tcp/established/sender.rs @@ -8,7 +8,7 @@ use crate::{ collections::{async_queue::SharedAsyncQueue, async_value::SharedAsyncValue}, inetstack::{ - consts::MAX_HEADER_SIZE, + consts::{MAX_BATCH_SIZE_NUM_PACKETS, MAX_HEADER_SIZE}, protocols::{ layer3::SharedLayer3Endpoint, layer4::tcp::{ @@ -20,6 +20,7 @@ use crate::{ }, runtime::{conditional_yield_until, fail::Fail, memory::DemiBuffer, SharedDemiRuntime}, }; +use ::arrayvec::ArrayVec; use ::futures::{never::Never, pin_mut, select_biased, FutureExt}; use ::std::{ cmp, fmt, @@ -227,12 +228,12 @@ impl Sender { } } - // This function sends a packet (or FIN) indicated by [buf] and waits for it to be acked. + // This function sends a list of packets (or FIN if empty) and waits for it to be acked. pub async fn push( cb: &mut ControlBlock, layer3_endpoint: &mut SharedLayer3Endpoint, runtime: &mut SharedDemiRuntime, - mut buf: Option, + bufs: ArrayVec, ) -> Result<(), Fail> { // If the user is done sending (i.e. has called close on this connection), then they shouldn't be sending. debug_assert!(cb.sender.fin_seq_no.is_none()); @@ -245,26 +246,26 @@ impl Sender { trace!("push(): total unsent segments={:?}", cb.sender.unsent_queue.len()); // Check if closing the socket and sending FIN. - if let Some(buf) = buf.as_mut() { - cb.sender.unsent_next_seq_no = cb.sender.unsent_next_seq_no + (buf.len() as u32).into(); - if cb.sender.send_window.get() > 0 { - Self::send_segment(cb, layer3_endpoint, runtime.get_now(), buf); - } - } else { + if bufs.is_empty() { // We can always send the FIN immediately. cb.sender.fin_seq_no = Some(cb.sender.unsent_next_seq_no); cb.sender.unsent_next_seq_no = cb.sender.unsent_next_seq_no + 1.into(); Self::send_fin(cb, layer3_endpoint, runtime.get_now())?; + } else { + for mut buf in bufs.into_iter() { + cb.sender.unsent_next_seq_no = cb.sender.unsent_next_seq_no + (buf.len() as u32).into(); + if cb.sender.send_window.get() > 0 { + Self::send_segment(cb, layer3_endpoint, runtime.get_now(), &mut buf); + + if !buf.is_empty() { + cb.sender.unsent_queue.push(buf); + } + } + } } - // Place the buffer in the unsent queue. - if let Some(buf) = buf.take() { - if !cb.sender.unacked_queue.is_empty() { - trace!("push(): total unacked segments={:?}", cb.sender.unacked_queue.len()); - } - if !buf.is_empty() { - cb.sender.unsent_queue.push(buf); - } + if !cb.sender.unacked_queue.is_empty() { + trace!("push(): total unacked segments={:?}", cb.sender.unacked_queue.len()); } // Wait until the sequnce number of the pushed buffer is acknowledged. @@ -400,8 +401,7 @@ impl Sender { now: Instant, segment: &mut DemiBuffer, ) -> usize { - let buffer_length = segment.len(); - debug_assert_ne!(buffer_length, 0); + debug_assert!(!segment.is_empty()); let max_frame_size_bytes = Self::get_open_window_size_bytes(cb); if max_frame_size_bytes == 0 { @@ -411,12 +411,12 @@ impl Sender { // Split the packet if necessary. // TODO: Use a scatter/gather array to coalesce multiple buffers into a single segment. let (frame_size_bytes, do_push) = { - if buffer_length > max_frame_size_bytes { + if segment.len() > max_frame_size_bytes { // Suppress PSH flag for partial buffers. (max_frame_size_bytes, false) } else { // We can just send the whole packet. Clone it so we can attach headers/retransmit it later. - (buffer_length, true) + (segment.len(), true) } }; let segment_data = segment diff --git a/src/inetstack/protocols/layer4/tcp/peer.rs b/src/inetstack/protocols/layer4/tcp/peer.rs index ab109a152..407ec24b2 100644 --- a/src/inetstack/protocols/layer4/tcp/peer.rs +++ b/src/inetstack/protocols/layer4/tcp/peer.rs @@ -9,6 +9,7 @@ use crate::{ demikernel::config::Config, inetstack::{ config::TcpConfig, + consts::MAX_BATCH_SIZE_NUM_PACKETS, protocols::{ layer3::SharedLayer3Endpoint, layer4::tcp::{header::TcpHeader, isn_generator::IsnGenerator, socket::SharedTcpSocket, SeqNumber}, @@ -24,6 +25,7 @@ use crate::{ SharedDemiRuntime, SharedObject, }, }; +use ::arrayvec::ArrayVec; use ::rand::{prelude::SmallRng, Rng, SeedableRng}; use ::std::{ @@ -170,11 +172,13 @@ impl SharedTcpPeer { } /// Pushes immediately to the socket and returns the result asynchronously. - pub async fn push(&self, socket: &mut SharedTcpSocket, buf: &mut DemiBuffer) -> Result<(), Fail> { - // TODO: Remove this copy after merging with the transport trait. + pub async fn push( + &self, + socket: &mut SharedTcpSocket, + bufs: ArrayVec, + ) -> Result<(), Fail> { // Wait for push to complete. - socket.push(buf.clone()).await?; - buf.trim(buf.len()) + socket.push(bufs).await } /// Sets up a coroutine for popping data from the socket. @@ -182,11 +186,11 @@ impl SharedTcpPeer { &self, socket: &mut SharedTcpSocket, size: usize, - ) -> Result<(Option, DemiBuffer), Fail> { + ) -> Result<(Option, ArrayVec), Fail> { // Grab the queue, make sure it hasn't been closed in the meantime. // This will bump the Rc refcount so the coroutine can have it's own reference to the shared queue data // structure and the SharedTcpQueue will not be freed until this coroutine finishes. - let incoming: DemiBuffer = socket.pop(Some(size)).await?; + let incoming: ArrayVec = socket.pop(Some(size)).await?; Ok((None, incoming)) } diff --git a/src/inetstack/protocols/layer4/tcp/socket.rs b/src/inetstack/protocols/layer4/tcp/socket.rs index ab4d2ff9a..bb34f44c4 100644 --- a/src/inetstack/protocols/layer4/tcp/socket.rs +++ b/src/inetstack/protocols/layer4/tcp/socket.rs @@ -5,10 +5,13 @@ // Imports //====================================================================================================================== +use ::arrayvec::ArrayVec; + use crate::{ expect_some, inetstack::{ config::TcpConfig, + consts::MAX_BATCH_SIZE_NUM_PACKETS, protocols::{ layer3::SharedLayer3Endpoint, layer4::tcp::{ @@ -197,18 +200,18 @@ impl SharedTcpSocket { Ok(()) } - pub async fn push(&mut self, buf: DemiBuffer) -> Result<(), Fail> { + pub async fn push(&mut self, bufs: ArrayVec) -> Result<(), Fail> { // Send synchronously. match self.state { SocketState::Established(ref mut socket) => { // Wait for ack. - socket.push(buf).await + socket.push(bufs).await }, _ => unreachable!("State machine check should ensure that this socket is connected"), } } - pub async fn pop(&mut self, size: Option) -> Result { + pub async fn pop(&mut self, size: Option) -> Result, Fail> { match self.state { SocketState::Established(ref mut socket) => socket.pop(size).await, _ => unreachable!("State machine check should ensure that this socket is connected"), diff --git a/src/inetstack/protocols/layer4/udp/peer.rs b/src/inetstack/protocols/layer4/udp/peer.rs index 41c496b2f..54d4aadef 100644 --- a/src/inetstack/protocols/layer4/udp/peer.rs +++ b/src/inetstack/protocols/layer4/udp/peer.rs @@ -97,7 +97,7 @@ impl SharedUdpPeer { pub async fn push( &mut self, socket: &mut SharedUdpSocket, - buf: &mut DemiBuffer, + buf: DemiBuffer, remote: Option, ) -> Result<(), Fail> { // TODO: Allocate ephemeral port if not bound. @@ -107,9 +107,7 @@ impl SharedUdpPeer { error!("pushto(): {}", cause); return Err(Fail::new(libc::ENOTSUP, cause)); } - // TODO: Remove copy once we actually use push coroutine for send. - socket.push(remote, buf.clone()).await?; - buf.trim(buf.len()) + socket.push(remote, buf).await } /// Pops data from a socket. diff --git a/src/inetstack/protocols/layer4/udp/tests.rs b/src/inetstack/protocols/layer4/udp/tests.rs index 03696b7c1..35f5d36bc 100644 --- a/src/inetstack/protocols/layer4/udp/tests.rs +++ b/src/inetstack/protocols/layer4/udp/tests.rs @@ -3,7 +3,7 @@ use crate::{ inetstack::{ - consts::MAX_HEADER_SIZE, + consts::{MAX_BATCH_SIZE_NUM_PACKETS, MAX_HEADER_SIZE}, test_helpers::{self, engine::SharedEngine}, }, runtime::{ @@ -12,6 +12,7 @@ use crate::{ }, }; use ::anyhow::Result; +use ::arrayvec::ArrayVec; use ::libc::EBADF; use ::std::{ net::{Ipv4Addr, SocketAddrV4}, @@ -91,12 +92,13 @@ fn udp_push_pop() -> Result<()> { carrie.push_frame(bob.pop_frame()); let carrie_qt: QToken = carrie.udp_pop(carrie_fd)?; - let (remote_addr, received_buf): (Option, DemiBuffer) = match carrie.wait(carrie_qt)? { - (_, OperationResult::Pop(addr, buf)) => (addr, buf), - _ => anyhow::bail!("Pop failed"), - }; + let (remote_addr, received_bufs): (Option, ArrayVec) = + match carrie.wait(carrie_qt)? { + (_, OperationResult::Pop(addr, bufs)) => (addr, bufs), + _ => anyhow::bail!("Pop failed"), + }; assert_eq!(remote_addr.unwrap(), bob_addr); - assert_eq!(received_buf[..], buf[..]); + assert_eq!(received_bufs[0][..], buf[..]); // Close peers. bob.udp_close(bob_fd)?; @@ -141,12 +143,13 @@ fn udp_push_pop_wildcard_address() -> Result<()> { // Take a packet from Bob and deliver to Carrie. carrie.push_frame(bob.pop_frame()); let carrie_qt: QToken = carrie.udp_pop(carrie_fd)?; - let (remote_addr, received_buf): (Option, DemiBuffer) = match carrie.wait(carrie_qt)? { - (_, OperationResult::Pop(addr, buf)) => (addr, buf), - _ => anyhow::bail!("Pop failed"), - }; + let (remote_addr, received_bufs): (Option, ArrayVec) = + match carrie.wait(carrie_qt)? { + (_, OperationResult::Pop(addr, bufs)) => (addr, bufs), + _ => anyhow::bail!("Pop failed"), + }; assert_eq!(remote_addr.unwrap(), bob_addr); - assert_eq!(received_buf[..], buf[..]); + assert_eq!(received_bufs[0][..], buf[..]); // Close peers. bob.udp_close(bob_fd)?; carrie.udp_close(carrie_fd)?; @@ -190,12 +193,13 @@ fn udp_ping_pong() -> Result<()> { carrie.push_frame(bob.pop_frame()); let carrie_qt: QToken = carrie.udp_pop(carrie_fd)?; - let (remote_addr, received_buf_a): (Option, DemiBuffer) = match carrie.wait(carrie_qt)? { - (_, OperationResult::Pop(addr, buf)) => (addr, buf), - _ => anyhow::bail!("Pop failed"), - }; + let (remote_addr, received_buf_a): (Option, ArrayVec) = + match carrie.wait(carrie_qt)? { + (_, OperationResult::Pop(addr, bufs)) => (addr, bufs), + _ => anyhow::bail!("Pop failed"), + }; assert_eq!(remote_addr.unwrap(), bob_addr); - assert_eq!(received_buf_a[..], buf_a[..]); + assert_eq!(received_buf_a[0][..], buf_a[..]); now += Duration::from_micros(1); @@ -212,12 +216,13 @@ fn udp_ping_pong() -> Result<()> { // Take a packet from Carrie and deliver to Bob. bob.push_frame(carrie.pop_frame()); let bob_qt: QToken = bob.udp_pop(bob_fd)?; - let (remote_addr, received_buf_b): (Option, DemiBuffer) = match bob.wait(bob_qt)? { - (_, OperationResult::Pop(addr, buf)) => (addr, buf), - _ => anyhow::bail!("Pop failed"), - }; + let (remote_addr, received_buf_b): (Option, ArrayVec) = + match bob.wait(bob_qt)? { + (_, OperationResult::Pop(addr, bufs)) => (addr, bufs), + _ => anyhow::bail!("Pop failed"), + }; assert_eq!(remote_addr.unwrap(), carrie_addr); - assert_eq!(received_buf_b[..], buf_b[..]); + assert_eq!(received_buf_b[0][..], buf_b[..]); // Close peers. bob.udp_close(bob_fd)?; @@ -321,12 +326,13 @@ fn udp_loop2_push_pop() -> Result<()> { // Take a packet from Bob and deliver to Carrie. carrie.push_frame(bob.pop_frame()); let carrie_qt: QToken = carrie.udp_pop(carrie_fd)?; - let (remote_addr, received_buf): (Option, DemiBuffer) = match carrie.wait(carrie_qt)? { - (_, OperationResult::Pop(addr, buf)) => (addr, buf), - _ => anyhow::bail!("Pop failed"), - }; + let (remote_addr, received_bufs): (Option, ArrayVec) = + match carrie.wait(carrie_qt)? { + (_, OperationResult::Pop(addr, bufs)) => (addr, bufs), + _ => anyhow::bail!("Pop failed"), + }; assert_eq!(remote_addr.unwrap(), bob_addr); - assert_eq!(received_buf[..], buf[..]); + assert_eq!(received_bufs[0][..], buf[..]); } // Close peers. @@ -384,12 +390,13 @@ fn udp_loop2_ping_pong() -> Result<()> { // Take a packet from Bob and deliver to Carrie. carrie.push_frame(bob.pop_frame()); let carrie_qt: QToken = carrie.udp_pop(carrie_fd)?; - let (remote_addr, received_buf_a): (Option, DemiBuffer) = match carrie.wait(carrie_qt)? { - (_, OperationResult::Pop(addr, buf)) => (addr, buf), - _ => anyhow::bail!("Pop failed"), - }; + let (remote_addr, received_buf_a): (Option, ArrayVec) = + match carrie.wait(carrie_qt)? { + (_, OperationResult::Pop(addr, bufs)) => (addr, bufs), + _ => anyhow::bail!("Pop failed"), + }; assert_eq!(remote_addr.unwrap(), bob_addr); - assert_eq!(received_buf_a[..], buf_a[..]); + assert_eq!(received_buf_a[0][..], buf_a[..]); now += Duration::from_micros(1); @@ -407,12 +414,13 @@ fn udp_loop2_ping_pong() -> Result<()> { // Take a packet from Carrie and deliver to Bob. bob.push_frame(carrie.pop_frame()); let bob_qt: QToken = bob.udp_pop(bob_fd)?; - let (remote_addr, received_buf_b): (Option, DemiBuffer) = match bob.wait(bob_qt)? { - (_, OperationResult::Pop(addr, buf)) => (addr, buf), - _ => anyhow::bail!("Pop failed"), - }; + let (remote_addr, received_buf_b): (Option, ArrayVec) = + match bob.wait(bob_qt)? { + (_, OperationResult::Pop(addr, bufs)) => (addr, bufs), + _ => anyhow::bail!("Pop failed"), + }; assert_eq!(remote_addr.unwrap(), carrie_addr); - assert_eq!(received_buf_b[..], buf_b[..]); + assert_eq!(received_buf_b[0][..], buf_b[..]); } // Close peers. diff --git a/src/inetstack/test_helpers/engine.rs b/src/inetstack/test_helpers/engine.rs index 40c6f915b..215ed5cd0 100644 --- a/src/inetstack/test_helpers/engine.rs +++ b/src/inetstack/test_helpers/engine.rs @@ -10,10 +10,13 @@ use crate::{ demikernel::{config::Config, libos::network::libos::SharedNetworkLibOS}, inetstack::{test_helpers::SharedTestPhysicalLayer, types::MacAddress, SharedInetStack}, runtime::{ - fail::Fail, memory::into_sgarray, memory::DemiBuffer, OperationResult, QDesc, QToken, SharedDemiRuntime, - SharedObject, + fail::Fail, + memory::{into_sgarray, DemiBuffer}, + types::DEMI_SGARRAY_MAXLEN, + OperationResult, QDesc, QToken, SharedDemiRuntime, SharedObject, }, }; +use ::arrayvec::ArrayVec; use ::socket2::{Domain, Protocol, Type}; use ::std::{ collections::{HashMap, VecDeque}, @@ -90,7 +93,10 @@ impl SharedEngine { } pub fn udp_pushto(&mut self, qd: QDesc, buf: DemiBuffer, to: SocketAddrV4) -> Result { - let data: demi_sgarray_t = into_sgarray(buf)?; + let mut bufs: ArrayVec = ArrayVec::new(); + bufs.push(buf); + let data: demi_sgarray_t = into_sgarray(bufs)?; + debug!("sgarray"); self.libos.pushto(qd, &data, to.into()) } @@ -131,7 +137,9 @@ impl SharedEngine { } pub fn tcp_push(&mut self, socket_fd: QDesc, buf: DemiBuffer) -> Result { - let data: demi_sgarray_t = into_sgarray(buf)?; + let mut bufs: ArrayVec = ArrayVec::new(); + bufs.push(buf); + let data: demi_sgarray_t = into_sgarray(bufs)?; self.libos.push(socket_fd, &data) } diff --git a/src/inetstack/test_helpers/physical_layer.rs b/src/inetstack/test_helpers/physical_layer.rs index 1427f2280..ee7508501 100644 --- a/src/inetstack/test_helpers/physical_layer.rs +++ b/src/inetstack/test_helpers/physical_layer.rs @@ -6,7 +6,7 @@ //====================================================================================================================== use crate::{ - inetstack::consts::{MAX_HEADER_SIZE, RECEIVE_BATCH_SIZE}, + inetstack::consts::{MAX_BATCH_SIZE_NUM_PACKETS, MAX_HEADER_SIZE}, inetstack::protocols::layer1::PhysicalLayer, runtime::{ fail::Fail, @@ -79,22 +79,24 @@ impl SharedTestPhysicalLayer { //====================================================================================================================== impl PhysicalLayer for SharedTestPhysicalLayer { - fn transmit(&mut self, pkt: DemiBuffer) -> Result<(), Fail> { - debug!( - "transmit frame: {:?} total packet size: {:?}", - self.outgoing.len(), - pkt.len() - ); - - // The packet header and body must fit into whatever physical media we're transmitting over. - // For this test harness, we 2^16 bytes (u16::MAX) as our limit. - assert!(pkt.len() < u16::MAX as usize); - - self.outgoing.push_back(pkt); + fn transmit(&mut self, pkts: ArrayVec) -> Result<(), Fail> { + for pkt in pkts { + debug!( + "transmit frame: {:?} total packet size: {:?}", + self.outgoing.len(), + pkt.len() + ); + + // The packet header and body must fit into whatever physical media we're transmitting over. + // For this test harness, we 2^16 bytes (u16::MAX) as our limit. + assert!(pkt.len() < u16::MAX as usize); + + self.outgoing.push_back(pkt); + } Ok(()) } - fn receive(&mut self) -> Result, Fail> { + fn receive(&mut self) -> Result, Fail> { let mut out = ArrayVec::new(); if let Some(buf) = self.incoming.pop_front() { out.push(buf); diff --git a/src/runtime/memory/mod.rs b/src/runtime/memory/mod.rs index 674a5f0f8..195fc9490 100644 --- a/src/runtime/memory/mod.rs +++ b/src/runtime/memory/mod.rs @@ -11,8 +11,9 @@ mod memory_pool; use crate::runtime::{ fail::Fail, - types::{demi_sgarray_t, demi_sgaseg_t}, + types::{demi_sgarray_t, demi_sgaseg_t, DEMI_SGARRAY_MAXLEN}, }; +use ::arrayvec::ArrayVec; use ::libc::c_void; use ::std::{mem, ptr::NonNull}; @@ -27,33 +28,46 @@ pub use self::{buffer_pool::*, demibuffer::*}; //====================================================================================================================== pub trait DemiMemoryAllocator { + fn get_max_buffer_size_bytes(&self) -> usize { + u16::MAX as usize + } + fn allocate_demi_buffer(&self, size: usize) -> Result { Ok(DemiBuffer::new(size as u16)) } } -/// Converts a buffer into a scatter-gather array. -pub fn into_sgarray(buf: DemiBuffer) -> Result { - // Create a scatter-gather segment to expose the DemiBuffer to the user. - let data: *const u8 = buf.as_ptr(); - let sga_seg: demi_sgaseg_t = demi_sgaseg_t { - sgaseg_buf: data as *mut c_void, - sgaseg_len: buf.len() as u32, - }; +/// Converts a list of DemiBuffers into a scatter-gather array. +pub fn into_sgarray(bufs: ArrayVec) -> Result { + // Check the sizes before allocating anything. + if bufs.is_empty() { + let cause = "cannot allocate a zero element scatter-gather array"; + error!("into_sgarray(): {}", cause); + return Err(Fail::new(libc::EINVAL, &cause)); + } + if bufs.len() > DEMI_SGARRAY_MAXLEN { + let cause = format!("cannot allocate a {} element scatter-gather array", bufs.len()); + error!("into_sgarray(): {}", cause); + return Err(Fail::new(libc::EINVAL, &cause)); + } + + // Create a scatter-gather segment to expose the DemiBuffers to the user. + let mut sga: demi_sgarray_t = demi_sgarray_t::default(); + sga.num_segments = bufs.len() as u32; + + for (i, buf) in bufs.into_iter().enumerate() { + sga.segments[i].data_buf_ptr = buf.as_ptr() as *mut c_void; + sga.segments[i].data_len_bytes = buf.len() as u32; + sga.segments[i].reserved_metadata_ptr = buf.into_raw().as_ptr() as *mut c_void; + } // Create and return a new scatter-gather array (which inherits the DemiBuffer's reference). - Ok(demi_sgarray_t { - sga_buf: buf.into_raw().as_ptr() as *mut c_void, - sga_numsegs: 1, - sga_segs: [sga_seg], - sga_addr: unsafe { mem::zeroed() }, - }) + Ok(sga) } /// Allocates a scatter-gather array. pub fn sgaalloc(size: usize, mem_alloc: &M) -> Result { - // TODO: Allocate an array of buffers if requested size is too large for a single buffer. - + // Check the sizes before allocating anything. // We can't allocate a zero-sized buffer. if size == 0 { let cause: &'static str = "cannot allocate a zero-sized buffer"; @@ -61,77 +75,81 @@ pub fn sgaalloc(size: usize, mem_alloc: &M) -> Result u16::MAX as usize { + // First allocate the underlying DemiBuffer. + if size > mem_alloc.get_max_buffer_size_bytes() * DEMI_SGARRAY_MAXLEN { return Err(Fail::new(libc::EINVAL, "size too large for a single demi_sgaseg_t")); } - - // First allocate the underlying DemiBuffer. - let buf: DemiBuffer = mem_alloc.allocate_demi_buffer(size)?; - debug_assert_eq!(buf.len(), size); - - into_sgarray(buf) + // Calculate the number of DemiBuffers to allocate. + let max_buffer_size_bytes: usize = mem_alloc.get_max_buffer_size_bytes(); + let remainder: usize = size % max_buffer_size_bytes; + let len: usize = (size - remainder) / max_buffer_size_bytes; + let mut bufs: ArrayVec = ArrayVec::new(); + for _ in 0..len { + bufs.push(mem_alloc.allocate_demi_buffer(max_buffer_size_bytes)?); + } + // If there is any remaining length, allocate a partial buffer. + if remainder > 0 { + bufs.push(mem_alloc.allocate_demi_buffer(remainder)?); + } + into_sgarray(bufs) } /// Releases a scatter-gather array. pub fn sgafree(sga: demi_sgarray_t) -> Result<(), Fail> { // Check arguments. - // TODO: Drop this check once we support scatter-gather arrays with multiple segments. - if sga.sga_numsegs != 1 { + if sga.num_segments > DEMI_SGARRAY_MAXLEN as u32 { return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid segment count")); } - if sga.sga_buf.is_null() { - return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid DemiBuffer token")); + for i in 0..sga.num_segments as usize { + let buf: DemiBuffer = convert_sgaseg_to_demi_buffer(&sga.segments[i])?; + drop(buf); } - - // Convert back to a DemiBuffer and drop it. - // Safety: The `NonNull::new_unchecked()` call is safe, as we verified `sga.sga_buf` is not null above. - let token: NonNull = unsafe { NonNull::new_unchecked(sga.sga_buf as *mut u8) }; - // Safety: The `DemiBuffer::from_raw()` call *should* be safe, as the `sga_buf` field in the `demi_sgarray_t` - // contained a valid `DemiBuffer` token when we provided it to the user (and the user shouldn't change it). - let buf: DemiBuffer = unsafe { DemiBuffer::from_raw(token) }; - drop(buf); - Ok(()) } -/// Clones a scatter-gather array. -pub fn clone_sgarray(sga: &demi_sgarray_t) -> Result { +/// Clones a scatter-gather array. The [sga_buf] field must point to the first DemiBuffer in the chain and the elements +/// of [segments] must be the rest of the chain. +pub fn clone_sgarray(sga: &demi_sgarray_t) -> Result, Fail> { // Check arguments. - // TODO: Drop this check once we support scatter-gather arrays with multiple segments. - if sga.sga_numsegs != 1 { + if sga.num_segments > DEMI_SGARRAY_MAXLEN as u32 || sga.num_segments == 0 { return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid segment count")); } - if sga.sga_buf.is_null() { - return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid DemiBuffer token")); + let mut bufs: ArrayVec = ArrayVec::new(); + for i in 0..sga.num_segments as usize { + // Convert back to a DemiBuffer. + let buf: DemiBuffer = convert_sgaseg_to_demi_buffer(&sga.segments[i])?; + // Clone the DemiBuffer, this will recursively clone the entire chain. + let clone: DemiBuffer = buf.clone(); + + // Don't drop buf, as it holds the same reference to the data as the sgarray (which should keep it). + mem::forget(buf); + bufs.push(clone); } + Ok(bufs) +} - // Convert back to a DemiBuffer. +fn convert_sgaseg_to_demi_buffer(sga_seg: &demi_sgaseg_t) -> Result { + if sga_seg.reserved_metadata_ptr.is_null() { + return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid DemiBuffer token")); + } + // Convert back to a DemiBuffer and drop it. // Safety: The `NonNull::new_unchecked()` call is safe, as we verified `sga.sga_buf` is not null above. - let token: NonNull = unsafe { NonNull::new_unchecked(sga.sga_buf as *mut u8) }; + let token: NonNull = unsafe { NonNull::new_unchecked(sga_seg.reserved_metadata_ptr as *mut u8) }; // Safety: The `DemiBuffer::from_raw()` call *should* be safe, as the `sga_buf` field in the `demi_sgarray_t` // contained a valid `DemiBuffer` token when we provided it to the user (and the user shouldn't change it). - let buf: DemiBuffer = unsafe { DemiBuffer::from_raw(token) }; - let mut clone: DemiBuffer = buf.clone(); - - // Don't drop buf, as it holds the same reference to the data as the sgarray (which should keep it). - mem::forget(buf); - - // Check whether the limits of the buffer have changed. - check_demi_buf_limits(sga, &mut clone)?; - - // Return the clone. - Ok(clone) + let mut buf: DemiBuffer = unsafe { DemiBuffer::from_raw(token) }; + check_demi_buf_limits(sga_seg, &mut buf)?; + Ok(buf) } /// Check to see if the user has reduced the size of the buffer described by the sgarray segment since we provided it to -/// them. They could have increased the starting address of the buffer (`sgaseg_buf`), decreased the ending address of -/// the buffer (`sgaseg_buf + sgaseg_len`), or both. -fn check_demi_buf_limits(sga: &demi_sgarray_t, clone: &mut DemiBuffer) -> Result<(), Fail> { - let sga_data: *const u8 = sga.sga_segs[0].sgaseg_buf as *const u8; - let sga_len: usize = sga.sga_segs[0].sgaseg_len as usize; +/// them. They could have increased the starting address of the buffer (`data_buf_ptr`), decreased the ending address of +/// the buffer (`data_buf_ptr + data_buf_len`), or both. +fn check_demi_buf_limits(sga_seg: &demi_sgaseg_t, clone: &mut DemiBuffer) -> Result<(), Fail> { + let sga_data: *const u8 = sga_seg.data_buf_ptr as *const u8; + let sga_len: usize = sga_seg.data_len_bytes as usize; let clone_data: *const u8 = clone.as_ptr(); let mut clone_len: usize = clone.len(); if sga_data != clone_data || sga_len != clone_len { diff --git a/src/runtime/network/transport.rs b/src/runtime/network/transport.rs index b83cbff31..78388eacb 100644 --- a/src/runtime/network/transport.rs +++ b/src/runtime/network/transport.rs @@ -9,8 +9,10 @@ use crate::runtime::{ fail::Fail, memory::{DemiBuffer, DemiMemoryAllocator}, network::socket::option::SocketOption, + types::DEMI_SGARRAY_MAXLEN, SharedDemiRuntime, }; +use ::arrayvec::ArrayVec; use ::socket2::{Domain, Type}; use ::std::{ fmt::Debug, @@ -69,7 +71,7 @@ pub trait NetworkTransport: Clone + 'static + DemiMemoryAllocator { fn push( &mut self, sd: &mut Self::SocketDescriptor, - buf: &mut DemiBuffer, + buf: ArrayVec, addr: Option, ) -> impl std::future::Future>; @@ -78,7 +80,7 @@ pub trait NetworkTransport: Clone + 'static + DemiMemoryAllocator { &mut self, sd: &mut Self::SocketDescriptor, size: usize, - ) -> impl std::future::Future, DemiBuffer), Fail>>; + ) -> impl std::future::Future, ArrayVec), Fail>>; /// Asynchronously close a socket. fn close(&mut self, sd: &mut Self::SocketDescriptor) -> impl std::future::Future>; diff --git a/src/runtime/queue/operation_result.rs b/src/runtime/queue/operation_result.rs index 494ffeb35..840efb38a 100644 --- a/src/runtime/queue/operation_result.rs +++ b/src/runtime/queue/operation_result.rs @@ -5,7 +5,9 @@ // Imports //====================================================================================================================== -use crate::runtime::{fail::Fail, memory::DemiBuffer, QDesc}; +use ::arrayvec::ArrayVec; + +use crate::runtime::{fail::Fail, memory::DemiBuffer, types::DEMI_SGARRAY_MAXLEN, QDesc}; use ::std::{fmt, net::SocketAddrV4}; //====================================================================================================================== @@ -17,7 +19,7 @@ pub enum OperationResult { Connect, Accept((QDesc, SocketAddrV4)), Push, - Pop(Option, DemiBuffer), + Pop(Option, ArrayVec), Close, Failed(Fail), } diff --git a/src/runtime/types/memory.rs b/src/runtime/types/memory.rs index a7eb2beae..1a7cd5b60 100644 --- a/src/runtime/types/memory.rs +++ b/src/runtime/types/memory.rs @@ -5,8 +5,10 @@ // Constants //====================================================================================================================== -/// Maximum Length for Scatter-Gather Arrays -pub const DEMI_SGARRAY_MAXLEN: usize = 1; +use ::std::{mem, ptr}; + +/// Maximum Length for Scatter-Gather Arrays. Cannot be larger than u16::MAX +pub const DEMI_SGARRAY_MAXLEN: usize = 20; //====================================================================================================================== // Structures @@ -16,26 +18,43 @@ pub const DEMI_SGARRAY_MAXLEN: usize = 1; #[repr(C, packed)] #[derive(Copy, Clone)] pub struct demi_sgaseg_t { - /// Underlying data. - pub sgaseg_buf: *mut libc::c_void, - /// Length of underlying data. - pub sgaseg_len: u32, + pub reserved_metadata_ptr: *mut libc::c_void, + pub data_buf_ptr: *mut libc::c_void, + pub data_len_bytes: u32, } /// Scatter-Gather Array #[repr(C, packed)] #[derive(Copy, Clone)] pub struct demi_sgarray_t { - /// Reserved. - pub sga_buf: *mut libc::c_void, - /// Number of segments in this scatter-gather array. - pub sga_numsegs: u32, - /// Scatter-gather array segments. - pub sga_segs: [demi_sgaseg_t; DEMI_SGARRAY_MAXLEN], - /// Source address of the data contained in this scatter-gather array (if present). - pub sga_addr: libc::sockaddr, + pub num_segments: u32, + pub segments: [demi_sgaseg_t; DEMI_SGARRAY_MAXLEN], + pub sockaddr_src: libc::sockaddr, } +//====================================================================================================================== +// Trait Implementations +//====================================================================================================================== + +impl Default for demi_sgaseg_t { + fn default() -> Self { + Self { + reserved_metadata_ptr: ptr::null_mut() as *mut _, + data_buf_ptr: ptr::null_mut() as *mut libc::c_void, + data_len_bytes: 0, + } + } +} + +impl Default for demi_sgarray_t { + fn default() -> Self { + Self { + num_segments: 0, + segments: [demi_sgaseg_t::default(); DEMI_SGARRAY_MAXLEN], + sockaddr_src: unsafe { mem::zeroed() }, + } + } +} //====================================================================================================================== // Unit Tests //====================================================================================================================== @@ -50,29 +69,32 @@ mod test { #[test] fn test_size_demi_sgaseg_t() -> Result<(), anyhow::Error> { // Size of a void pointer. - const SGASEG_BUF_SIZE: usize = 8; + const RESERVED_METADATA_SIZE: usize = 8; + // Size of a void pointer. + const DATA_BUF_PTR_SIZE: usize = 8; // Size of a u32. - const SGASEG_LEN_SIZE: usize = 4; + const DATA_LEN_SIZE: usize = 4; // Size of a demi_sgaseg_t structure. - crate::ensure_eq!(mem::size_of::(), SGASEG_BUF_SIZE + SGASEG_LEN_SIZE); + crate::ensure_eq!( + mem::size_of::(), + RESERVED_METADATA_SIZE + DATA_BUF_PTR_SIZE + DATA_LEN_SIZE + ); Ok(()) } /// Tests if the `demi_sga_t` structure has the expected size. #[test] fn test_size_demi_sgarray_t() -> Result<(), anyhow::Error> { - // Size of a void pointer. - const SGA_BUF_SIZE: usize = 8; // Size of a u32. - const SGA_NUMSEGS_SIZE: usize = 4; + const NUM_SEGMENTS_SIZE: usize = 4; // Size of an array of demi_sgaseg_t structures. - const SGA_SEGS_SIZE: usize = mem::size_of::() * DEMI_SGARRAY_MAXLEN; + const ELEMENTS_SIZE: usize = mem::size_of::() * DEMI_SGARRAY_MAXLEN; // Size of a SockAddr structure. - const SGA_ADDR_SIZE: usize = 16; + const SOCKADDR_SRC_SIZE: usize = 16; // Size of a demi_sgarray_t structure. crate::ensure_eq!( mem::size_of::(), - SGA_BUF_SIZE + SGA_NUMSEGS_SIZE + SGA_SEGS_SIZE + SGA_ADDR_SIZE + NUM_SEGMENTS_SIZE + ELEMENTS_SIZE + SOCKADDR_SRC_SIZE ); Ok(()) } diff --git a/src/runtime/types/ops.rs b/src/runtime/types/ops.rs index 370b98fdc..0ca46a3b0 100644 --- a/src/runtime/types/ops.rs +++ b/src/runtime/types/ops.rs @@ -27,8 +27,8 @@ pub enum demi_opcode_t { #[repr(C, packed)] #[derive(Copy, Clone)] pub struct demi_accept_result_t { - pub qd: i32, - pub addr: libc::sockaddr, + pub qd: i32, // 4 bytes. + pub addr: libc::sockaddr, // 16 bytes. } #[repr(C)] @@ -84,21 +84,23 @@ mod test { /// Tests if `demi_qresult_t` has the expected size. #[test] fn test_size_demi_qresult_t() -> Result<(), anyhow::Error> { - // Size of a demi_opcode_t enum. const QR_OPCODE_SIZE: usize = 4; - // Size of a u32. + crate::ensure_eq!(mem::size_of::(), QR_OPCODE_SIZE); const QR_QD_SIZE: usize = 4; - // Size of a demi_qtoken_t type alias. + crate::ensure_eq!(mem::size_of::(), QR_QD_SIZE); const QR_QT_SIZE: usize = 8; - // Size of a u64. + crate::ensure_eq!(mem::size_of::(), QR_QT_SIZE); const QR_RET_SIZE: usize = 8; - // Size of a demi_qr_value_t structure. + crate::ensure_eq!(mem::size_of::(), QR_RET_SIZE); const QR_VALUE_SIZE: usize = mem::size_of::(); + const QR_RESULT_SIZE: usize = QR_OPCODE_SIZE + QR_QD_SIZE + QR_QT_SIZE + QR_RET_SIZE + QR_VALUE_SIZE; + const PADDING: usize = match QR_RESULT_SIZE % mem::align_of::() { + 0 => 0, + remainder => mem::align_of::() - remainder, + }; + // Size of a demi_qresult_t structure. - crate::ensure_eq!( - mem::size_of::(), - QR_OPCODE_SIZE + QR_QD_SIZE + QR_QT_SIZE + QR_RET_SIZE + QR_VALUE_SIZE - ); + crate::ensure_eq!(mem::size_of::(), QR_RESULT_SIZE + PADDING); Ok(()) } } diff --git a/tests/c/sizes.c b/tests/c/sizes.c index 2af18f85b..6857ba989 100644 --- a/tests/c/sizes.c +++ b/tests/c/sizes.c @@ -45,14 +45,14 @@ *====================================================================================================================*/ // The following sizes are intentionally hardcoded. -#define SGASEG_BUF_SIZE 8 -#define SGASEG_LEN_SIZE 4 -#define DEMI_SGASEG_T_SIZE (SGASEG_BUF_SIZE + SGASEG_LEN_SIZE) -#define SGA_BUF_SIZE 8 +#define SGASEG_MD_SIZE 8 +#define data_buf_ptr_SIZE 8 +#define data_len_bytes_SIZE 4 +#define DEMI_SGASEG_T_SIZE (SGASEG_MD_SIZE + data_buf_ptr_SIZE + data_len_bytes_SIZE) #define SGA_NUMSEGS_SIZE 4 -#define SGA_SEGS_SIZE (DEMI_SGASEG_T_SIZE * DEMI_SGARRAY_MAXSIZE) +#define segments_SIZE (DEMI_SGASEG_T_SIZE * DEMI_SGARRAY_MAXSIZE) #define SGA_ADDR_SIZE 16 -#define DEMI_SGARRAY_T_SIZE (SGA_BUF_SIZE + SGA_NUMSEGS_SIZE + SGA_SEGS_SIZE + SGA_ADDR_SIZE) +#define DEMI_SGARRAY_T_SIZE (SGA_NUMSEGS_SIZE + segments_SIZE + SGA_ADDR_SIZE) #define QD_SIZE 4 #define SADDR_SIZE 16 #define DEMI_ACCEPT_RESULT_T_SIZE (QD_SIZE + SADDR_SIZE) diff --git a/tests/c/syscalls.c b/tests/c/syscalls.c index c0d520c4c..03d91e2ff 100644 --- a/tests/c/syscalls.c +++ b/tests/c/syscalls.c @@ -155,8 +155,8 @@ static bool inval_getsockopt(void) } /* -* @brief Issues an invalid call to getpeername(). -*/ + * @brief Issues an invalid call to getpeername(). + */ static bool inval_getpeername(void) { int qd = -1; @@ -176,7 +176,7 @@ static bool inval_sgaalloc(void) size_t len = 0; demi_sgarray_t sga = demi_sgaalloc(len); - return (sga.sga_buf == NULL); + return (sga.sga_numsegs == 0); } /** @@ -237,12 +237,7 @@ struct test /** * @brief Tests for system calls in demi/libos.h */ -static struct test tests_libos[] = {{inval_socket, "invalid demi_socket()"}, {inval_accept, "invalid demi_accept()"}, - {inval_bind, "invalid demi_bind()"}, {inval_close, "invalid_demi_close()"}, - {inval_connect, "invalid demi_connect()"}, {inval_listen, "invalid demi_listen()"}, - {inval_pop, "invalid demi_pop()"}, {inval_push, "invalid demi_push()"}, - {inval_pushto, "invalid demi_pushto()"}, {inval_getpeername, "invalid demi_getpeername()"}, - {inval_setsockopt, "invalid demi_setsockopt()"}, {inval_getsockopt, "invalid demi_getsockopt()}"}}; +static struct test tests_libos[] = {{inval_socket, "invalid demi_socket()"}, {inval_accept, "invalid demi_accept()"}, {inval_bind, "invalid demi_bind()"}, {inval_close, "invalid_demi_close()"}, {inval_connect, "invalid demi_connect()"}, {inval_listen, "invalid demi_listen()"}, {inval_pop, "invalid demi_pop()"}, {inval_push, "invalid demi_push()"}, {inval_pushto, "invalid demi_pushto()"}, {inval_getpeername, "invalid demi_getpeername()"}, {inval_setsockopt, "invalid demi_setsockopt()"}, {inval_getsockopt, "invalid demi_getsockopt()}"}}; /** * @brief Tests for system calls in demi/sga.h diff --git a/tests/rust/common/libos.rs b/tests/rust/common/libos.rs index 00b48ad28..ce9523dd9 100644 --- a/tests/rust/common/libos.rs +++ b/tests/rust/common/libos.rs @@ -6,6 +6,7 @@ //====================================================================================================================== use crate::common::runtime::SharedDummyRuntime; +use ::arrayvec::ArrayVec; use ::crossbeam_channel::{Receiver, Sender}; use ::demikernel::{ demi_sgarray_t, @@ -15,6 +16,7 @@ use ::demikernel::{ fail::Fail, logging, memory::{into_sgarray, DemiBuffer}, + types::DEMI_SGARRAY_MAXLEN, QDesc, QToken, SharedDemiRuntime, }, OperationResult, @@ -52,7 +54,9 @@ impl DummyLibOS { for a in &mut buf[..] { *a = fill_char; } - let data: demi_sgarray_t = into_sgarray(buf)?; + let mut bufs: ArrayVec = ArrayVec::new(); + bufs.push(buf); + let data: demi_sgarray_t = into_sgarray(bufs)?; Ok(data) } diff --git a/tests/rust/common/runtime.rs b/tests/rust/common/runtime.rs index 630ea2554..adc6f7c48 100644 --- a/tests/rust/common/runtime.rs +++ b/tests/rust/common/runtime.rs @@ -8,7 +8,7 @@ use ::arrayvec::ArrayVec; use ::demikernel::{ inetstack::{ - consts::{MAX_HEADER_SIZE, RECEIVE_BATCH_SIZE}, + consts::{MAX_BATCH_SIZE_NUM_PACKETS, MAX_HEADER_SIZE}, protocols::layer1::PhysicalLayer, }, runtime::{ @@ -60,22 +60,24 @@ impl SharedDummyRuntime { /// Network Runtime Trait Implementation for Dummy Runtime impl PhysicalLayer for SharedDummyRuntime { - fn transmit(&mut self, pkt: DemiBuffer) -> Result<(), Fail> { - trace!("transmitting pkt: size={:?}", pkt.len()); - // The packet header and body must fit into whatever physical media we're transmitting over. - // For this test harness, we 2^16 bytes (u16::MAX) as our limit. - assert!(pkt.len() < u16::MAX as usize); - - match self.outgoing.try_send(pkt) { - Ok(_) => Ok(()), - Err(_) => Err(Fail::new( - libc::EAGAIN, - "Could not push outgoing packet to the shared channel", - )), + fn transmit(&mut self, pkts: ArrayVec) -> Result<(), Fail> { + for pkt in pkts { + trace!("transmitting pkt: size={:?}", pkt.len()); + // The packet header and body must fit into whatever physical media we're transmitting over. + // For this test harness, we 2^16 bytes (u16::MAX) as our limit. + assert!(pkt.len() < u16::MAX as usize); + + if self.outgoing.try_send(pkt).is_err() { + return Err(Fail::new( + libc::EAGAIN, + "Could not push outgoing packet to the shared channel", + )); + } } + Ok(()) } - fn receive(&mut self) -> Result, Fail> { + fn receive(&mut self) -> Result, Fail> { let mut out = ArrayVec::new(); if let Some(buf) = self.incoming.try_recv().ok() { trace!("receiving pkt: size={:?}", buf.len()); diff --git a/tests/rust/tcp.rs b/tests/rust/tcp.rs index ccf997011..c4724b1e6 100644 --- a/tests/rust/tcp.rs +++ b/tests/rust/tcp.rs @@ -10,17 +10,28 @@ mod test { //====================================================================================================================== use crate::common::{libos::*, ALICE_CONFIG_PATH, ALICE_IP, BOB_CONFIG_PATH, BOB_IP, PORT_NUMBER}; use ::anyhow::Result; + use ::arrayvec::ArrayVec; use ::crossbeam_channel::{Receiver, Sender}; use ::demikernel::{ demi_sgarray_t, inetstack::consts::MAX_HEADER_SIZE, runtime::{ memory::{into_sgarray, DemiBuffer}, + types::DEMI_SGARRAY_MAXLEN, OperationResult, QDesc, QToken, }, }; use ::socket2::{Domain, Protocol, Type}; + use std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6}, + sync::{Arc, Barrier}, + thread::{self, JoinHandle}, + time::Duration, + }; + #[cfg(target_os = "windows")] + use windows::Win32::Networking::WinSock; + #[cfg(target_os = "windows")] pub const AF_INET: i32 = windows::Win32::Networking::WinSock::AF_INET.0 as i32; @@ -38,15 +49,6 @@ mod test { const SAFE_TIMEOUT: Duration = Duration::from_secs(5); const BAD_WAIT_TIMEOUT_MILLISECONDS: Duration = Duration::from_millis(1); - use std::{ - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6}, - sync::{Arc, Barrier}, - thread::{self, JoinHandle}, - time::Duration, - }; - #[cfg(target_os = "windows")] - use windows::Win32::Networking::WinSock; - //====================================================================================================================== // Open/Close Passive Socket //====================================================================================================================== @@ -986,11 +988,13 @@ mod test { // Push bad data to socket. let zero_bytes: [u8; 0] = []; - let buf: DemiBuffer = match DemiBuffer::from_slice_with_headroom(&zero_bytes, MAX_HEADER_SIZE) { + let buf = match DemiBuffer::from_slice_with_headroom(&zero_bytes, MAX_HEADER_SIZE) { Ok(buf) => buf, Err(e) => anyhow::bail!("(zero-byte) slice should fit in a DemiBuffer: {:?}", e), }; - let data: demi_sgarray_t = into_sgarray(buf)?; + let mut bufs: ArrayVec = ArrayVec::new(); + bufs.push(buf); + let data: demi_sgarray_t = into_sgarray(bufs)?; match libos.push(sockqd, &data) { Ok(_) => diff --git a/tests/rust/udp.rs b/tests/rust/udp.rs index ad1bad5ac..b50c12bf9 100644 --- a/tests/rust/udp.rs +++ b/tests/rust/udp.rs @@ -14,6 +14,7 @@ mod test { use ::crossbeam_channel::{Receiver, Sender}; use ::demikernel::runtime::{ memory::{into_sgarray, DemiBuffer}, + types::DEMI_SGARRAY_MAXLEN, OperationResult, QDesc, QToken, }; @@ -21,8 +22,9 @@ mod test { /// ensure most OS operations will complete. const TIMEOUT_MILLISECONDS: Duration = Duration::from_millis(100); + use ::arrayvec::ArrayVec; use ::socket2::{Domain, Protocol, Type}; - use std::{ + use ::std::{ net::SocketAddr, sync::{Arc, Barrier}, thread::{self, JoinHandle}, @@ -276,7 +278,7 @@ mod test { }, }; let (_, qr): (QDesc, OperationResult) = safe_wait(&mut libos, qt)?; - let bytes: DemiBuffer = match qr { + let bytes: ArrayVec = match qr { OperationResult::Pop(_, bytes) => bytes, _ => { // Close socket on error. @@ -439,7 +441,7 @@ mod test { }, }; let (_, qr): (QDesc, OperationResult) = safe_wait(&mut libos, qt)?; - let bytes: DemiBuffer = match qr { + let bytes: ArrayVec = match qr { OperationResult::Pop(_, bytes) => bytes, _ => { // Close socket on error.