Skip to content

[runtime] Feature: memory segments and batching #1550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions examples/c/tcp-ping-pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ static void pop_wait(int qd, demi_qresult_t *qr)

/* Parse operation result. */
assert(qr->qr_opcode == DEMI_OPC_POP);
assert(qr->qr_value.sga.sga_segs != 0);
assert(qr->qr_value.sga.segments != 0);
}

/*====================================================================================================================*
Expand Down Expand Up @@ -189,7 +189,7 @@ static void server(int argc, char *const argv[], struct sockaddr_in *local, size
/* Extract received scatter-gather array. */
memcpy(&sga, &qr.qr_value.sga, sizeof(demi_sgarray_t));

nbytes += sga.sga_segs[0].sgaseg_len;
nbytes += sga.segments[0].data_len_bytes;

/* Push scatter-gather array. */
push_wait(qd, &sga, &qr);
Expand Down Expand Up @@ -242,10 +242,10 @@ static void client(int argc, char *const argv[], const struct sockaddr_in *remot

/* Allocate scatter-gather array. */
sga = demi_sgaalloc(data_size);
assert(sga.sga_segs != 0);
assert(sga.segments != 0);

/* Prepare data. */
memset(sga.sga_segs[0].sgaseg_buf, 1, data_size);
memset(sga.segments[0].data_buf_ptr, 1, data_size);

/* Push scatter-gather array. */
push_wait(sockqd, &sga, &qr);
Expand All @@ -258,9 +258,9 @@ static void client(int argc, char *const argv[], const struct sockaddr_in *remot
pop_wait(sockqd, &qr);

/* Check payload. */
for (uint32_t i = 0; i < qr.qr_value.sga.sga_segs[0].sgaseg_len; i++)
assert(((char *)qr.qr_value.sga.sga_segs[0].sgaseg_buf)[i] == 1);
nbytes += qr.qr_value.sga.sga_segs[0].sgaseg_len;
for (uint32_t i = 0; i < qr.qr_value.sga.segments[0].data_len_bytes; i++)
assert(((char *)qr.qr_value.sga.segments[0].data_buf_ptr)[i] == 1);
nbytes += qr.qr_value.sga.segments[0].data_len_bytes;

/* Release received scatter-gather array. */
assert(demi_sgafree(&qr.qr_value.sga) == 0);
Expand Down
17 changes: 10 additions & 7 deletions examples/c/tcp-push-pop.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ static int pop_get_received_nbytes(int sockqd)
assert(demi_pop(&tok, sockqd) == 0);
assert(demi_wait(&res, tok, NULL) == 0);
assert(res.qr_opcode == DEMI_OPC_POP);
assert(res.qr_value.sga.sga_segs != 0);
recv_bytes = res.qr_value.sga.sga_segs[0].sgaseg_len;
assert(res.qr_value.sga.segments != 0);
recv_bytes = res.qr_value.sga.segments[0].data_len_bytes;
assert(demi_sgafree(&res.qr_value.sga) == 0);
return recv_bytes;
}
Expand Down Expand Up @@ -91,14 +91,14 @@ static int push_get_sent_nbytes(const int sockqd, size_t data_size)
demi_sgarray_t sga = demi_sgaalloc(data_size);
int sent_bytes = 0;

assert(sga.sga_segs != 0);
memset(sga.sga_segs[0].sgaseg_buf, 1, data_size);
assert(sga.segments != 0);
memset(sga.segments[0].data_buf_ptr, 1, data_size);
// ToDo: demi_pushto() also must work for TCP on all LibOSes.
// FIXME: https://github.com/microsoft/demikernel/issues/137
assert(demi_push(&tok, sockqd, &sga) == 0);
assert(demi_wait(&res, tok, NULL) == 0);
assert(res.qr_opcode == DEMI_OPC_PUSH);
sent_bytes = sga.sga_segs[0].sgaseg_len;
sent_bytes = sga.segments[0].data_len_bytes;
assert(demi_sgafree(&sga) == 0);
return sent_bytes;
}
Expand Down Expand Up @@ -161,9 +161,12 @@ int main(int argc, char *const argv[])
};
assert(demi_init(&args) == 0);

if (!strcmp(argv[1], "--server")) {
if (!strcmp(argv[1], "--server"))
{
run_server(&addr, data_size, max_msgs);
} else if (!strcmp(argv[1], "--client")) {
}
else if (!strcmp(argv[1], "--client"))
{
run_client(&addr, data_size, max_msgs);
}

Expand Down
8 changes: 4 additions & 4 deletions examples/c/udp-ping-pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static void pop_wait(int qd, demi_qresult_t *qr)

/* Parse operation result. */
assert(qr->qr_opcode == DEMI_OPC_POP);
assert(qr->qr_value.sga.sga_segs != 0);
assert(qr->qr_value.sga.segments != 0);
}

/*====================================================================================================================*
Expand Down Expand Up @@ -195,11 +195,11 @@ static void client(int argc,

/* Allocate scatter-gather array. */
sga = demi_sgaalloc(data_size);
assert(sga.sga_segs != 0);
assert(sga.segments != 0);

/* Prepare data. */
memset(expected_buf, it % 256, data_size);
memcpy(sga.sga_segs[0].sgaseg_buf, expected_buf, data_size);
memcpy(sga.segments[0].data_buf_ptr, expected_buf, data_size);

/* Push scatter-gather array. */
pushto_wait(sockqd, &sga, &qr, (const struct sockaddr *)remote);
Expand All @@ -211,7 +211,7 @@ static void client(int argc,
pop_wait(sockqd, &qr);

/* Parse operation result. */
assert(!memcmp(qr.qr_value.sga.sga_segs[0].sgaseg_buf, expected_buf, data_size));
assert(!memcmp(qr.qr_value.sga.segments[0].data_buf_ptr, expected_buf, data_size));

/* Release received scatter-gather array. */
assert(demi_sgafree(&qr.qr_value.sga) == 0);
Expand Down
8 changes: 4 additions & 4 deletions examples/c/udp-push-pop.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ static void server(int argc,

/* Parse operation result. */
assert(qr.qr_opcode == DEMI_OPC_POP);
assert(qr.qr_value.sga.sga_segs != 0);
assert(!memcmp(qr.qr_value.sga.sga_segs[0].sgaseg_buf, expected_buf, data_size));
assert(qr.qr_value.sga.segments != 0);
assert(!memcmp(qr.qr_value.sga.segments[0].data_buf_ptr, expected_buf, data_size));

/* Release scatter-gather array. */
assert(demi_sgafree(&qr.qr_value.sga) == 0);
Expand Down Expand Up @@ -149,10 +149,10 @@ static void client(int argc,

/* Allocate scatter-gather array. */
sga = demi_sgaalloc(data_size);
assert(sga.sga_segs != 0);
assert(sga.segments != 0);

/* Prepare data. */
memset(sga.sga_segs[0].sgaseg_buf, 1, data_size);
memset(sga.segments[0].data_buf_ptr, 1, data_size);

/* Push data. */
assert(demi_pushto(&qt, sockqd, &sga, (const struct sockaddr *)remote, sizeof(struct sockaddr_in)) == 0);
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/tcp-dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Application {
let sockqd: QDesc = qr.qr_qd.into();
let sga: demi_sgarray_t = unsafe { qr.qr_value.sga };

num_bytes += sga.sga_segs[0].sgaseg_len as usize;
num_bytes += sga.segments[0].data_len_bytes as usize;

if let Err(e) = self.libos.sgafree(sga) {
println!("ERROR: sgafree() failed (error={:?})", e);
Expand Down
12 changes: 6 additions & 6 deletions examples/rust/tcp-ping-pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result<demi_sgarray_t> {
};

// Ensure that allocated the array has the requested size.
if sga.sga_segs[0].sgaseg_len as usize != size {
if sga.segments[0].data_len_bytes as usize != size {
freesga(libos, sga);
let seglen: usize = sga.sga_segs[0].sgaseg_len as usize;
let seglen: usize = sga.segments[0].data_len_bytes as usize;
anyhow::bail!(
"failed to allocate scatter-gather array: expected size={:?} allocated size={:?}",
size,
Expand All @@ -50,8 +50,8 @@ fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result<demi_sgarray_t> {
}

// Fill in the array.
let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8;
let len: usize = sga.sga_segs[0].sgaseg_len as usize;
let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8;
let len: usize = sga.segments[0].data_len_bytes as usize;
let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
let mut fill: u8 = value;
for x in slice {
Expand Down Expand Up @@ -124,8 +124,8 @@ fn pop_and_wait(libos: &mut LibOS, sockqd: QDesc, recvbuf: &mut [u8]) -> Result<
};

// Copy data.
let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8;
let len: usize = sga.sga_segs[0].sgaseg_len as usize;
let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8;
let len: usize = sga.segments[0].data_len_bytes as usize;
let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
for x in slice {
recvbuf[index] = *x;
Expand Down
10 changes: 5 additions & 5 deletions examples/rust/tcp-pktgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl Application {
},
};

num_bytes += sga.sga_segs[0].sgaseg_len as usize;
num_bytes += sga.segments[0].data_len_bytes as usize;
if let Err(e) = self.libos.sgafree(sga) {
println!("ERROR: sgafree() failed (error={:?})", e);
println!("WARN: leaking sga");
Expand All @@ -257,12 +257,12 @@ impl Application {
};

// Ensure that allocated array has the requested size.
if sga.sga_segs[0].sgaseg_len as usize != size {
if sga.segments[0].data_len_bytes as usize != size {
if let Err(e) = self.libos.sgafree(sga) {
println!("ERROR: sgafree() failed (error={:?})", e);
println!("WARN: leaking sga");
}
let seglen: usize = sga.sga_segs[0].sgaseg_len as usize;
let seglen: usize = sga.segments[0].data_len_bytes as usize;
anyhow::bail!(
"failed to allocate scatter-gather array: expected size={:?} allocated size={:?}",
size,
Expand All @@ -271,8 +271,8 @@ impl Application {
}

// Fill in the array.
let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8;
let len: usize = sga.sga_segs[0].sgaseg_len as usize;
let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8;
let len: usize = sga.segments[0].data_len_bytes as usize;
let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
slice.fill(value);

Expand Down
10 changes: 5 additions & 5 deletions examples/rust/tcp-push-pop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ fn mksga(libos: &mut LibOS, value: u8) -> Result<demi_sgarray_t> {
};

// Create pointer for filling the array.
let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8;
let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8;
// Ensure that allocated array has the requested size.
if sga.sga_segs[0].sgaseg_len as usize != BUF_SIZE_BYTES || ptr.is_null() {
if sga.segments[0].data_len_bytes as usize != BUF_SIZE_BYTES || ptr.is_null() {
freesga(libos, sga);
let seglen: usize = sga.sga_segs[0].sgaseg_len as usize;
let seglen: usize = sga.segments[0].data_len_bytes as usize;
anyhow::bail!(
"failed to allocate scatter-gather array: expected size={:?} allocated size={:?}",
BUF_SIZE_BYTES,
Expand Down Expand Up @@ -119,8 +119,8 @@ impl TcpServer {
};

// Sanity check received data.
let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8;
let bytes: usize = sga.sga_segs[0].sgaseg_len as usize;
let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8;
let bytes: usize = sga.segments[0].data_len_bytes as usize;
debug_assert_eq!(bytes, BUF_SIZE_BYTES);
debug_assert!(ptr.is_aligned());
debug_assert_eq!(ptr.is_null(), false);
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/udp-dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Application {
match self.libos.wait(qt, None) {
Ok(qr) if qr.qr_opcode == demi_opcode_t::DEMI_OPC_POP => {
let sga: demi_sgarray_t = unsafe { qr.qr_value.sga };
num_bytes += sga.sga_segs[0].sgaseg_len as usize;
num_bytes += sga.segments[0].data_len_bytes as usize;
if let Err(e) = self.libos.sgafree(sga) {
println!("ERROR: sgafree() failed (error={:?})", e);
println!("WARN: leaking sga");
Expand Down
25 changes: 13 additions & 12 deletions examples/rust/udp-echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,19 @@ impl Application {
demi_opcode_t::DEMI_OPC_POP => {
let sockqd: QDesc = qr.qr_qd.into();
let sga: demi_sgarray_t = unsafe { qr.qr_value.sga };
let saddr: SocketAddr = match Self::sockaddr_to_socketaddrv4(&unsafe { qr.qr_value.sga.sga_addr }) {
Ok(saddr) => SocketAddr::from(saddr),
Err(e) => {
// If error, free scatter-gather array.
if let Err(e) = self.libos.sgafree(sga) {
println!("ERROR: sgafree() failed (error={:?})", e);
println!("WARN: leaking sga");
};
anyhow::bail!("could not parse sockaddr: {}", e)
},
};
num_bytes += sga.sga_segs[0].sgaseg_len as usize;
let saddr: SocketAddr =
match Self::sockaddr_to_socketaddrv4(&unsafe { qr.qr_value.sga.sockaddr_src }) {
Ok(saddr) => SocketAddr::from(saddr),
Err(e) => {
// If error, free scatter-gather array.
if let Err(e) = self.libos.sgafree(sga) {
println!("ERROR: sgafree() failed (error={:?})", e);
println!("WARN: leaking sga");
};
anyhow::bail!("could not parse sockaddr: {}", e)
},
};
num_bytes += sga.segments[0].data_len_bytes as usize;
// Push packet back.
let qt: QToken = match self.libos.pushto(sockqd, &sga, saddr) {
Ok(qt) => qt,
Expand Down
16 changes: 8 additions & 8 deletions examples/rust/udp-ping-pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result<demi_sgarray_t> {
};

// Ensure that allocated array has the requested size.
if sga.sga_segs[0].sgaseg_len as usize != size {
if sga.segments[0].data_len_bytes as usize != size {
freesga(libos, sga);
let seglen: usize = sga.sga_segs[0].sgaseg_len as usize;
let seglen: usize = sga.segments[0].data_len_bytes as usize;
anyhow::bail!(
"failed to allocate scatter-gather array: expected size={:?} allocated size={:?}",
size,
seglen
);
}
// Fill in the array.
let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8;
let len: usize = sga.sga_segs[0].sgaseg_len as usize;
let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8;
let len: usize = sga.segments[0].data_len_bytes as usize;
let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
slice.fill(value);

Expand Down Expand Up @@ -119,8 +119,8 @@ impl UdpServer {
};

// Sanity check received data.
let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8;
let len: usize = sga.sga_segs[0].sgaseg_len as usize;
let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8;
let len: usize = sga.segments[0].data_len_bytes as usize;
let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
for x in slice {
if *x != fill_char {
Expand Down Expand Up @@ -197,8 +197,8 @@ impl UdpClient {
// Free the sent sga.
self.libos.sgafree(sga)?;
// Sanity check received data.
let ptr: *mut u8 = returned_sga.sga_segs[0].sgaseg_buf as *mut u8;
let len: usize = returned_sga.sga_segs[0].sgaseg_len as usize;
let ptr: *mut u8 = returned_sga.segments[0].data_buf_ptr as *mut u8;
let len: usize = returned_sga.segments[0].data_len_bytes as usize;
let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
for x in slice {
if *x != fill_char {
Expand Down
10 changes: 5 additions & 5 deletions examples/rust/udp-pktgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl Application {
},
};

num_bytes += sga.sga_segs[0].sgaseg_len as usize;
num_bytes += sga.segments[0].data_len_bytes as usize;

if let Err(e) = self.libos.sgafree(sga) {
println!("ERROR: sgafree() failed (error={:?})", e);
Expand All @@ -266,21 +266,21 @@ impl Application {
};

// Ensure that allocated array has the requested size.
if sga.sga_segs[0].sgaseg_len as usize != size {
if sga.segments[0].data_len_bytes as usize != size {
if let Err(e) = self.libos.sgafree(sga) {
println!("ERROR: sgafree() failed (error={:?})", e);
println!("WARN: leaking sga");
};
let seglen: usize = sga.sga_segs[0].sgaseg_len as usize;
let seglen: usize = sga.segments[0].data_len_bytes as usize;
anyhow::bail!(
"failed to allocate scatter-gather array: expected size={:?} allocated size={:?}",
size,
seglen
);
}
// Fill in the array.
let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8;
let len: usize = sga.sga_segs[0].sgaseg_len as usize;
let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8;
let len: usize = sga.segments[0].data_len_bytes as usize;
let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
slice.fill(value);

Expand Down
Loading
Loading