Skip to content

Commit

Permalink
feat: replace custom tracing with tokio's tracing integration
Browse files Browse the repository at this point in the history
After some experiments using our custom tracing implementation, I
reached the conclusion that it isn't worth the extra complexity. Most
of the ecossystem is using tokio's tracing implementation, and the Rust
OpenTelemetry WG are evaluating replacing their implementation with only
the tracing layer[1].

Thus, I decided to replace it with a tracing integration setup. The
setup is pretty standard, but the implementation uses a custom Layer
to pass data from tracing to OpenTelemetry, continuing to use the
background worker to do most of the heavy lifting. This makes the
hot path that runs in the application loop to be more efficient.

The implementation also uses a custom context to allow for faster
retrieval of tracing information for propagation.

The result is that kiso's users don't have to worry about OpenTelemetry
crates unless they need dynamic attributes or links. Everything else is
handled by the tracing crate.

This commit contains only the necessary code for the migration. There
are still some things to sort out, and primarily performance
improvement to implement. These will be done in other patches as this
one is already too big to properly review.

[1]: open-telemetry/opentelemetry-rust#1571
  • Loading branch information
luisholanda committed Sep 1, 2024
1 parent c35d045 commit c85c39d
Show file tree
Hide file tree
Showing 26 changed files with 3,043 additions and 2,643 deletions.
503 changes: 365 additions & 138 deletions Cargo.lock

Large diffs are not rendered by default.

2,916 changes: 1,476 additions & 1,440 deletions Cargo.nix

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ signal-hook = "0.3.17"
socket2 = { version = "0.5.7", features = ["all"] }
tokio-rustls = "0.26.0"
tokio-util = { version = "0.7.11", features = ["rt"] }
tonic-build.version = "0.12.1"
tonic-build = "0.12.1"
tonic-health = { version = "0.12.1", default-features = false }
tracing = "0.1.40"
tracing-log = { version = "0.2.0", features = ["interest-cache"] }

[workspace.dependencies.axum]
version = "0.7.5"
Expand Down Expand Up @@ -72,6 +74,11 @@ features = [
"timeout",
]

[workspace.dependencies.tracing-subscriber]
version = "0.3.18"
default-features = false
features = ["env-filter", "json", "smallvec", "parking_lot", "time", "registry", "ansi"]

[workspace.lints.rust]
rust_2018_idioms = "deny"

Expand Down Expand Up @@ -132,5 +139,4 @@ unnecessary_wraps = "warn"
useless_format = "warn"

[profile.dev]
debug = 0
strip = "debuginfo"
debug = 1
7 changes: 3 additions & 4 deletions examples/echo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures_util::{
stream::{MapOk, Repeat, Take},
StreamExt, TryStreamExt,
};
use kiso::{observability::Exporters, settings::CmdDescriptor};
use kiso::{settings::CmdDescriptor, tracing::Exporters};
use tonic::{Request, Response, Status};

tonic::include_proto!("grpc.examples.echo");
Expand Down Expand Up @@ -78,10 +78,9 @@ fn main() {
.install_from_args();

kiso::rt::block_on(async {
kiso::observability::initialize(Exporters {
log_exporter: opentelemetry_stdout::LogExporter::default(),
log_backtrace_printer: Box::new(|bc| format!("{bc:?}").into()),
kiso::tracing::initialize(Exporters {
span_exporter: opentelemetry_stdout::SpanExporter::default(),
span_sampler: opentelemetry_sdk::trace::Sampler::AlwaysOn,
});

let echo = echo_server::EchoServer::new(EchoServer);
Expand Down
2 changes: 0 additions & 2 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
pre-commit-hooks = {
url = "github:cachix/pre-commit-hooks.nix";
inputs.nixpkgs.follows = "nixpkgs";
inputs.flake-utils.follows = "flake-utils";
};
rust-overlay = {
url = "github:oxalica/rust-overlay";
inputs.nixpkgs.follows = "nixpkgs";
inputs.flake-utils.follows = "flake-utils";
};
};

Expand Down
3 changes: 3 additions & 0 deletions kiso/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ tonic.workspace = true
tonic-health.workspace = true
tower.workspace = true
tower-http.workspace = true
tracing.workspace = true
tracing-log.workspace = true
tracing-subscriber.workspace = true
46 changes: 27 additions & 19 deletions kiso/src/clients/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use futures_util::{future::BoxFuture, FutureExt, TryStreamExt};
use http_body::Frame;
use http_body_util::BodyExt;
use hyper::{body::Body as HttpBody, Request, Response};
use opentelemetry::trace::Status;
use opentelemetry_semantic_conventions::{attribute, trace};
use tokio::{sync::mpsc::Sender, task::AbortHandle};
use tonic::{
Expand All @@ -27,7 +26,7 @@ use super::{
https::{TracedBody, TracingService as HttpTracingService},
resolver::ServiceDiscoveryStream,
};
use crate::{clients::HttpsClientSettings, context::Deadline, observability::tracing::Span};
use crate::{clients::HttpsClientSettings, context::Deadline};

/// A production-ready gRPC channel.
///
Expand Down Expand Up @@ -205,7 +204,10 @@ impl BackgroundResolver {
}
Ok(None) => return,
Err(err) => {
crate::error!("resolve failure in service discovery: {err:?}");
tracing::error!(
name: "kiso.client.grpc.resolver.failure",
error = &err as &(dyn std::error::Error + 'static),
"resolve failure in service discovery");
return;
}
}
Expand Down Expand Up @@ -284,13 +286,13 @@ where
let method = splits.next().unwrap();
let service = splits.next().unwrap();

let span = crate::context::current::<Span>();
let span = tracing::Span::current();

let rpc_name = format!("{service}/{method}");
span.update_name(rpc_name.clone());
span.set_attribute(trace::RPC_SYSTEM, "grpc");
span.set_attribute(trace::RPC_SERVICE, service.to_string());
span.set_attribute(trace::RPC_METHOD, method.to_string());
span.record("otel.name", &rpc_name);
span.record(trace::RPC_SYSTEM, "grpc");
span.record(trace::RPC_SERVICE, service);
span.record(trace::RPC_METHOD, method);

let req = req.map(|b| LogMsgsBody::new("SENT", rpc_name.clone(), b).boxed_unsync());
let fut = self.inner.call(req);
Expand All @@ -299,9 +301,9 @@ where
let res = fut.await?;

if let Some(status) = tonic::Status::from_header_map(res.headers()) {
span.set_attribute(trace::RPC_GRPC_STATUS_CODE, status.code() as i64);
span.record(trace::RPC_GRPC_STATUS_CODE, status.code() as i64);
if status.code() != tonic::Code::Ok {
span.set_status(Status::error(status.message().to_string()));
span.record("error", status.message());
}
}

Expand All @@ -315,6 +317,7 @@ pub struct LogMsgsBody<B> {
counter: i64,
typ: &'static str,
rpc_name: String,
event_name: &'static str,
}

impl<B> LogMsgsBody<B> {
Expand All @@ -324,6 +327,11 @@ impl<B> LogMsgsBody<B> {
counter: 1,
typ,
rpc_name,
event_name: if typ == "SENT" {
"kiso.client.grpc.message.sent"
} else {
"kiso.client.grpc.message.recv"
},
}
}
}
Expand All @@ -344,19 +352,19 @@ where
unsafe { std::task::ready!(Pin::new_unchecked(&mut this.body).poll_frame(cx)?) };

if let Some(data_frame) = inner_frame.as_ref().and_then(|f| f.data_ref()) {
crate::debug!(
tracing::debug!(
{
otel.name = this.event_name,
{ attribute::RPC_MESSAGE_COMPRESSED_SIZE } = data_frame.remaining() as i64,
{ attribute::RPC_MESSAGE_ID } = this.counter,
{ attribute::RPC_MESSAGE_TYPE } = this.typ,
},
"{}: {} message {} with size {}",
this.rpc_name,
this.counter,
this.typ,
this.counter,
data_frame.remaining()
)
.attr(
attribute::RPC_MESSAGE_COMPRESSED_SIZE,
data_frame.remaining() as i64,
)
.attr(attribute::RPC_MESSAGE_ID, this.counter)
.attr(attribute::RPC_MESSAGE_TYPE, this.typ);
);
}

Poll::Ready(inner_frame.map(Ok))
Expand Down
Loading

0 comments on commit c85c39d

Please sign in to comment.