Skip to content

Commit 0108153

Browse files
authored
Merge pull request #1609 from microsoft/ab-inetstack-layer1-cleanup
catnap: cleanup type annotations and reduce syntactic clutter, better names
2 parents c98508c + 3a1c761 commit 0108153

File tree

10 files changed

+253
-269
lines changed

10 files changed

+253
-269
lines changed

src/catnap/linux/active_socket.rs

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
runtime::{fail::Fail, limits, memory::DemiBuffer, DemiRuntime},
1313
};
1414
use ::socket2::Socket;
15-
use ::std::{cmp::min, io, mem::MaybeUninit, net::SocketAddr};
15+
use ::std::{cmp::min, mem::MaybeUninit, net::SocketAddr, slice};
1616

1717
//======================================================================================================================
1818
// Structures
@@ -21,7 +21,7 @@ use ::std::{cmp::min, io, mem::MaybeUninit, net::SocketAddr};
2121
/// This structure represents outgoing packets.
2222
struct Outgoing {
2323
addr: Option<SocketAddr>,
24-
buf: DemiBuffer,
24+
buffer: DemiBuffer,
2525
result: SharedAsyncValue<Option<Result<(), Fail>>>,
2626
}
2727

@@ -54,43 +54,43 @@ impl ActiveSocketData {
5454
pub fn poll_send(&mut self) {
5555
if let Some(Outgoing {
5656
addr,
57-
mut buf,
57+
mut buffer,
5858
mut result,
5959
}) = self.send_queue.try_pop()
6060
{
6161
// A dummy request to detect when the socket has connected.
62-
if buf.is_empty() {
62+
if buffer.is_empty() {
6363
result.set(Some(Ok(())));
6464
return;
6565
}
6666
// Try to send the buffer.
67-
let io_result: Result<usize, io::Error> = match addr {
68-
Some(addr) => self.socket.send_to(&buf, &addr.into()),
69-
None => self.socket.send(&buf),
67+
let io_result = match addr {
68+
Some(addr) => self.socket.send_to(&buffer, &addr.into()),
69+
None => self.socket.send(&buffer),
7070
};
7171
match io_result {
7272
// Operation completed.
7373
Ok(nbytes) => {
74-
trace!("data pushed ({:?}/{:?} bytes)", nbytes, buf.len());
74+
trace!("data pushed ({:?}/{:?} bytes)", nbytes, buffer.len());
7575
expect_ok!(
76-
buf.adjust(nbytes),
76+
buffer.adjust(nbytes),
7777
"OS should not have sent more bytes than in the buffer"
7878
);
79-
if buf.is_empty() {
79+
if buffer.is_empty() {
8080
// Done sending this buffer
8181
result.set(Some(Ok(())));
8282
} else {
8383
// Only sent part of the buffer so try again later.
84-
self.send_queue.push_front(Outgoing { addr, buf, result });
84+
self.send_queue.push_front(Outgoing { addr, buffer, result });
8585
}
8686
},
8787
Err(e) => {
88-
let errno: i32 = get_libc_err(e);
88+
let errno = get_libc_err(e);
8989
if DemiRuntime::should_retry(errno) {
9090
// Put the buffer back and try again later.
91-
self.send_queue.push_front(Outgoing { addr, buf, result });
91+
self.send_queue.push_front(Outgoing { addr, buffer, result });
9292
} else {
93-
let cause: String = format!("failed to send on socket: {:?}", errno);
93+
let cause = format!("failed to send on socket: {:?}", errno);
9494
error!("poll_send(): {}", cause);
9595
result.set(Some(Err(Fail::new(errno, &cause))));
9696
}
@@ -103,30 +103,30 @@ impl ActiveSocketData {
103103
/// queue.
104104
/// TODO: Incoming queue should possibly be byte oriented.
105105
pub fn poll_recv(&mut self) {
106-
let mut buf: DemiBuffer = DemiBuffer::new(limits::POP_SIZE_MAX as u16);
106+
let mut buffer = DemiBuffer::new(limits::POP_SIZE_MAX as u16);
107107
if self.closed {
108108
return;
109109
}
110110
match self
111111
.socket
112-
.recv_from(unsafe { std::slice::from_raw_parts_mut(buf.as_mut_ptr() as *mut MaybeUninit<u8>, buf.len()) })
112+
.recv_from(unsafe { slice::from_raw_parts_mut(buffer.as_mut_ptr() as *mut MaybeUninit<u8>, buffer.len()) })
113113
{
114114
// Operation completed.
115-
Ok((nbytes, socketaddr)) => {
116-
if let Err(e) = buf.trim(buf.len() - nbytes) {
115+
Ok((num_bytes, socketaddr)) => {
116+
if let Err(e) = buffer.trim(buffer.len() - num_bytes) {
117117
self.recv_queue.push(Err(e));
118118
} else {
119-
trace!("data popped ({:?} bytes)", nbytes);
120-
if buf.is_empty() {
119+
trace!("data popped ({:?} bytes)", num_bytes);
120+
if buffer.is_empty() {
121121
self.closed = true;
122122
}
123-
self.recv_queue.push(Ok((socketaddr.as_socket(), buf)));
123+
self.recv_queue.push(Ok((socketaddr.as_socket(), buffer)));
124124
}
125125
},
126126
Err(e) => {
127-
let errno: i32 = get_libc_err(e);
127+
let errno = get_libc_err(e);
128128
if !DemiRuntime::should_retry(errno) {
129-
let cause: String = format!("failed to receive on socket: {:?}", errno);
129+
let cause = format!("failed to receive on socket: {:?}", errno);
130130
error!("poll_recv(): {}", cause);
131131
self.recv_queue.push(Err(Fail::new(errno, &cause)));
132132
}
@@ -135,11 +135,11 @@ impl ActiveSocketData {
135135
}
136136

137137
/// Pushes data to the socket. Blocks until completion.
138-
pub async fn push(&mut self, addr: Option<SocketAddr>, buf: DemiBuffer) -> Result<(), Fail> {
139-
let mut result: SharedAsyncValue<Option<Result<(), Fail>>> = SharedAsyncValue::new(None);
138+
pub async fn push(&mut self, addr: Option<SocketAddr>, buffer: DemiBuffer) -> Result<(), Fail> {
139+
let mut result = SharedAsyncValue::new(None);
140140
self.send_queue.push(Outgoing {
141141
addr,
142-
buf,
142+
buffer,
143143
result: result.clone(),
144144
});
145145
loop {
@@ -153,19 +153,18 @@ impl ActiveSocketData {
153153
}
154154
}
155155

156-
/// Pops data from the socket. Blocks until some data is found but does not wait until the buf has reached [size].
156+
/// Pops data from the socket. Blocks until some data is found but does not wait until the buffer has reached [size].
157157
pub async fn pop(&mut self, size: usize) -> Result<(Option<SocketAddr>, DemiBuffer), Fail> {
158-
let (addr, mut incoming): (Option<SocketAddr>, DemiBuffer) = self.recv_queue.pop(None).await??;
158+
let (addr, mut buffer) = self.recv_queue.pop(None).await??;
159159
// Figure out how much data we got.
160-
let bytes_read: usize = min(incoming.len(), size);
160+
let bytes_read = min(buffer.len(), size);
161161
// Trim the buffer and leave for next read if we got more than expected.
162-
if let Ok(remainder) = incoming.split_back(bytes_read) {
162+
if let Ok(remainder) = buffer.split_back(bytes_read) {
163163
if !remainder.is_empty() {
164164
self.recv_queue.push_front(Ok((addr, remainder)));
165165
}
166166
}
167-
168-
Ok((addr, incoming))
167+
Ok((addr, buffer))
169168
}
170169

171170
pub fn get_socket(&self) -> &Socket {

src/catnap/linux/passive_socket.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ impl PassiveSocketData {
4343
// Operation completed.
4444
Ok((new_socket, saddr)) => {
4545
trace!("connection accepted ({:?})", new_socket);
46-
let addr: SocketAddr = expect_some!(saddr.as_socket(), "not a SocketAddrV4");
46+
let addr = expect_some!(saddr.as_socket(), "not a SocketAddrV4");
4747
self.accept_queue.push(Ok((new_socket, addr)))
4848
},
4949
Err(e) => {
5050
// Check the return error code.
51-
let errno: i32 = get_libc_err(e);
51+
let errno = get_libc_err(e);
5252
if !DemiRuntime::should_retry(errno) {
53-
let cause: String = format!("failed to accept on socket: {:?}", errno);
53+
let cause = format!("failed to accept on socket: {:?}", errno);
5454
error!("poll_accept(): {}", cause);
5555
self.accept_queue.push(Err(Fail::new(errno, &cause)));
5656
}

src/catnap/linux/socket.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl SharedSocketData {
5151

5252
/// Moves an inactive socket to a passive listening socket.
5353
pub fn move_socket_to_passive(&mut self) {
54-
let socket: Socket = match self.deref_mut() {
54+
let socket = match self.deref_mut() {
5555
SocketData::Inactive(socket) => expect_some!(socket.take(), "should have data"),
5656
SocketData::Active(_) => unreachable!("should not be able to move an active socket to a passive one"),
5757
SocketData::Passive(_) => return,
@@ -61,7 +61,7 @@ impl SharedSocketData {
6161

6262
/// Moves an inactive socket to an active established socket.
6363
pub fn move_socket_to_active(&mut self) {
64-
let socket: Socket = match self.deref_mut() {
64+
let socket = match self.deref_mut() {
6565
SocketData::Inactive(socket) => expect_some!(socket.take(), "should have data"),
6666
SocketData::Active(_) => return,
6767
SocketData::Passive(_) => unreachable!("should not be able to move a passive socket to an active one"),
@@ -71,7 +71,7 @@ impl SharedSocketData {
7171

7272
/// Gets a reference to the actual Socket for reading the socket's metadata (mostly the raw file descriptor).
7373
pub fn get_socket<'a>(&'a self) -> &'a Socket {
74-
let _self: &'a SocketData = self.as_ref();
74+
let _self = self.as_ref();
7575
match _self {
7676
SocketData::Inactive(Some(socket)) => socket,
7777
SocketData::Active(data) => data.get_socket(),
@@ -82,7 +82,7 @@ impl SharedSocketData {
8282

8383
/// Gets a mutable reference to the actual Socket for I/O operations.
8484
pub fn get_mut_socket<'a>(&'a mut self) -> &'a mut Socket {
85-
let _self: &'a mut SocketData = self.as_mut();
85+
let _self = self.as_mut();
8686
match _self {
8787
SocketData::Inactive(Some(socket)) => socket,
8888
SocketData::Active(data) => data.get_mut_socket(),
@@ -97,10 +97,10 @@ impl SharedSocketData {
9797
}
9898

9999
/// Push some data to an active established connection.
100-
pub async fn push(&mut self, addr: Option<SocketAddr>, buf: DemiBuffer) -> Result<(), Fail> {
100+
pub async fn push(&mut self, addr: Option<SocketAddr>, buffer: DemiBuffer) -> Result<(), Fail> {
101101
match self.deref_mut() {
102102
SocketData::Inactive(_) => unreachable!("Cannot write to an inactive socket"),
103-
SocketData::Active(data) => data.push(addr, buf).await,
103+
SocketData::Active(data) => data.push(addr, buffer).await,
104104
SocketData::Passive(_) => unreachable!("Cannot write to a passive socket"),
105105
}
106106
}

0 commit comments

Comments
 (0)