Skip to content

Commit 16acbd6

Browse files
committed
chore: Use a fork of hyper with Body::poll_progress
hyperium/http-body#90 proposes adding a `Body::poll_progress` method to the `Body` trait. This PR uses a fork of hyper that uses this proposed API when awaiting stream send capacity. This supports implementing timeouts on streams and connections in an unhealthy state to defend servers against resource exhaustion.
1 parent c34cf74 commit 16acbd6

File tree

16 files changed

+99
-5
lines changed

16 files changed

+99
-5
lines changed

.gitmodules

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
[submodule "http-body"]
2+
path = http-body
3+
url = [email protected]:olix0r/http-body.git
4+
[submodule "hyper"]
5+
path = hyper
6+
url = [email protected]:olix0r/hyper.git

Cargo.lock

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -824,8 +824,6 @@ dependencies = [
824824
[[package]]
825825
name = "http-body"
826826
version = "0.4.6"
827-
source = "registry+https://github.com/rust-lang/crates.io-index"
828-
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
829827
dependencies = [
830828
"bytes",
831829
"http",
@@ -853,8 +851,6 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
853851
[[package]]
854852
name = "hyper"
855853
version = "0.14.28"
856-
source = "registry+https://github.com/rust-lang/crates.io-index"
857-
checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80"
858854
dependencies = [
859855
"bytes",
860856
"futures-channel",

Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,15 @@ members = [
7979
"tools",
8080
]
8181

82+
exclude = [
83+
"http-body",
84+
"hyper",
85+
]
86+
8287
[profile.release]
8388
debug = 1
8489
lto = true
90+
91+
[patch.crates-io]
92+
http-body = { path = "http-body" }
93+
hyper = { path = "hyper" }

http-body

Submodule http-body added at 30db493

hyper

Submodule hyper added at 542b48d

hyper-balance/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ where
116116
Poll::Ready(ret)
117117
}
118118

119+
#[inline]
120+
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
121+
self.project().body.poll_progress(cx)
122+
}
123+
119124
fn poll_trailers(
120125
self: Pin<&mut Self>,
121126
cx: &mut Context<'_>,

linkerd/app/core/src/errors/respond.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,13 @@ where
469469
}
470470
}
471471

472+
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
473+
match self.project() {
474+
ResponseBodyProj::Passthru(inner) => inner.poll_progress(cx),
475+
ResponseBodyProj::GrpcRescue { inner, .. } => inner.poll_progress(cx),
476+
}
477+
}
478+
472479
#[inline]
473480
fn poll_trailers(
474481
self: Pin<&mut Self>,

linkerd/http/box/src/body.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ impl Body for BoxBody {
5858
self.as_mut().inner.as_mut().poll_data(cx)
5959
}
6060

61+
#[inline]
62+
fn poll_progress(
63+
mut self: Pin<&mut Self>,
64+
cx: &mut Context<'_>,
65+
) -> Poll<Result<(), Self::Error>> {
66+
self.as_mut().inner.as_mut().poll_progress(cx)
67+
}
68+
6169
#[inline]
6270
fn poll_trailers(
6371
mut self: Pin<&mut Self>,
@@ -116,12 +124,17 @@ where
116124
}))
117125
}
118126

127+
#[inline]
128+
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129+
self.project().0.poll_progress(cx).map_err(Into::into)
130+
}
131+
119132
#[inline]
120133
fn poll_trailers(
121134
self: Pin<&mut Self>,
122135
cx: &mut Context<'_>,
123136
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
124-
Poll::Ready(futures::ready!(self.project().0.poll_trailers(cx)).map_err(Into::into))
137+
self.project().0.poll_trailers(cx).map_err(Into::into)
125138
}
126139

127140
#[inline]

linkerd/http/metrics/src/requests/service.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,12 @@ where
283283
Poll::Ready(frame)
284284
}
285285

286+
#[inline]
287+
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
288+
self.project().inner.poll_progress(cx)
289+
}
290+
291+
#[inline]
286292
fn poll_trailers(
287293
self: Pin<&mut Self>,
288294
cx: &mut Context<'_>,
@@ -422,6 +428,11 @@ where
422428
Poll::Ready(frame)
423429
}
424430

431+
#[inline]
432+
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
433+
self.project().inner.poll_progress(cx).map_err(Into::into)
434+
}
435+
425436
fn poll_trailers(
426437
mut self: Pin<&mut Self>,
427438
cx: &mut Context<'_>,

linkerd/http/retry/src/replay.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,21 @@ where
251251
Poll::Ready(Some(Ok(Data::Initial(chunk))))
252252
}
253253

254+
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
255+
let this = self.get_mut();
256+
let state = Self::acquire_state(&mut this.state, &this.shared.body);
257+
tracing::trace!("ReplayBody::poll_progress");
258+
259+
if let Some(rest) = state.rest.as_mut() {
260+
// If the inner body has previously ended, don't poll it again.
261+
if !rest.is_end_stream() {
262+
return Pin::new(rest).poll_progress(cx).map_err(Into::into);
263+
}
264+
}
265+
266+
Poll::Ready(Ok(()))
267+
}
268+
254269
fn poll_trailers(
255270
self: Pin<&mut Self>,
256271
cx: &mut Context<'_>,

0 commit comments

Comments
 (0)