Skip to content

Commit

Permalink
deps: move libp2p-server and metrics example to use axum 0.7
Browse files Browse the repository at this point in the history
Move from `hyper` to `axum` for `libp2p-server` and metrics example.
Running in parallel with #5038.

Pull-Request: #5246.
  • Loading branch information
drHuangMHT committed Apr 11, 2024
1 parent 9bb2a87 commit 042174e
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 174 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ release = false

[dependencies]
futures = "0.3.30"
hyper = { version = "0.14", features = ["server", "tcp", "http1"] }
axum = "0.7"
libp2p = { path = "../../libp2p", features = ["tokio", "metrics", "ping", "noise", "identify", "tcp", "yamux", "macros"] }
opentelemetry = { version = "0.22.0", features = ["metrics"] }
opentelemetry-otlp = { version = "0.15.0", features = ["metrics"] }
Expand Down
113 changes: 32 additions & 81 deletions examples/metrics/src/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,109 +18,60 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use hyper::http::StatusCode;
use hyper::service::Service;
use hyper::{Body, Method, Request, Response, Server};
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::future::Future;
use std::pin::Pin;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::net::TcpListener;

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";

pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> {
// Serve on localhost.
let addr = ([127, 0, 0, 1], 0).into();

let server = Server::bind(&addr).serve(MakeMetricService::new(registry));
tracing::info!(metrics_server=%format!("http://{}/metrics", server.local_addr()));
if let Err(e) = server.await {
tracing::error!("server error: {}", e);
}
let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
let service = MetricService::new(registry);
let server = Router::new()
.route("/metrics", get(respond_with_metrics))
.with_state(service);
let tcp_listener = TcpListener::bind(addr).await?;
let local_addr = tcp_listener.local_addr()?;
tracing::info!(metrics_server=%format!("http://{}/metrics", local_addr));
axum::serve(tcp_listener, server.into_make_service()).await?;
Ok(())
}

#[derive(Clone)]
pub(crate) struct MetricService {
reg: Arc<Mutex<Registry>>,
}

type SharedRegistry = Arc<Mutex<Registry>>;

impl MetricService {
fn get_reg(&mut self) -> SharedRegistry {
Arc::clone(&self.reg)
}
fn respond_with_metrics(&mut self) -> Response<String> {
let mut response: Response<String> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE.try_into().unwrap(),
);

let reg = self.get_reg();
encode(&mut response.body_mut(), &reg.lock().unwrap()).unwrap();

*response.status_mut() = StatusCode::OK;

response
}
fn respond_with_404_not_found(&mut self) -> Response<String> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body("Not found try localhost:[port]/metrics".to_string())
.unwrap()
}
}

impl Service<Request<Body>> for MetricService {
type Response = Response<String>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
async fn respond_with_metrics(state: State<MetricService>) -> impl IntoResponse {
let mut sink = String::new();
let reg = state.get_reg();
encode(&mut sink, &reg.lock().unwrap()).unwrap();

fn call(&mut self, req: Request<Body>) -> Self::Future {
let req_path = req.uri().path();
let req_method = req.method();
let resp = if (req_method == Method::GET) && (req_path == "/metrics") {
// Encode and serve metrics from registry.
self.respond_with_metrics()
} else {
self.respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
}
(
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, METRICS_CONTENT_TYPE)],
sink,
)
}

pub(crate) struct MakeMetricService {
reg: SharedRegistry,
}
type SharedRegistry = Arc<Mutex<Registry>>;

impl MakeMetricService {
pub(crate) fn new(registry: Registry) -> MakeMetricService {
MakeMetricService {
impl MetricService {
fn new(registry: Registry) -> Self {
Self {
reg: Arc::new(Mutex::new(registry)),
}
}
}

impl<T> Service<T> for MakeMetricService {
type Response = MetricService;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _: T) -> Self::Future {
let reg = self.reg.clone();
let fut = async move { Ok(MetricService { reg }) };
Box::pin(fut)
fn get_reg(&self) -> SharedRegistry {
Arc::clone(&self.reg)
}
}
2 changes: 1 addition & 1 deletion misc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ base64 = "0.22"
clap = { version = "4.5.4", features = ["derive"] }
futures = "0.3"
futures-timer = "3"
hyper = { version = "0.14", features = ["server", "tcp", "http1"] }
axum = "0.7"
libp2p = { workspace = true, features = ["autonat", "dns", "tokio", "noise", "tcp", "yamux", "identify", "kad", "ping", "relay", "metrics", "rsa", "macros", "quic", "websocket"] }
prometheus-client = { workspace = true }
serde = "1.0.197"
Expand Down
125 changes: 36 additions & 89 deletions misc/server/src/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,115 +18,62 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use hyper::http::StatusCode;
use hyper::service::Service;
use hyper::{Body, Method, Request, Response, Server};
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::future::Future;
use std::pin::Pin;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::net::TcpListener;

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";
pub(crate) async fn metrics_server(
registry: Registry,
metrics_path: String,
) -> Result<(), hyper::Error> {
) -> Result<(), std::io::Error> {
// Serve on localhost.
let addr = ([0, 0, 0, 0], 8888).into();

let server = Server::bind(&addr).serve(MakeMetricService::new(registry, metrics_path.clone()));
tracing::info!(metrics_server=%format!("http://{}{}", server.local_addr(), metrics_path));
server.await?;
let addr: SocketAddr = ([0, 0, 0, 0], 8888).into();
let service = MetricService::new(registry);
let server = Router::new()
.route(&metrics_path, get(respond_with_metrics))
.with_state(service);
let tcp_listener = TcpListener::bind(addr).await?;
let local_addr = tcp_listener.local_addr()?;
tracing::info!(metrics_server=%format!("http://{}{}", local_addr, metrics_path));
axum::serve(tcp_listener, server.into_make_service()).await?;
Ok(())
}
pub(crate) struct MetricService {
reg: Arc<Mutex<Registry>>,
metrics_path: String,
}

type SharedRegistry = Arc<Mutex<Registry>>;
async fn respond_with_metrics(state: State<MetricService>) -> impl IntoResponse {
let mut sink = String::new();
let reg = state.get_reg();
encode(&mut sink, &reg.lock().unwrap()).unwrap();

impl MetricService {
fn get_reg(&mut self) -> SharedRegistry {
Arc::clone(&self.reg)
}
fn respond_with_metrics(&mut self) -> Response<String> {
let mut response: Response<String> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE.try_into().unwrap(),
);

let reg = self.get_reg();
encode(&mut response.body_mut(), &reg.lock().unwrap()).unwrap();

*response.status_mut() = StatusCode::OK;

response
}
fn respond_with_404_not_found(&mut self) -> Response<String> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(format!(
"Not found try localhost:[port]/{}",
self.metrics_path
))
.unwrap()
}
(
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, METRICS_CONTENT_TYPE)],
sink,
)
}

impl Service<Request<Body>> for MetricService {
type Response = Response<String>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
let req_path = req.uri().path();
let req_method = req.method();
let resp = if (req_method == Method::GET) && (req_path == self.metrics_path) {
// Encode and serve metrics from registry.
self.respond_with_metrics()
} else {
self.respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
}
#[derive(Clone)]
pub(crate) struct MetricService {
reg: Arc<Mutex<Registry>>,
}

pub(crate) struct MakeMetricService {
reg: SharedRegistry,
metrics_path: String,
}
type SharedRegistry = Arc<Mutex<Registry>>;

impl MakeMetricService {
pub(crate) fn new(registry: Registry, metrics_path: String) -> MakeMetricService {
MakeMetricService {
reg: Arc::new(Mutex::new(registry)),
metrics_path,
impl MetricService {
fn new(reg: Registry) -> Self {
Self {
reg: Arc::new(Mutex::new(reg)),
}
}
}

impl<T> Service<T> for MakeMetricService {
type Response = MetricService;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _: T) -> Self::Future {
let reg = self.reg.clone();
let metrics_path = self.metrics_path.clone();
let fut = async move { Ok(MetricService { reg, metrics_path }) };
Box::pin(fut)
fn get_reg(&self) -> SharedRegistry {
Arc::clone(&self.reg)
}
}

0 comments on commit 042174e

Please sign in to comment.