Skip to content

Commit

Permalink
Switch to Hyper 1
Browse files Browse the repository at this point in the history
  • Loading branch information
nox committed Nov 22, 2024
1 parent e1d1a5a commit be7257c
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 68 deletions.
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ darling = "0.20.10"
erased-serde = "0.3.28"
futures-util = "0.3.28"
governor = "0.6"
hyper = { version = "0.14", default-features = false }
http-body-util = "0.1"
hyper = { version = "1", default-features = false }
hyper-util = { version = "0.1", default-features = false }
indexmap = "2.0.0"
ipnetwork = "0.20"
once_cell = "1.5"
tonic = { version = "0.11.0", default-features = false }
opentelemetry-proto = "0.5.0"
tonic = { version = "0.12", default-features = false }
opentelemetry-proto = "0.7"
parking_lot = "0.12.1"
proc-macro2 = { version = "1", default-features = false }
prometheus = { version = "0.13.3", default-features = false }
Expand All @@ -68,6 +70,7 @@ tokio = "1.41.0"
thread_local = "1.1"
tikv-jemallocator = "0.5"
tikv-jemalloc-ctl = "0.5"
tower-service = "0.3"
yaml-merge-keys = "0.5"

# needed for minver
Expand Down
2 changes: 2 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ publish = false
anyhow = { workspace = true }
foundations = { workspace = true }
futures-util = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true, features = ["server", "tokio"] }
tokio = { workspace = true, features = ["full"]}

[[example]]
Expand Down
15 changes: 10 additions & 5 deletions examples/http_server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ use foundations::settings::collections::Map;
use foundations::telemetry::{self, log, tracing, TelemetryConfig, TelemetryContext};
use foundations::BootstrapResult;
use futures_util::stream::{FuturesUnordered, StreamExt};
use hyper::server::conn::Http;
use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use hyper::{Request, Response};
use hyper_util::rt::{TokioExecutor, TokioIo};
use std::convert::Infallible;
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::sync::Arc;
Expand Down Expand Up @@ -193,7 +195,10 @@ async fn serve_connection(
}
});

if let Err(e) = Http::new().serve_connection(conn, on_request).await {
if let Err(e) = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new())
.serve_connection(TokioIo::new(conn), on_request)
.await
{
log::error!("failed to serve HTTP"; "error" => ?e);
metrics::http_server::failed_connections_total(&endpoint_name).inc();
}
Expand All @@ -204,9 +209,9 @@ async fn serve_connection(
#[tracing::span_fn("respond to request")]
async fn respond(
endpoint_name: Arc<String>,
req: Request<Body>,
req: Request<Incoming>,
routes: Arc<Map<String, ResponseSettings>>,
) -> Result<Response<Body>, Infallible> {
) -> Result<Response<Full<Bytes>>, Infallible> {
log::add_fields! {
"request_uri" => req.uri().to_string(),
"method" => req.method().to_string()
Expand Down
14 changes: 7 additions & 7 deletions foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ client-telemetry = ["logging", "metrics", "tracing", "dep:futures-util"]

# Enables the telemetry server.
telemetry-server = [
"dep:http-body-util",
"dep:hyper",
"dep:hyper-util",
"dep:socket2",
"dep:percent-encoding"
"dep:percent-encoding",
]

# Enables telemetry reporting over gRPC
telemetry-otlp-grpc = ["dep:tonic", "dep:tokio", "dep:hyper"]
telemetry-otlp-grpc = ["dep:tonic", "tonic/prost", "dep:tokio", "dep:hyper"]

# Enables experimental tokio runtime metrics
tokio-runtime-metrics = [
Expand Down Expand Up @@ -177,11 +179,9 @@ clap = { workspace = true, optional = true }
erased-serde = { workspace = true, optional = true }
futures-util = { workspace = true, optional = true }
governor = { workspace = true, optional = true }
hyper = { workspace = true, optional = true, features = [
"http1",
"runtime",
"server",
] }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, optional = true, features = ["http1", "server"] }
hyper-util = { workspace = true, optional = true, features = ["tokio"] }
indexmap = { workspace = true, optional = true, features = ["serde"] }
once_cell = { workspace = true, optional = true }
opentelemetry-proto = { workspace = true, optional = true, features = ["gen-tonic-messages", "trace"] }
Expand Down
12 changes: 5 additions & 7 deletions foundations/src/telemetry/driver.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use crate::utils::feature_use;
use crate::BootstrapResult;
use futures_util::future::BoxFuture;
use futures_util::stream::{FuturesUnordered, Stream};
use futures_util::FutureExt;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, Stream};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

feature_use!(cfg(feature = "telemetry-server"), {
use super::server::TelemetryServerFuture;
use anyhow::anyhow;
use hyper::Server;
use std::net::SocketAddr;
});

Expand Down Expand Up @@ -38,7 +36,7 @@ impl TelemetryDriver {
) -> Self {
Self {
#[cfg(feature = "telemetry-server")]
server_addr: server_fut.as_ref().map(Server::local_addr),
server_addr: server_fut.as_ref().map(|fut| fut.local_addr()),

#[cfg(feature = "telemetry-server")]
server_fut,
Expand Down Expand Up @@ -67,7 +65,7 @@ impl TelemetryDriver {
{
if let Some(server_fut) = self.server_fut.take() {
self.tele_futures.push(
async move { Ok(server_fut.with_graceful_shutdown(signal).await?) }.boxed(),
async move { Ok(server_fut.with_graceful_shutdown(signal).await) }.boxed(),
);

return;
Expand All @@ -93,7 +91,7 @@ impl Future for TelemetryDriver {
#[cfg(feature = "telemetry-server")]
if let Some(server_fut) = &mut self.server_fut {
if let Poll::Ready(res) = Pin::new(server_fut).poll(cx) {
ready_res.push(res.map_err(|err| anyhow!(err)));
match res {}
}
}

Expand Down
Loading

0 comments on commit be7257c

Please sign in to comment.