Skip to content

Commit b45bff7

Browse files
committed
feat: update hyper deps to 1.4
1 parent 11a1be6 commit b45bff7

File tree

12 files changed

+195
-243
lines changed

12 files changed

+195
-243
lines changed

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ members = ["crates/*"]
33
resolver = "2"
44

55
[workspace.package]
6-
version = "0.4.2"
6+
version = "0.5.0"
77
edition = "2021"
88
publish = false
99
authors = ["FastEdge Development Team"]
@@ -15,10 +15,10 @@ tracing = "0.1"
1515
hyper = { version = "1", features = ["full"] }
1616
http = "1.1.0"
1717
async-trait = "0.1"
18-
wasmtime = { version = "22.0" }
19-
wasmtime-wasi = { version = "22.0" }
20-
wasi-common = { version = "22.0" }
21-
wasmtime-wasi-nn = { version = "22.0" }
18+
wasmtime = { version = "20.0.2" }
19+
wasmtime-wasi = { version = "20.0.2" }
20+
wasi-common = { version = "20.0.2" }
21+
wasmtime-wasi-nn = { version = "20.0.2" }
2222
clap = { version = "4", features = ["derive"] }
2323
moka = { version = "0.12", features = ["sync"] }
2424
smol_str = { version = "0.2.1", features = ["serde"] }

crates/candle-wasi-nn/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ edition.workspace = true
55
publish.workspace = true
66
authors.workspace = true
77

8+
[features]
9+
metal = []
10+
cuda = []
11+
812
[dependencies]
913
wasmtime-wasi-nn = { workspace = true }
1014
tracing = { workspace = true }

crates/http-backend/src/lib.rs

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ use std::time::Duration;
77
use anyhow::{anyhow, Error, Result};
88
use async_trait::async_trait;
99
use http::{uri::Scheme, HeaderMap, HeaderValue, Uri};
10-
use http_body_util::Full;
10+
use http_body_util::{BodyExt, Full};
1111
use hyper::body::Bytes;
12+
use hyper::rt::ReadBufCursor;
1213
use hyper_util::client::legacy::connect::{Connect, HttpConnector};
1314
use hyper_util::client::legacy::Client;
1415
use hyper_util::rt::TokioExecutor;
1516
use pin_project::pin_project;
16-
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
1717
use tokio::net::TcpStream;
1818
use tower_service::Service;
1919
use tracing::{debug, trace, warn};
@@ -264,9 +264,9 @@ where
264264
let request = self.make_request(req)?;
265265
let res = self.client.request(request).await?;
266266

267-
let _status = res.status().as_u16();
268-
let (parts, _body) = res.into_parts();
269-
let _headers = if !parts.headers.is_empty() {
267+
let status = res.status().as_u16();
268+
let (parts, body) = res.into_parts();
269+
let headers = if !parts.headers.is_empty() {
270270
Some(
271271
parts
272272
.headers
@@ -284,7 +284,7 @@ where
284284
None
285285
};
286286

287-
/*let body_bytes = body.into();
287+
let body_bytes = body.collect().await?.to_bytes();
288288
let body = Some(body_bytes.to_vec());
289289

290290
trace!(?status, ?headers, len = body_bytes.len(), "reply");
@@ -293,8 +293,7 @@ where
293293
status,
294294
headers,
295295
body,
296-
}))*/
297-
unimplemented!("send request")
296+
}))
298297
}
299298
}
300299

@@ -399,41 +398,34 @@ impl Service<Uri> for FastEdgeConnector {
399398
}
400399
}
401400

402-
impl AsyncRead for Connection {
403-
fn poll_read(
404-
self: Pin<&mut Self>,
405-
cx: &mut Context<'_>,
406-
buf: &mut ReadBuf<'_>,
407-
) -> Poll<std::io::Result<()>> {
408-
let this = self.project();
409-
this.inner.poll_read(cx, buf)
401+
impl hyper::rt::Read for Connection {
402+
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, mut buf: ReadBufCursor<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
403+
let n = unsafe {
404+
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
405+
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
406+
Poll::Ready(Ok(())) => tbuf.filled().len(),
407+
other => return other,
408+
}
409+
};
410+
411+
unsafe {
412+
buf.advance(n);
413+
}
414+
Poll::Ready(Ok(()))
410415
}
411416
}
412417

413-
impl AsyncWrite for Connection {
414-
fn poll_write(
415-
self: Pin<&mut Self>,
416-
cx: &mut Context<'_>,
417-
buf: &[u8],
418-
) -> Poll<std::result::Result<usize, std::io::Error>> {
419-
let this = self.project();
420-
this.inner.poll_write(cx, buf)
418+
impl hyper::rt::Write for Connection {
419+
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::result::Result<usize, std::io::Error>> {
420+
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
421421
}
422422

423-
fn poll_flush(
424-
self: Pin<&mut Self>,
425-
cx: &mut Context<'_>,
426-
) -> Poll<std::result::Result<(), std::io::Error>> {
427-
let this = self.project();
428-
this.inner.poll_flush(cx)
423+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
424+
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
429425
}
430426

431-
fn poll_shutdown(
432-
self: Pin<&mut Self>,
433-
cx: &mut Context<'_>,
434-
) -> Poll<std::result::Result<(), std::io::Error>> {
435-
let this = self.project();
436-
this.inner.poll_shutdown(cx)
427+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
428+
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
437429
}
438430
}
439431

crates/http-service/Cargo.toml

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ authors.workspace = true
99
default = []
1010
metrics = ["runtime/metrics"]
1111
stats = ["runtime/stats"]
12-
tls = ["tokio-rustls", "rustls-pemfile", "hyper-rustls", "rustls"]
12+
#tls = ["tokio-rustls", "rustls-pemfile", "hyper-rustls", "rustls"]
1313

1414
[dependencies]
1515
anyhow = { workspace = true }
@@ -23,10 +23,6 @@ wasmtime-wasi-nn = { workspace = true }
2323
wasi-common = { workspace = true }
2424
tracing = { workspace = true }
2525
smol_str = { workspace = true }
26-
tokio-rustls = { version = "0.24.1", optional = true}
27-
rustls-pemfile = { version = "1.0.2" , optional = true}
28-
hyper-rustls = { version = "0.24.1", optional = true }
29-
rustls = { version = "0.21.6", optional = true }
3026
reactor = { path = "../reactor" }
3127
runtime = { path = "../runtime" }
3228
http-backend = { path = "../http-backend" }
@@ -40,9 +36,10 @@ clickhouse = { version = "0.12.0", optional = true }
4036
chrono = "0.4"
4137
async-trait = "0.1"
4238
wasmtime-wasi-http = "20.0.2"
43-
hyper-util = "0.1.3"
44-
http-body-util = "0.1.1"
39+
hyper-util = "0.1"
40+
http-body-util = "0.1"
4541
shellflip = {workspace = true}
42+
bytes = "1.6"
4643

4744
[dev-dependencies]
4845
claims = "0.7"

crates/http-service/src/executor/mod.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ use std::time::{Duration, Instant};
66
use anyhow::{anyhow, bail, Context, Error, Result};
77
use async_trait::async_trait;
88
use bytesize::ByteSize;
9-
use http::{HeaderMap, HeaderValue, Method};
9+
use http::{HeaderMap, HeaderValue, Method, Response};
1010
use http_backend::Backend;
1111
use http_body_util::{BodyExt, Full};
12+
use http_body_util::combinators::BoxBody;
1213
use hyper::body::{Bytes, Incoming};
1314
use smol_str::SmolStr;
1415
use wasmtime_wasi::StdoutStream;
@@ -30,7 +31,7 @@ pub trait HttpExecutor {
3031
async fn execute(
3132
&self,
3233
req: hyper::Request<Incoming>,
33-
) -> Result<(hyper::Response<Full<Bytes>>, Duration, ByteSize)>;
34+
) -> Result<(Response<BoxBody<Bytes, hyper::Error>>, Duration, ByteSize)>;
3435
}
3536

3637
pub trait ExecutorFactory<C> {
@@ -46,7 +47,7 @@ pub trait ExecutorFactory<C> {
4647
/// Execute context used by ['HttpService']
4748
#[derive(Clone)]
4849
pub struct HttpExecutorImpl<C> {
49-
instance_pre: InstancePre<HttpState>,
50+
instance_pre: InstancePre<HttpState<C>>,
5051
store_builder: StoreBuilder,
5152
backend: Backend<C>,
5253
}
@@ -59,7 +60,7 @@ where
5960
async fn execute(
6061
&self,
6162
req: hyper::Request<Incoming>,
62-
) -> Result<(hyper::Response<Full<Bytes>>, Duration, ByteSize)> {
63+
) -> Result<(Response<BoxBody<Bytes, hyper::Error>>, Duration, ByteSize)> {
6364
let start_ = Instant::now();
6465
let response = self.execute_impl(req).await;
6566
let elapsed = Instant::now().duration_since(start_);
@@ -72,7 +73,7 @@ where
7273
C: Clone + Send + Sync + 'static,
7374
{
7475
pub fn new(
75-
instance_pre: InstancePre<HttpState>,
76+
instance_pre: InstancePre<HttpState<C>>,
7677
store_builder: StoreBuilder,
7778
backend: Backend<C>,
7879
) -> Self {
@@ -86,7 +87,7 @@ where
8687
async fn execute_impl(
8788
&self,
8889
req: hyper::Request<Incoming>,
89-
) -> Result<(hyper::Response<Full<Bytes>>, ByteSize)> {
90+
) -> Result<(Response<BoxBody<Bytes, hyper::Error>>, ByteSize)> {
9091
let (parts, body) = req.into_parts();
9192
let method = to_fastedge_http_method(&parts.method)?;
9293

@@ -125,7 +126,7 @@ where
125126
.propagate_headers(&parts.headers)
126127
.context("propagate headers")?;
127128

128-
let state = HttpState { wasi_nn };
129+
let state = HttpState { wasi_nn, http_backend };
129130

130131
let mut store = store_builder.build(state)?;
131132

@@ -158,7 +159,7 @@ where
158159
};
159160
let used = ByteSize::b(store.memory_used() as u64);
160161

161-
let body = resp.body.map(|b| Full::from(b)).unwrap_or_default();
162+
let body = resp.body.map(|b| Full::from(b).map_err(|never| match never {}).boxed()).unwrap_or_default();
162163
builder.body(body).map(|r| (r, used)).map_err(Error::msg)
163164
}
164165

crates/http-service/src/executor/wasi_http.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use std::time::{Duration, Instant};
22

3-
use anyhow::bail;
3+
use anyhow::{bail, Context};
44
use async_trait::async_trait;
55
use bytesize::ByteSize;
6+
use http::Response;
67
use http_body_util::{BodyExt, Full};
8+
use http_body_util::combinators::BoxBody;
79
use hyper::body::{Bytes, Incoming};
810
use tracing::{error, info};
911
use wasmtime_wasi_http::WasiHttpView;
1012

13+
use http_backend::Backend;
1114
use runtime::InstancePre;
1215
use runtime::store::StoreBuilder;
1316

@@ -16,36 +19,44 @@ use crate::HttpState;
1619

1720
/// Execute context used by ['HttpService']
1821
#[derive(Clone)]
19-
pub struct WasiHttpExecutorImpl {
20-
instance_pre: InstancePre<HttpState>,
22+
pub struct WasiHttpExecutorImpl<C> {
23+
instance_pre: InstancePre<HttpState<C>>,
2124
store_builder: StoreBuilder,
25+
backend: Backend<C>,
2226
}
2327

2428
#[async_trait]
25-
impl HttpExecutor for WasiHttpExecutorImpl {
29+
impl<C> HttpExecutor for WasiHttpExecutorImpl<C>
30+
where
31+
C: Clone + Send + Sync + 'static,
32+
{
2633
async fn execute(
2734
&self,
2835
req: hyper::Request<Incoming>,
29-
) -> anyhow::Result<(hyper::Response<Full<Bytes>>, Duration, ByteSize)> {
36+
) -> anyhow::Result<(Response<BoxBody<Bytes, hyper::Error>>, Duration, ByteSize)> {
3037
let start_ = Instant::now();
3138
let response = self.execute_impl(req).await;
3239
let elapsed = Instant::now().duration_since(start_);
3340
response.map(|(r, used)| (r, elapsed, used))
3441
}
3542
}
3643

37-
impl WasiHttpExecutorImpl {
38-
pub fn new(instance_pre: InstancePre<HttpState>, store_builder: StoreBuilder) -> Self {
44+
impl<C> WasiHttpExecutorImpl<C>
45+
where
46+
C: Clone + Send + Sync + 'static,
47+
{
48+
pub fn new(instance_pre: InstancePre<HttpState<C>>, store_builder: StoreBuilder, backend: Backend<C>) -> Self {
3949
Self {
4050
instance_pre,
4151
store_builder,
52+
backend,
4253
}
4354
}
4455

4556
async fn execute_impl(
4657
&self,
4758
req: hyper::Request<Incoming>,
48-
) -> anyhow::Result<(hyper::Response<Full<Bytes>>, ByteSize)> {
59+
) -> anyhow::Result<(Response<BoxBody<Bytes, hyper::Error>>, ByteSize)> {
4960
let (sender, receiver) = tokio::sync::oneshot::channel();
5061
let (parts, body) = req.into_parts();
5162

@@ -58,8 +69,13 @@ impl WasiHttpExecutorImpl {
5869
.store_builder
5970
.make_wasi_nn_ctx()
6071
.expect("make_wasi_nn_ctx");
72+
let mut http_backend = self.backend.to_owned();
6173

62-
let state = HttpState { wasi_nn };
74+
http_backend
75+
.propagate_headers(&parts.headers)
76+
.context("propagate headers")?;
77+
78+
let state = HttpState { wasi_nn, http_backend };
6379

6480
let mut store = store_builder.build(state).expect("store build");
6581
let instance_pre = self.instance_pre.clone();
@@ -106,9 +122,9 @@ impl WasiHttpExecutorImpl {
106122
Ok(Ok(resp)) => {
107123
let (parts, body) = resp.into_parts();
108124
let body = body.collect().await.expect("incoming body").to_bytes();
109-
let body = Full::new(body);
125+
let body = Full::new(body).map_err(|never| match never {}).boxed();
110126
let used = task.await.expect("task await").expect("byte size");
111-
Ok((hyper::Response::from_parts(parts, body), used))
127+
Ok((Response::from_parts(parts, body), used))
112128
}
113129
Ok(Err(e)) => Err(e.into()),
114130
Err(_) => {

0 commit comments

Comments
 (0)