Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps: move libp2p-server and metrics example to use axum 0.7 #5246

Merged
merged 14 commits into from
Apr 11, 2024
Merged
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ release = false

[dependencies]
futures = "0.3.30"
hyper = { version = "0.14", features = ["server", "tcp", "http1"] }
axum = "0.7"
libp2p = { path = "../../libp2p", features = ["tokio", "metrics", "ping", "noise", "identify", "tcp", "yamux", "macros"] }
opentelemetry = { version = "0.22.0", features = ["metrics"] }
opentelemetry-otlp = { version = "0.15.0", features = ["metrics"] }
Expand Down
113 changes: 32 additions & 81 deletions examples/metrics/src/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,109 +18,60 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use hyper::http::StatusCode;
use hyper::service::Service;
use hyper::{Body, Method, Request, Response, Server};
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::future::Future;
use std::pin::Pin;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::net::TcpListener;

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";

pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> {
// Serve on localhost.
let addr = ([127, 0, 0, 1], 0).into();

let server = Server::bind(&addr).serve(MakeMetricService::new(registry));
tracing::info!(metrics_server=%format!("http://{}/metrics", server.local_addr()));
if let Err(e) = server.await {
tracing::error!("server error: {}", e);
}
let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
drHuangMHT marked this conversation as resolved.
Show resolved Hide resolved
let service = MetricService::new(registry);
let server = Router::new()
.route("/metrics", get(respond_with_metrics))
.with_state(service);
let tcp_listener = TcpListener::bind(addr).await?;
let local_addr = tcp_listener.local_addr()?;
tracing::info!(metrics_server=%format!("http://{}/metrics", local_addr));
axum::serve(tcp_listener, server.into_make_service()).await?;
Ok(())
}

#[derive(Clone)]
pub(crate) struct MetricService {
reg: Arc<Mutex<Registry>>,
}

type SharedRegistry = Arc<Mutex<Registry>>;

impl MetricService {
fn get_reg(&mut self) -> SharedRegistry {
Arc::clone(&self.reg)
}
fn respond_with_metrics(&mut self) -> Response<String> {
let mut response: Response<String> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE.try_into().unwrap(),
);

let reg = self.get_reg();
encode(&mut response.body_mut(), &reg.lock().unwrap()).unwrap();

*response.status_mut() = StatusCode::OK;

response
}
fn respond_with_404_not_found(&mut self) -> Response<String> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body("Not found try localhost:[port]/metrics".to_string())
.unwrap()
}
}

impl Service<Request<Body>> for MetricService {
type Response = Response<String>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
async fn respond_with_metrics(state: State<MetricService>) -> impl IntoResponse {
let mut sink = String::new();
let reg = state.get_reg();
encode(&mut sink, &reg.lock().unwrap()).unwrap();

fn call(&mut self, req: Request<Body>) -> Self::Future {
let req_path = req.uri().path();
let req_method = req.method();
let resp = if (req_method == Method::GET) && (req_path == "/metrics") {
// Encode and serve metrics from registry.
self.respond_with_metrics()
} else {
self.respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
}
(
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, METRICS_CONTENT_TYPE)],
sink,
)
}

pub(crate) struct MakeMetricService {
reg: SharedRegistry,
}
type SharedRegistry = Arc<Mutex<Registry>>;

impl MakeMetricService {
pub(crate) fn new(registry: Registry) -> MakeMetricService {
MakeMetricService {
impl MetricService {
fn new(registry: Registry) -> Self {
Self {
reg: Arc::new(Mutex::new(registry)),
}
}
}

impl<T> Service<T> for MakeMetricService {
type Response = MetricService;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _: T) -> Self::Future {
let reg = self.reg.clone();
let fut = async move { Ok(MetricService { reg }) };
Box::pin(fut)
fn get_reg(&self) -> SharedRegistry {
Arc::clone(&self.reg)
}
}
2 changes: 1 addition & 1 deletion misc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ base64 = "0.22"
clap = { version = "4.5.4", features = ["derive"] }
futures = "0.3"
futures-timer = "3"
hyper = { version = "0.14", features = ["server", "tcp", "http1"] }
axum = "0.7"
libp2p = { workspace = true, features = ["autonat", "dns", "tokio", "noise", "tcp", "yamux", "identify", "kad", "ping", "relay", "metrics", "rsa", "macros", "quic", "websocket"] }
prometheus-client = { workspace = true }
serde = "1.0.197"
Expand Down
125 changes: 36 additions & 89 deletions misc/server/src/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,115 +18,62 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use hyper::http::StatusCode;
use hyper::service::Service;
use hyper::{Body, Method, Request, Response, Server};
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::future::Future;
use std::pin::Pin;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::net::TcpListener;

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";
pub(crate) async fn metrics_server(
registry: Registry,
metrics_path: String,
) -> Result<(), hyper::Error> {
) -> Result<(), std::io::Error> {
// Serve on localhost.
let addr = ([0, 0, 0, 0], 8888).into();

let server = Server::bind(&addr).serve(MakeMetricService::new(registry, metrics_path.clone()));
tracing::info!(metrics_server=%format!("http://{}{}", server.local_addr(), metrics_path));
server.await?;
let addr: SocketAddr = ([0, 0, 0, 0], 8888).into();
let service = MetricService::new(registry);
let server = Router::new()
.route(&metrics_path, get(respond_with_metrics))
.with_state(service);
let tcp_listener = TcpListener::bind(addr).await?;
let local_addr = tcp_listener.local_addr()?;
tracing::info!(metrics_server=%format!("http://{}{}", local_addr, metrics_path));
axum::serve(tcp_listener, server.into_make_service()).await?;
Ok(())
}
pub(crate) struct MetricService {
reg: Arc<Mutex<Registry>>,
metrics_path: String,
}

type SharedRegistry = Arc<Mutex<Registry>>;
async fn respond_with_metrics(state: State<MetricService>) -> impl IntoResponse {
let mut sink = String::new();
let reg = state.get_reg();
encode(&mut sink, &reg.lock().unwrap()).unwrap();

impl MetricService {
fn get_reg(&mut self) -> SharedRegistry {
Arc::clone(&self.reg)
}
fn respond_with_metrics(&mut self) -> Response<String> {
let mut response: Response<String> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE.try_into().unwrap(),
);

let reg = self.get_reg();
encode(&mut response.body_mut(), &reg.lock().unwrap()).unwrap();

*response.status_mut() = StatusCode::OK;

response
}
fn respond_with_404_not_found(&mut self) -> Response<String> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(format!(
"Not found try localhost:[port]/{}",
self.metrics_path
))
.unwrap()
}
(
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, METRICS_CONTENT_TYPE)],
sink,
)
}

impl Service<Request<Body>> for MetricService {
type Response = Response<String>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
let req_path = req.uri().path();
let req_method = req.method();
let resp = if (req_method == Method::GET) && (req_path == self.metrics_path) {
// Encode and serve metrics from registry.
self.respond_with_metrics()
} else {
self.respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
}
#[derive(Clone)]
pub(crate) struct MetricService {
reg: Arc<Mutex<Registry>>,
}

pub(crate) struct MakeMetricService {
reg: SharedRegistry,
metrics_path: String,
}
type SharedRegistry = Arc<Mutex<Registry>>;

impl MakeMetricService {
pub(crate) fn new(registry: Registry, metrics_path: String) -> MakeMetricService {
MakeMetricService {
reg: Arc::new(Mutex::new(registry)),
metrics_path,
impl MetricService {
fn new(reg: Registry) -> Self {
Self {
reg: Arc::new(Mutex::new(reg)),
}
}
}

impl<T> Service<T> for MakeMetricService {
type Response = MetricService;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _: T) -> Self::Future {
let reg = self.reg.clone();
let metrics_path = self.metrics_path.clone();
let fut = async move { Ok(MetricService { reg, metrics_path }) };
Box::pin(fut)
fn get_reg(&self) -> SharedRegistry {
Arc::clone(&self.reg)
}
}
Loading