From ed94b46b1edc9eff872b73a23ff40f1748f10ca9 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Mon, 11 Mar 2024 19:34:05 +0800 Subject: [PATCH 01/12] dep: bump hyper tp 1.1.0 --- Cargo.lock | 7 +- examples/metrics/Cargo.toml | 17 ++++- examples/metrics/src/http_service.rs | 95 +++++++++++++----------- misc/server/Cargo.toml | 21 +++++- misc/server/src/http_service.rs | 103 +++++++++++++++------------ 5 files changed, 148 insertions(+), 95 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69d0963f1dc..f85b28fb612 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3188,7 +3188,8 @@ dependencies = [ "clap", "futures", "futures-timer", - "hyper 0.14.27", + "hyper 1.1.0", + "hyper-util", "libp2p", "prometheus-client", "serde", @@ -3662,13 +3663,15 @@ name = "metrics-example" version = "0.1.0" dependencies = [ "futures", - "hyper 0.14.27", + "hyper 1.1.0", + "hyper-util", "libp2p", "opentelemetry", "opentelemetry-otlp", "opentelemetry_api", "prometheus-client", "tokio", + "tokio-stream", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/examples/metrics/Cargo.toml b/examples/metrics/Cargo.toml index 39412d29aea..4ea4d016e1d 100644 --- a/examples/metrics/Cargo.toml +++ b/examples/metrics/Cargo.toml @@ -10,13 +10,24 @@ release = false [dependencies] futures = "0.3.30" -hyper = { version = "0.14", features = ["server", "tcp", "http1"] } -libp2p = { path = "../../libp2p", features = ["tokio", "metrics", "ping", "noise", "identify", "tcp", "yamux", "macros"] } +hyper = { version = "1.1.0", features = ["server", "http1"] } +libp2p = { path = "../../libp2p", features = [ + "tokio", + "metrics", + "ping", + "noise", + "identify", + "tcp", + "yamux", + "macros", +] } opentelemetry = { version = "0.20.0", features = ["rt-tokio", "metrics"] } -opentelemetry-otlp = { version = "0.13.0", features = ["metrics"]} +opentelemetry-otlp = { version = "0.13.0", features = ["metrics"] } opentelemetry_api = "0.20.0" prometheus-client = { workspace = true } tokio = { version = "1", features = ["full"] } +tokio-stream = { version = "0.1", features = ["net"] } +hyper-util = {version = "0.1"} tracing = "0.1.37" tracing-opentelemetry = "0.21.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/metrics/src/http_service.rs b/examples/metrics/src/http_service.rs index 8c77d724ea3..27bb581a777 100644 --- a/examples/metrics/src/http_service.rs +++ b/examples/metrics/src/http_service.rs @@ -18,24 +18,31 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use hyper::body::Incoming; use hyper::http::StatusCode; use hyper::service::Service; -use hyper::{Body, Method, Request, Response, Server}; +use hyper::{server::conn::http1::Builder, Method, Request, Response}; use prometheus_client::encoding::text::encode; use prometheus_client::registry::Registry; use std::future::Future; +use std::net::SocketAddr; use std::pin::Pin; use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; +use tokio::net::TcpListener; const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0"; pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> { // Serve on localhost. - let addr = ([127, 0, 0, 1], 0).into(); - - let server = Server::bind(&addr).serve(MakeMetricService::new(registry)); - tracing::info!(metrics_server=%format!("http://{}/metrics", server.local_addr())); + let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); + let tcp_listener_stream = TcpListener::bind(addr).await?; + let local_addr = tcp_listener_stream.local_addr()?; + let (connection, _) = tcp_listener_stream.accept().await?; + let server = Builder::new().serve_connection( + hyper_util::rt::TokioIo::new(connection), + MetricService::new(registry), + ); + tracing::info!(metrics_server=%format!("http://{}/metrics", local_addr)); if let Err(e) = server.await { tracing::error!("server error: {}", e); } @@ -49,10 +56,16 @@ pub(crate) struct MetricService { type SharedRegistry = Arc>; impl MetricService { - fn get_reg(&mut self) -> SharedRegistry { + fn new(registry: Registry) -> Self { + Self { + reg: Arc::new(Mutex::new(registry)), + } + } + + fn get_reg(&self) -> SharedRegistry { Arc::clone(&self.reg) } - fn respond_with_metrics(&mut self) -> Response { + fn respond_with_metrics(&self) -> Response { let mut response: Response = Response::default(); response.headers_mut().insert( @@ -67,7 +80,7 @@ impl MetricService { response } - fn respond_with_404_not_found(&mut self) -> Response { + fn respond_with_404_not_found(&self) -> Response { Response::builder() .status(StatusCode::NOT_FOUND) .body("Not found try localhost:[port]/metrics".to_string()) @@ -75,16 +88,16 @@ impl MetricService { } } -impl Service> for MetricService { +impl Service> for MetricService { type Response = Response; type Error = hyper::Error; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + // fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + // Poll::Ready(Ok(())) + // } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { let req_path = req.uri().path(); let req_method = req.method(); let resp = if (req_method == Method::GET) && (req_path == "/metrics") { @@ -97,30 +110,30 @@ impl Service> for MetricService { } } -pub(crate) struct MakeMetricService { - reg: SharedRegistry, -} - -impl MakeMetricService { - pub(crate) fn new(registry: Registry) -> MakeMetricService { - MakeMetricService { - reg: Arc::new(Mutex::new(registry)), - } - } -} - -impl Service for MakeMetricService { - type Response = MetricService; - type Error = hyper::Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: T) -> Self::Future { - let reg = self.reg.clone(); - let fut = async move { Ok(MetricService { reg }) }; - Box::pin(fut) - } -} +// pub(crate) struct MakeMetricService { +// reg: SharedRegistry, +// } + +// impl MakeMetricService { +// pub(crate) fn new(registry: Registry) -> MakeMetricService { +// MakeMetricService { +// reg: Arc::new(Mutex::new(registry)), +// } +// } +// } + +// impl Service for MakeMetricService { +// type Response = MetricService; +// type Error = hyper::Error; +// type Future = Pin> + Send>>; + +// fn poll_ready(&mut self, _: &mut Context) -> Poll> { +// Poll::Ready(Ok(())) +// } + +// fn call(&self, _: T) -> Self::Future { +// let reg = self.reg.clone(); +// let fut = async move { Ok(MetricService { reg }) }; +// Box::pin(fut) +// } +// } diff --git a/misc/server/Cargo.toml b/misc/server/Cargo.toml index 3f46de701e2..30cf07a7bbb 100644 --- a/misc/server/Cargo.toml +++ b/misc/server/Cargo.toml @@ -15,13 +15,30 @@ base64 = "0.21" clap = { version = "4.4.16", features = ["derive"] } futures = "0.3" futures-timer = "3" -hyper = { version = "0.14", features = ["server", "tcp", "http1"] } -libp2p = { workspace = true, features = ["autonat", "dns", "tokio", "noise", "tcp", "yamux", "identify", "kad", "ping", "relay", "metrics", "rsa", "macros", "quic", "websocket"] } +hyper = { version = "1.1.0", features = ["server", "http1"] } +libp2p = { workspace = true, features = [ + "autonat", + "dns", + "tokio", + "noise", + "tcp", + "yamux", + "identify", + "kad", + "ping", + "relay", + "metrics", + "rsa", + "macros", + "quic", + "websocket", +] } prometheus-client = { workspace = true } serde = "1.0.197" serde_derive = "1.0.125" serde_json = "1.0" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +hyper-util = "0.1" tracing = "0.1.37" tracing-subscriber = { version = "0.3", features = ["env-filter"] } zeroize = "1" diff --git a/misc/server/src/http_service.rs b/misc/server/src/http_service.rs index 7905933fbf5..1ee25f94db0 100644 --- a/misc/server/src/http_service.rs +++ b/misc/server/src/http_service.rs @@ -18,27 +18,37 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use hyper::body::Incoming; use hyper::http::StatusCode; +use hyper::server::conn::http1::Builder; use hyper::service::Service; -use hyper::{Body, Method, Request, Response, Server}; +use hyper::{Method, Request, Response}; use prometheus_client::encoding::text::encode; use prometheus_client::registry::Registry; use std::future::Future; +use std::net::SocketAddr; use std::pin::Pin; use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; +use tokio::net::TcpListener; const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0"; pub(crate) async fn metrics_server( registry: Registry, metrics_path: String, -) -> Result<(), hyper::Error> { +) -> Result<(), std::io::Error> { // Serve on localhost. - let addr = ([0, 0, 0, 0], 8888).into(); - - let server = Server::bind(&addr).serve(MakeMetricService::new(registry, metrics_path.clone())); - tracing::info!(metrics_server=%format!("http://{}{}", server.local_addr(), metrics_path)); - server.await?; + let addr: SocketAddr = ([0, 0, 0, 0], 8888).into(); + let tcp_listener_stream = TcpListener::bind(addr).await?; + let local_addr = tcp_listener_stream.local_addr()?; + let (connection, _) = tcp_listener_stream.accept().await?; + let server = Builder::new().serve_connection( + hyper_util::rt::TokioIo::new(connection), + MetricService::new(registry, metrics_path.clone()), + ); + tracing::info!(metrics_server=%format!("http://{}{}", local_addr, metrics_path)); + if let Err(e) = server.await { + tracing::error!("server error: {}", e); + } Ok(()) } pub(crate) struct MetricService { @@ -49,10 +59,17 @@ pub(crate) struct MetricService { type SharedRegistry = Arc>; impl MetricService { - fn get_reg(&mut self) -> SharedRegistry { + fn new(reg: Registry, metrics_path: String) -> Self { + Self { + reg: Arc::new(Mutex::new(reg)), + metrics_path, + } + } + + fn get_reg(&self) -> SharedRegistry { Arc::clone(&self.reg) } - fn respond_with_metrics(&mut self) -> Response { + fn respond_with_metrics(&self) -> Response { let mut response: Response = Response::default(); response.headers_mut().insert( @@ -67,7 +84,7 @@ impl MetricService { response } - fn respond_with_404_not_found(&mut self) -> Response { + fn respond_with_404_not_found(&self) -> Response { Response::builder() .status(StatusCode::NOT_FOUND) .body(format!( @@ -78,16 +95,12 @@ impl MetricService { } } -impl Service> for MetricService { +impl Service> for MetricService { type Response = Response; type Error = hyper::Error; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { let req_path = req.uri().path(); let req_method = req.method(); let resp = if (req_method == Method::GET) && (req_path == self.metrics_path) { @@ -100,33 +113,29 @@ impl Service> for MetricService { } } -pub(crate) struct MakeMetricService { - reg: SharedRegistry, - metrics_path: String, -} - -impl MakeMetricService { - pub(crate) fn new(registry: Registry, metrics_path: String) -> MakeMetricService { - MakeMetricService { - reg: Arc::new(Mutex::new(registry)), - metrics_path, - } - } -} - -impl Service for MakeMetricService { - type Response = MetricService; - type Error = hyper::Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: T) -> Self::Future { - let reg = self.reg.clone(); - let metrics_path = self.metrics_path.clone(); - let fut = async move { Ok(MetricService { reg, metrics_path }) }; - Box::pin(fut) - } -} +// pub(crate) struct MakeMetricService { +// reg: SharedRegistry, +// metrics_path: String, +// } + +// impl MakeMetricService { +// pub(crate) fn new(registry: Registry, metrics_path: String) -> MakeMetricService { +// MakeMetricService { +// reg: Arc::new(Mutex::new(registry)), +// metrics_path, +// } +// } +// } + +// impl Service for MakeMetricService { +// type Response = MetricService; +// type Error = hyper::Error; +// type Future = Pin> + Send>>; + +// fn call(&self, _: T) -> Self::Future { +// let reg = self.reg.clone(); +// let metrics_path = self.metrics_path.clone(); +// let fut = async move { Ok(MetricService { reg, metrics_path }) }; +// Box::pin(fut) +// } +// } From 41bf966c541507e89d30c0e24ced458c135fd2ce Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Tue, 19 Mar 2024 10:40:55 +0800 Subject: [PATCH 02/12] allow accept multiple clients --- examples/metrics/src/http_service.rs | 22 ++++++++++++++-------- misc/server/src/http_service.rs | 21 +++++++++++++-------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/examples/metrics/src/http_service.rs b/examples/metrics/src/http_service.rs index 27bb581a777..4579ac05cf2 100644 --- a/examples/metrics/src/http_service.rs +++ b/examples/metrics/src/http_service.rs @@ -37,18 +37,24 @@ pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Er let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); let tcp_listener_stream = TcpListener::bind(addr).await?; let local_addr = tcp_listener_stream.local_addr()?; - let (connection, _) = tcp_listener_stream.accept().await?; - let server = Builder::new().serve_connection( - hyper_util::rt::TokioIo::new(connection), - MetricService::new(registry), - ); tracing::info!(metrics_server=%format!("http://{}/metrics", local_addr)); - if let Err(e) = server.await { - tracing::error!("server error: {}", e); + let service = MetricService::new(registry); + loop { + let service = service.clone(); + let (connection, _) = tcp_listener_stream.accept().await?; + tokio::spawn(async move { + let server = Builder::new().serve_connection( + hyper_util::rt::TokioIo::new(connection), + service, + ); + if let Err(e) = server.await { + tracing::error!("server error: {}", e); + } + }); } - Ok(()) } +#[derive(Clone)] pub(crate) struct MetricService { reg: Arc>, } diff --git a/misc/server/src/http_service.rs b/misc/server/src/http_service.rs index 1ee25f94db0..8a57c573bd1 100644 --- a/misc/server/src/http_service.rs +++ b/misc/server/src/http_service.rs @@ -40,17 +40,22 @@ pub(crate) async fn metrics_server( let addr: SocketAddr = ([0, 0, 0, 0], 8888).into(); let tcp_listener_stream = TcpListener::bind(addr).await?; let local_addr = tcp_listener_stream.local_addr()?; - let (connection, _) = tcp_listener_stream.accept().await?; - let server = Builder::new().serve_connection( - hyper_util::rt::TokioIo::new(connection), - MetricService::new(registry, metrics_path.clone()), - ); + let service = MetricService::new(registry, metrics_path.clone()); tracing::info!(metrics_server=%format!("http://{}{}", local_addr, metrics_path)); - if let Err(e) = server.await { - tracing::error!("server error: {}", e); + loop { + let service = service.clone(); + let (connection, _) = tcp_listener_stream.accept().await?; + tokio::spawn(async move { + let server = + Builder::new().serve_connection(hyper_util::rt::TokioIo::new(connection), service); + if let Err(e) = server.await { + tracing::error!("server error: {}", e); + } + }); } - Ok(()) } + +#[derive(Clone)] pub(crate) struct MetricService { reg: Arc>, metrics_path: String, From 579e98875e00c1d066cbb6d4678f9d26cafd7f10 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Tue, 19 Mar 2024 11:23:38 +0800 Subject: [PATCH 03/12] remove unnecessary tokio-stream, revoke formatting, set proper feature flag for hyper-util --- Cargo.lock | 1 - examples/metrics/Cargo.toml | 14 ++------------ misc/server/Cargo.toml | 18 +----------------- 3 files changed, 3 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f85b28fb612..b38c130ff68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3671,7 +3671,6 @@ dependencies = [ "opentelemetry_api", "prometheus-client", "tokio", - "tokio-stream", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/examples/metrics/Cargo.toml b/examples/metrics/Cargo.toml index 4ea4d016e1d..c9ba6a8fd10 100644 --- a/examples/metrics/Cargo.toml +++ b/examples/metrics/Cargo.toml @@ -11,23 +11,13 @@ release = false [dependencies] futures = "0.3.30" hyper = { version = "1.1.0", features = ["server", "http1"] } -libp2p = { path = "../../libp2p", features = [ - "tokio", - "metrics", - "ping", - "noise", - "identify", - "tcp", - "yamux", - "macros", -] } +libp2p = { path = "../../libp2p", features = ["tokio", "metrics", "ping", "noise", "identify", "tcp", "yamux", "macros"] } opentelemetry = { version = "0.20.0", features = ["rt-tokio", "metrics"] } opentelemetry-otlp = { version = "0.13.0", features = ["metrics"] } opentelemetry_api = "0.20.0" prometheus-client = { workspace = true } tokio = { version = "1", features = ["full"] } -tokio-stream = { version = "0.1", features = ["net"] } -hyper-util = {version = "0.1"} +hyper-util = {version = "0.1", features = ["tokio"]} tracing = "0.1.37" tracing-opentelemetry = "0.21.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/misc/server/Cargo.toml b/misc/server/Cargo.toml index 30cf07a7bbb..b6b7e184a48 100644 --- a/misc/server/Cargo.toml +++ b/misc/server/Cargo.toml @@ -16,23 +16,7 @@ clap = { version = "4.4.16", features = ["derive"] } futures = "0.3" futures-timer = "3" hyper = { version = "1.1.0", features = ["server", "http1"] } -libp2p = { workspace = true, features = [ - "autonat", - "dns", - "tokio", - "noise", - "tcp", - "yamux", - "identify", - "kad", - "ping", - "relay", - "metrics", - "rsa", - "macros", - "quic", - "websocket", -] } +libp2p = { workspace = true, features = ["autonat", "dns", "tokio", "noise", "tcp", "yamux", "identify", "kad", "ping", "relay", "metrics", "rsa", "macros", "quic", "websocket"] } prometheus-client = { workspace = true } serde = "1.0.197" serde_derive = "1.0.125" From 8765b54e77e9db25674fd5053b74c66dac8f521f Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Tue, 19 Mar 2024 11:28:45 +0800 Subject: [PATCH 04/12] formatting --- examples/metrics/src/http_service.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/metrics/src/http_service.rs b/examples/metrics/src/http_service.rs index 4579ac05cf2..a96877471eb 100644 --- a/examples/metrics/src/http_service.rs +++ b/examples/metrics/src/http_service.rs @@ -43,10 +43,8 @@ pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Er let service = service.clone(); let (connection, _) = tcp_listener_stream.accept().await?; tokio::spawn(async move { - let server = Builder::new().serve_connection( - hyper_util::rt::TokioIo::new(connection), - service, - ); + let server = + Builder::new().serve_connection(hyper_util::rt::TokioIo::new(connection), service); if let Err(e) = server.await { tracing::error!("server error: {}", e); } From 9ce9e56d0f9fc07357c86eed5146df25c69738ff Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Tue, 19 Mar 2024 11:38:55 +0800 Subject: [PATCH 05/12] set proper feature flag for hyper-util --- misc/server/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/server/Cargo.toml b/misc/server/Cargo.toml index b6b7e184a48..47c6f29290d 100644 --- a/misc/server/Cargo.toml +++ b/misc/server/Cargo.toml @@ -22,7 +22,7 @@ serde = "1.0.197" serde_derive = "1.0.125" serde_json = "1.0" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } -hyper-util = "0.1" +hyper-util = { version = "0.1", features = ["tokio"]} tracing = "0.1.37" tracing-subscriber = { version = "0.3", features = ["env-filter"] } zeroize = "1" From 582af4b6617d4d2a1a3f874119ab7b449fcd2cf8 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Mon, 1 Apr 2024 15:41:26 +0800 Subject: [PATCH 06/12] moving to axum --- Cargo.lock | 6 +- examples/metrics/Cargo.toml | 3 +- examples/metrics/src/http_service.rs | 94 +++++++--------------------- misc/server/Cargo.toml | 3 +- misc/server/src/http_service.rs | 90 +++++++------------------- 5 files changed, 50 insertions(+), 146 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b38c130ff68..0f554163234 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3184,12 +3184,11 @@ dependencies = [ name = "libp2p-server" version = "0.12.7" dependencies = [ + "axum 0.7.4", "base64 0.21.7", "clap", "futures", "futures-timer", - "hyper 1.1.0", - "hyper-util", "libp2p", "prometheus-client", "serde", @@ -3662,9 +3661,8 @@ dependencies = [ name = "metrics-example" version = "0.1.0" dependencies = [ + "axum 0.7.4", "futures", - "hyper 1.1.0", - "hyper-util", "libp2p", "opentelemetry", "opentelemetry-otlp", diff --git a/examples/metrics/Cargo.toml b/examples/metrics/Cargo.toml index c9ba6a8fd10..1021bcfeab1 100644 --- a/examples/metrics/Cargo.toml +++ b/examples/metrics/Cargo.toml @@ -10,14 +10,13 @@ release = false [dependencies] futures = "0.3.30" -hyper = { version = "1.1.0", features = ["server", "http1"] } +axum = "0.7" libp2p = { path = "../../libp2p", features = ["tokio", "metrics", "ping", "noise", "identify", "tcp", "yamux", "macros"] } opentelemetry = { version = "0.20.0", features = ["rt-tokio", "metrics"] } opentelemetry-otlp = { version = "0.13.0", features = ["metrics"] } opentelemetry_api = "0.20.0" prometheus-client = { workspace = true } tokio = { version = "1", features = ["full"] } -hyper-util = {version = "0.1", features = ["tokio"]} tracing = "0.1.37" tracing-opentelemetry = "0.21.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/metrics/src/http_service.rs b/examples/metrics/src/http_service.rs index a96877471eb..cf77e86c3cc 100644 --- a/examples/metrics/src/http_service.rs +++ b/examples/metrics/src/http_service.rs @@ -18,38 +18,30 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use hyper::body::Incoming; -use hyper::http::StatusCode; -use hyper::service::Service; -use hyper::{server::conn::http1::Builder, Method, Request, Response}; +use axum::extract::State; +use axum::http::StatusCode; +use axum::response::Response; +use axum::routing::get; +use axum::Router; use prometheus_client::encoding::text::encode; use prometheus_client::registry::Registry; -use std::future::Future; use std::net::SocketAddr; -use std::pin::Pin; use std::sync::{Arc, Mutex}; -use tokio::net::TcpListener; const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0"; pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> { // Serve on localhost. let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); - let tcp_listener_stream = TcpListener::bind(addr).await?; - let local_addr = tcp_listener_stream.local_addr()?; + let server = Router::new() + .route("/metrics", get(respond_with_metrics)) + .fallback(respond_with_404_not_found) + .with_state(MetricService::new(registry)); + let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); + let local_addr = listener.local_addr()?; tracing::info!(metrics_server=%format!("http://{}/metrics", local_addr)); - let service = MetricService::new(registry); - loop { - let service = service.clone(); - let (connection, _) = tcp_listener_stream.accept().await?; - tokio::spawn(async move { - let server = - Builder::new().serve_connection(hyper_util::rt::TokioIo::new(connection), service); - if let Err(e) = server.await { - tracing::error!("server error: {}", e); - } - }); - } + axum::serve(listener, server.into_make_service()).await.unwrap(); + Ok(()) } #[derive(Clone)] @@ -57,6 +49,14 @@ pub(crate) struct MetricService { reg: Arc>, } +async fn respond_with_metrics(state: State) -> Response { + state.respond_with_metrics() +} + +async fn respond_with_404_not_found(state: State) -> Response { + state.respond_with_404_not_found() +} + type SharedRegistry = Arc>; impl MetricService { @@ -73,7 +73,7 @@ impl MetricService { let mut response: Response = Response::default(); response.headers_mut().insert( - hyper::header::CONTENT_TYPE, + axum::http::header::CONTENT_TYPE, METRICS_CONTENT_TYPE.try_into().unwrap(), ); @@ -91,53 +91,3 @@ impl MetricService { .unwrap() } } - -impl Service> for MetricService { - type Response = Response; - type Error = hyper::Error; - type Future = Pin> + Send>>; - - // fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - // Poll::Ready(Ok(())) - // } - - fn call(&self, req: Request) -> Self::Future { - let req_path = req.uri().path(); - let req_method = req.method(); - let resp = if (req_method == Method::GET) && (req_path == "/metrics") { - // Encode and serve metrics from registry. - self.respond_with_metrics() - } else { - self.respond_with_404_not_found() - }; - Box::pin(async { Ok(resp) }) - } -} - -// pub(crate) struct MakeMetricService { -// reg: SharedRegistry, -// } - -// impl MakeMetricService { -// pub(crate) fn new(registry: Registry) -> MakeMetricService { -// MakeMetricService { -// reg: Arc::new(Mutex::new(registry)), -// } -// } -// } - -// impl Service for MakeMetricService { -// type Response = MetricService; -// type Error = hyper::Error; -// type Future = Pin> + Send>>; - -// fn poll_ready(&mut self, _: &mut Context) -> Poll> { -// Poll::Ready(Ok(())) -// } - -// fn call(&self, _: T) -> Self::Future { -// let reg = self.reg.clone(); -// let fut = async move { Ok(MetricService { reg }) }; -// Box::pin(fut) -// } -// } diff --git a/misc/server/Cargo.toml b/misc/server/Cargo.toml index 47c6f29290d..e58620b9d80 100644 --- a/misc/server/Cargo.toml +++ b/misc/server/Cargo.toml @@ -15,14 +15,13 @@ base64 = "0.21" clap = { version = "4.4.16", features = ["derive"] } futures = "0.3" futures-timer = "3" -hyper = { version = "1.1.0", features = ["server", "http1"] } +axum = "0.7" libp2p = { workspace = true, features = ["autonat", "dns", "tokio", "noise", "tcp", "yamux", "identify", "kad", "ping", "relay", "metrics", "rsa", "macros", "quic", "websocket"] } prometheus-client = { workspace = true } serde = "1.0.197" serde_derive = "1.0.125" serde_json = "1.0" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } -hyper-util = { version = "0.1", features = ["tokio"]} tracing = "0.1.37" tracing-subscriber = { version = "0.3", features = ["env-filter"] } zeroize = "1" diff --git a/misc/server/src/http_service.rs b/misc/server/src/http_service.rs index 8a57c573bd1..3fec605b78c 100644 --- a/misc/server/src/http_service.rs +++ b/misc/server/src/http_service.rs @@ -18,16 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use hyper::body::Incoming; -use hyper::http::StatusCode; -use hyper::server::conn::http1::Builder; -use hyper::service::Service; -use hyper::{Method, Request, Response}; +use axum::extract::State; +use axum::http::StatusCode; +use axum::response::Response; +use axum::routing::get; +use axum::Router; use prometheus_client::encoding::text::encode; use prometheus_client::registry::Registry; -use std::future::Future; use std::net::SocketAddr; -use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::net::TcpListener; @@ -38,21 +36,26 @@ pub(crate) async fn metrics_server( ) -> Result<(), std::io::Error> { // Serve on localhost. let addr: SocketAddr = ([0, 0, 0, 0], 8888).into(); - let tcp_listener_stream = TcpListener::bind(addr).await?; - let local_addr = tcp_listener_stream.local_addr()?; let service = MetricService::new(registry, metrics_path.clone()); + let server = Router::new() + .route(&metrics_path, get(respond_with_metrics)) + .fallback(respond_with_404_not_found) + .with_state(service); + let tcp_listener = TcpListener::bind(addr).await?; + let local_addr = tcp_listener.local_addr()?; tracing::info!(metrics_server=%format!("http://{}{}", local_addr, metrics_path)); - loop { - let service = service.clone(); - let (connection, _) = tcp_listener_stream.accept().await?; - tokio::spawn(async move { - let server = - Builder::new().serve_connection(hyper_util::rt::TokioIo::new(connection), service); - if let Err(e) = server.await { - tracing::error!("server error: {}", e); - } - }); - } + axum::serve(tcp_listener, server.into_make_service()) + .await + .unwrap(); + Ok(()) +} + +async fn respond_with_metrics(state: State) -> Response { + state.respond_with_metrics() +} + +async fn respond_with_404_not_found(state: State) -> Response { + state.respond_with_404_not_found() } #[derive(Clone)] @@ -78,7 +81,7 @@ impl MetricService { let mut response: Response = Response::default(); response.headers_mut().insert( - hyper::header::CONTENT_TYPE, + axum::http::header::CONTENT_TYPE, METRICS_CONTENT_TYPE.try_into().unwrap(), ); @@ -99,48 +102,3 @@ impl MetricService { .unwrap() } } - -impl Service> for MetricService { - type Response = Response; - type Error = hyper::Error; - type Future = Pin> + Send>>; - - fn call(&self, req: Request) -> Self::Future { - let req_path = req.uri().path(); - let req_method = req.method(); - let resp = if (req_method == Method::GET) && (req_path == self.metrics_path) { - // Encode and serve metrics from registry. - self.respond_with_metrics() - } else { - self.respond_with_404_not_found() - }; - Box::pin(async { Ok(resp) }) - } -} - -// pub(crate) struct MakeMetricService { -// reg: SharedRegistry, -// metrics_path: String, -// } - -// impl MakeMetricService { -// pub(crate) fn new(registry: Registry, metrics_path: String) -> MakeMetricService { -// MakeMetricService { -// reg: Arc::new(Mutex::new(registry)), -// metrics_path, -// } -// } -// } - -// impl Service for MakeMetricService { -// type Response = MetricService; -// type Error = hyper::Error; -// type Future = Pin> + Send>>; - -// fn call(&self, _: T) -> Self::Future { -// let reg = self.reg.clone(); -// let metrics_path = self.metrics_path.clone(); -// let fut = async move { Ok(MetricService { reg, metrics_path }) }; -// Box::pin(fut) -// } -// } From 3557633344946f5fcabe928149f124d592db394c Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Mon, 1 Apr 2024 15:56:30 +0800 Subject: [PATCH 07/12] trigger PR From a3692815c6cd7d65d45539ee69652786d770d276 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Mon, 1 Apr 2024 16:10:33 +0800 Subject: [PATCH 08/12] formatting and style consistency --- examples/metrics/src/http_service.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/examples/metrics/src/http_service.rs b/examples/metrics/src/http_service.rs index cf77e86c3cc..c0e9e160408 100644 --- a/examples/metrics/src/http_service.rs +++ b/examples/metrics/src/http_service.rs @@ -27,20 +27,24 @@ use prometheus_client::encoding::text::encode; use prometheus_client::registry::Registry; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; +use tokio::net::TcpListener; const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0"; pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> { // Serve on localhost. let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); + let service = MetricService::new(registry); let server = Router::new() .route("/metrics", get(respond_with_metrics)) .fallback(respond_with_404_not_found) - .with_state(MetricService::new(registry)); - let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); - let local_addr = listener.local_addr()?; + .with_state(service); + let tcp_listener = TcpListener::bind(addr).await.unwrap(); + let local_addr = tcp_listener.local_addr()?; tracing::info!(metrics_server=%format!("http://{}/metrics", local_addr)); - axum::serve(listener, server.into_make_service()).await.unwrap(); + axum::serve(tcp_listener, server.into_make_service()) + .await + .unwrap(); Ok(()) } From f9171fdb00bea91d8d8275202621e938cdc7e02d Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Tue, 9 Apr 2024 11:02:58 +0800 Subject: [PATCH 09/12] return instead of unwrap, simplify with tuple response --- examples/metrics/src/http_service.rs | 48 ++++++++++----------------- misc/server/src/http_service.rs | 49 ++++++++++------------------ 2 files changed, 35 insertions(+), 62 deletions(-) diff --git a/examples/metrics/src/http_service.rs b/examples/metrics/src/http_service.rs index c0e9e160408..2db8850e3a3 100644 --- a/examples/metrics/src/http_service.rs +++ b/examples/metrics/src/http_service.rs @@ -20,7 +20,7 @@ use axum::extract::State; use axum::http::StatusCode; -use axum::response::Response; +use axum::response::IntoResponse; use axum::routing::get; use axum::Router; use prometheus_client::encoding::text::encode; @@ -39,12 +39,10 @@ pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Er .route("/metrics", get(respond_with_metrics)) .fallback(respond_with_404_not_found) .with_state(service); - let tcp_listener = TcpListener::bind(addr).await.unwrap(); + let tcp_listener = TcpListener::bind(addr).await?; let local_addr = tcp_listener.local_addr()?; tracing::info!(metrics_server=%format!("http://{}/metrics", local_addr)); - axum::serve(tcp_listener, server.into_make_service()) - .await - .unwrap(); + axum::serve(tcp_listener, server.into_make_service()).await?; Ok(()) } @@ -53,12 +51,23 @@ pub(crate) struct MetricService { reg: Arc>, } -async fn respond_with_metrics(state: State) -> Response { - state.respond_with_metrics() +async fn respond_with_metrics(state: State) -> impl IntoResponse { + let mut sink = String::new(); + let reg = state.get_reg(); + encode(&mut sink, ®.lock().unwrap()).unwrap(); + + ( + StatusCode::OK, + [(axum::http::header::CONTENT_TYPE, METRICS_CONTENT_TYPE)], + sink, + ) } -async fn respond_with_404_not_found(state: State) -> Response { - state.respond_with_404_not_found() +async fn respond_with_404_not_found() -> impl IntoResponse { + ( + StatusCode::NOT_FOUND, + "Not found try localhost:[port]/metrics", + ) } type SharedRegistry = Arc>; @@ -73,25 +82,4 @@ impl MetricService { fn get_reg(&self) -> SharedRegistry { Arc::clone(&self.reg) } - fn respond_with_metrics(&self) -> Response { - let mut response: Response = Response::default(); - - response.headers_mut().insert( - axum::http::header::CONTENT_TYPE, - METRICS_CONTENT_TYPE.try_into().unwrap(), - ); - - let reg = self.get_reg(); - encode(&mut response.body_mut(), ®.lock().unwrap()).unwrap(); - - *response.status_mut() = StatusCode::OK; - - response - } - fn respond_with_404_not_found(&self) -> Response { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body("Not found try localhost:[port]/metrics".to_string()) - .unwrap() - } } diff --git a/misc/server/src/http_service.rs b/misc/server/src/http_service.rs index 3fec605b78c..eb3a2fab6ad 100644 --- a/misc/server/src/http_service.rs +++ b/misc/server/src/http_service.rs @@ -20,7 +20,7 @@ use axum::extract::State; use axum::http::StatusCode; -use axum::response::Response; +use axum::response::IntoResponse; use axum::routing::get; use axum::Router; use prometheus_client::encoding::text::encode; @@ -44,18 +44,27 @@ pub(crate) async fn metrics_server( let tcp_listener = TcpListener::bind(addr).await?; let local_addr = tcp_listener.local_addr()?; tracing::info!(metrics_server=%format!("http://{}{}", local_addr, metrics_path)); - axum::serve(tcp_listener, server.into_make_service()) - .await - .unwrap(); + axum::serve(tcp_listener, server.into_make_service()).await?; Ok(()) } -async fn respond_with_metrics(state: State) -> Response { - state.respond_with_metrics() +async fn respond_with_metrics(state: State) -> impl IntoResponse { + let mut sink = String::new(); + let reg = state.get_reg(); + encode(&mut sink, ®.lock().unwrap()).unwrap(); + + ( + StatusCode::OK, + [(axum::http::header::CONTENT_TYPE, METRICS_CONTENT_TYPE)], + sink, + ) } -async fn respond_with_404_not_found(state: State) -> Response { - state.respond_with_404_not_found() +async fn respond_with_404_not_found(state: State) -> impl IntoResponse { + ( + StatusCode::NOT_FOUND, + format!("Not found try localhost:[port]/{}", state.metrics_path), + ) } #[derive(Clone)] @@ -77,28 +86,4 @@ impl MetricService { fn get_reg(&self) -> SharedRegistry { Arc::clone(&self.reg) } - fn respond_with_metrics(&self) -> Response { - let mut response: Response = Response::default(); - - response.headers_mut().insert( - axum::http::header::CONTENT_TYPE, - METRICS_CONTENT_TYPE.try_into().unwrap(), - ); - - let reg = self.get_reg(); - encode(&mut response.body_mut(), ®.lock().unwrap()).unwrap(); - - *response.status_mut() = StatusCode::OK; - - response - } - fn respond_with_404_not_found(&self) -> Response { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body(format!( - "Not found try localhost:[port]/{}", - self.metrics_path - )) - .unwrap() - } } From 778b1f7ab82320646470a487882ce53207deee51 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Thu, 11 Apr 2024 18:53:44 +0800 Subject: [PATCH 10/12] remove 404 fallback --- examples/metrics/src/http_service.rs | 8 -------- misc/server/src/http_service.rs | 8 -------- 2 files changed, 16 deletions(-) diff --git a/examples/metrics/src/http_service.rs b/examples/metrics/src/http_service.rs index 2db8850e3a3..4a9c9785bb3 100644 --- a/examples/metrics/src/http_service.rs +++ b/examples/metrics/src/http_service.rs @@ -37,7 +37,6 @@ pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Er let service = MetricService::new(registry); let server = Router::new() .route("/metrics", get(respond_with_metrics)) - .fallback(respond_with_404_not_found) .with_state(service); let tcp_listener = TcpListener::bind(addr).await?; let local_addr = tcp_listener.local_addr()?; @@ -63,13 +62,6 @@ async fn respond_with_metrics(state: State) -> impl IntoResponse ) } -async fn respond_with_404_not_found() -> impl IntoResponse { - ( - StatusCode::NOT_FOUND, - "Not found try localhost:[port]/metrics", - ) -} - type SharedRegistry = Arc>; impl MetricService { diff --git a/misc/server/src/http_service.rs b/misc/server/src/http_service.rs index eb3a2fab6ad..8a5bc1d3e97 100644 --- a/misc/server/src/http_service.rs +++ b/misc/server/src/http_service.rs @@ -39,7 +39,6 @@ pub(crate) async fn metrics_server( let service = MetricService::new(registry, metrics_path.clone()); let server = Router::new() .route(&metrics_path, get(respond_with_metrics)) - .fallback(respond_with_404_not_found) .with_state(service); let tcp_listener = TcpListener::bind(addr).await?; let local_addr = tcp_listener.local_addr()?; @@ -60,13 +59,6 @@ async fn respond_with_metrics(state: State) -> impl IntoResponse ) } -async fn respond_with_404_not_found(state: State) -> impl IntoResponse { - ( - StatusCode::NOT_FOUND, - format!("Not found try localhost:[port]/{}", state.metrics_path), - ) -} - #[derive(Clone)] pub(crate) struct MetricService { reg: Arc>, From b50eb1b0c3bec33de857c26d2b1bd5e8837695d1 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Thu, 11 Apr 2024 18:57:35 +0800 Subject: [PATCH 11/12] remove unused field metrics_path --- misc/server/src/http_service.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/misc/server/src/http_service.rs b/misc/server/src/http_service.rs index 8a5bc1d3e97..cee1aa96e28 100644 --- a/misc/server/src/http_service.rs +++ b/misc/server/src/http_service.rs @@ -36,7 +36,7 @@ pub(crate) async fn metrics_server( ) -> Result<(), std::io::Error> { // Serve on localhost. let addr: SocketAddr = ([0, 0, 0, 0], 8888).into(); - let service = MetricService::new(registry, metrics_path.clone()); + let service = MetricService::new(registry); let server = Router::new() .route(&metrics_path, get(respond_with_metrics)) .with_state(service); @@ -62,16 +62,14 @@ async fn respond_with_metrics(state: State) -> impl IntoResponse #[derive(Clone)] pub(crate) struct MetricService { reg: Arc>, - metrics_path: String, } type SharedRegistry = Arc>; impl MetricService { - fn new(reg: Registry, metrics_path: String) -> Self { + fn new(reg: Registry) -> Self { Self { reg: Arc::new(Mutex::new(reg)), - metrics_path, } } From 2acb45f3764bf12d32cc07ea25863d967212080f Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Thu, 11 Apr 2024 23:50:57 +0800 Subject: [PATCH 12/12] trigger CI