Skip to content

Commit

Permalink
WebSockets Next integration wiht OTel and Micrometer
Browse files Browse the repository at this point in the history
  • Loading branch information
michalvavrik committed Aug 27, 2024
1 parent 2ca2703 commit 53805a4
Show file tree
Hide file tree
Showing 106 changed files with 4,018 additions and 53 deletions.
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/opentelemetry-tracing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ See the main xref:opentelemetry.adoc#exporters[OpenTelemetry Guide exporters] se
** Kafka
** Pulsar
* https://quarkus.io/guides/vertx[`quarkus-vertx`] (http requests)
* xref:websockets-next-reference.adoc[`websockets-next`]


=== Disable parts of the automatic tracing
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/telemetry-micrometer.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ Refer to the xref:./management-interface-reference.adoc[management interface ref
** Camel Messaging
* https://quarkus.io/guides/stork-reference[`quarkus-smallrye-stork`]
* https://quarkus.io/guides/vertx[`quarkus-vertx`] (http requests)
* xref:websockets-next-reference.adoc[`websockets-next`]

Check warning on line 764 in docs/src/main/asciidoc/telemetry-micrometer.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Headings] Use sentence-style capitalization in 'Configuration Reference'. Raw Output: {"message": "[Quarkus.Headings] Use sentence-style capitalization in 'Configuration Reference'.", "location": {"path": "docs/src/main/asciidoc/telemetry-micrometer.adoc", "range": {"start": {"line": 764, "column": 18}}}, "severity": "INFO"}

== Configuration Reference

Expand Down
22 changes: 22 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,28 @@ quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG <3>
<2> Set the number of characters of a text message payload which will be logged.

Check warning on line 1018 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using ', which (non restrictive clause preceded by a comma)' or 'that (restrictive clause without a comma)' rather than 'which'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using ', which (non restrictive clause preceded by a comma)' or 'that (restrictive clause without a comma)' rather than 'which'.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 1018, "column": 48}}}, "severity": "INFO"}
<3> Enable `DEBUG` level is for the logger `io.quarkus.websockets.next.traffic`.

[[telemetry]]
== Telemetry

When the OpenTelemetry extension is present, traces for opened and closed WebSocket connections are collected by default.
If you do not require WebSocket traces, you can disable collecting of traces like in the example below:

[source, properties]
----
quarkus.websockets-next.server.traces.enabled=false
quarkus.websockets-next.client.traces.enabled=false
----

When the Micrometer extension is present, metrics for messages, errors and bytes transferred are collected.
If you do not require WebSocket metrics, you can disable metrics like in the example below:

[source, properties]
----
quarkus.websockets-next.server.metrics.enabled=false
quarkus.websockets-next.client.metrics.enabled=false
----

NOTE: Telemetry for the `BasicWebSocketConnector` is currently not supported.

[[websocket-next-configuration-reference]]
== Configuration reference
Expand Down
6 changes: 6 additions & 0 deletions extensions/websockets-next/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@
<artifactId>mutiny-kotlin</artifactId>
<scope>test</scope>
</dependency>
<!-- Needed for InMemorySpanExporter to verify captured traces -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.websockets.next.deployment;

import java.util.function.Consumer;

import io.quarkus.builder.item.MultiBuildItem;
import io.quarkus.websockets.next.runtime.telemetry.TelemetrySupportProviderBuilder;

/**
* Provides a way to set up metrics and/or traces support in the WebSockets extension.
*/
final class TelemetrySupportBuilderCustomizerBuildItem extends MultiBuildItem {

final Consumer<TelemetrySupportProviderBuilder> builderCustomizer;

TelemetrySupportBuilderCustomizerBuildItem(Consumer<TelemetrySupportProviderBuilder> builderCustomizer) {
this.builderCustomizer = builderCustomizer;
}
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.quarkus.websockets.next.test.telemetry;

import static io.quarkus.websockets.next.test.utils.WSClient.ReceiverMode.BINARY;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.Set;
import java.util.stream.Collectors;

import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocketConnectOptions;

record Connection(URI uri, String[] messagesToSend, WSClient client, boolean broadcast, boolean binaryMode,
String[] expectedResponses) {

static Connection of(URI uri, boolean broadcast, boolean binaryMode, String[] sentMessages, String[] expectedResponses) {
return new Connection(uri, sentMessages, null, broadcast, binaryMode, expectedResponses);
}

static Connection of(URI uri, String expectedResponse, boolean binaryMode, String... messages) {
return new Connection(uri, messages, null, false, binaryMode, new String[] { expectedResponse });
}

private Connection with(WSClient client) {
return new Connection(uri, messagesToSend, client, broadcast, binaryMode, expectedResponses);
}

private Set<String> getReceivedMessages() {
return client.getMessages().stream().map(Buffer::toString).collect(Collectors.toSet());
}

static void sendAndAssertResponses(Vertx vertx, Connection... connections) {
openConnectionsThenSend(connections, vertx, 0);
}

private static void openConnectionsThenSend(Connection[] connections, Vertx vertx, int idx) {
var connection = connections[idx];
final WSClient client = connection.binaryMode() ? new WSClient(vertx, BINARY) : new WSClient(vertx);
try (client) {
client.connect(new WebSocketConnectOptions(), connection.uri());
connections[idx] = connection.with(client);

if (idx < connections.length - 1) {
openConnectionsThenSend(connections, vertx, idx + 1);
} else {
sendMessages(connections, connection.binaryMode());
}
}
}

private static void sendMessages(Connection[] connections, boolean binaryMode) {
for (Connection connection : connections) {
for (String message : connection.messagesToSend()) {
if (binaryMode) {
connection.client().sendAndAwait(Buffer.buffer(message));
} else {
connection.client().sendAndAwait(message);
}
}
var expectedResponses = connection.expectedResponses();
if (expectedResponses.length != 0) {
if (connection.broadcast()) {
for (Connection conn : connections) {
assertResponses(conn, expectedResponses);
}
} else {
assertResponses(connection, expectedResponses);
}
}
}
}

private static void assertResponses(Connection connection, String[] expectedResponses) {
connection.client.waitForMessages(expectedResponses.length);
Set<String> actualResponses = connection.getReceivedMessages();

for (String expectedResponse : expectedResponses) {
assertTrue(actualResponses.contains(expectedResponse),
() -> "Expected response '%s' not found, was: %s".formatted(expectedResponse, actualResponses));
}

connection.client().getMessages().clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.quarkus.websockets.next.test.telemetry;

import java.util.Arrays;

public interface ExpectedServerEndpointResponse {

String[] NO_RESPONSE = new String[] {};
EchoExpectedServerEndpointResponse ECHO_RESPONSE = new EchoExpectedServerEndpointResponse();
DoubleEchoExpectedServerEndpointResponse DOUBLE_ECHO_RESPONSE = new DoubleEchoExpectedServerEndpointResponse();

/**
* Endpoint returns void, Uni<Void> or results in exception and theefore, there is no response.
*/
final class NoExpectedServerEndpointResponse {

public String[] getExpectedResponse() {
return new String[0];
}
}

/**
* Received message is prefixed with 'echo 0: ' and returned.
*/
final class EchoExpectedServerEndpointResponse implements ExpectedServerEndpointResponse {

public String[] getExpectedResponse(String[] sentMessages) {
return Arrays.stream(sentMessages).map(msg -> "echo 0: " + msg).toArray(String[]::new);
}

}

/**
* For each received message 'msg' endpoint returns 'echo 0: msg' and 'echo 1: msg'
*/
final class DoubleEchoExpectedServerEndpointResponse implements ExpectedServerEndpointResponse {

public String[] getExpectedResponse(String[] sentMessages) {
return Arrays.stream(sentMessages)
.mapMulti((msg, consumer) -> {
consumer.accept("echo 0: " + msg);
consumer.accept("echo 1: " + msg);
})
.toArray(String[]::new);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.websockets.next.test.telemetry;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Singleton;

import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;

@ApplicationScoped
public class InMemorySpanExporterProducer {

@Produces
@Singleton
InMemorySpanExporter inMemorySpanExporter() {
return InMemorySpanExporter.create();
}

}
Loading

0 comments on commit 53805a4

Please sign in to comment.