Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument cassandra executeReactive method #6441

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
634755c
new cassandra 4.14 module to support reactive tracing
SimoneGiusso Aug 21, 2022
f59e4d7
fix DuplicateFileCopyingException
SimoneGiusso Aug 21, 2022
128f0f6
Move CassandraSingletons in instrumentation libraries and add new Cas…
SimoneGiusso Aug 27, 2022
25de63f
Merge branch 'main' into instrumenting-executereactive-cassandra-method
SimoneGiusso Nov 15, 2022
0ca0549
sync with main branch
SimoneGiusso Nov 15, 2022
8841346
WIP: Fix tests
SimoneGiusso Nov 16, 2022
5f03349
WIP: use lowest java-driver-core version that supports reactive
SimoneGiusso Nov 17, 2022
15172a4
WIP: create common test package + wrap original TracingCqlSession
SimoneGiusso Nov 24, 2022
eec8951
Merge branch 'main' into instrumenting-executereactive-cassandra-method
SimoneGiusso Nov 24, 2022
15e4b42
WIP: avoid having both cassandra 4.* modules applied
SimoneGiusso Nov 26, 2022
ad461f6
Address last comments from mateuszrzeszutek's review
SimoneGiusso Nov 27, 2022
3e8ea44
Share code keeping it package protected + clean-up
SimoneGiusso Nov 27, 2022
74fd803
Make CompletionStageFunction public and rename package for classes un…
SimoneGiusso Nov 27, 2022
308a11e
Merge branch 'main' into instrumenting-executereactive-cassandra-method
SimoneGiusso Nov 29, 2022
dc416a8
Merge remote-tracking branch 'upstream/main' into instrumenting-execu…
trask Feb 27, 2023
aced6ee
Move library to 4.4
trask Feb 27, 2023
ed61c54
more
trask Feb 27, 2023
865b337
more
trask Feb 27, 2023
a924133
todo
trask Feb 27, 2023
19225cd
Update docs
trask Feb 27, 2023
2385797
Rewrite cassandra reactive test in java
SimoneGiusso Feb 27, 2023
8593fe4
Remove deprecated ExecutionInfo#getStatement method
SimoneGiusso Feb 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/supported-libraries.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ These are the supported libraries and frameworks:
| [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/java-handler.html) | 1.0+ | [opentelemetry-aws-lambda-core-1.0](../instrumentation/aws-lambda/aws-lambda-core-1.0/library),<br>[opentelemetry-aws-lambda-events-2.2](../instrumentation/aws-lambda/aws-lambda-events-2.2/library) | [FaaS Server Spans] |
| [AWS SDK](https://aws.amazon.com/sdk-for-java/) | 1.11.x and 2.2.0+ | [opentelemetry-aws-sdk-1.11](../instrumentation/aws-sdk/aws-sdk-1.11/library),<br>[opentelemetry-aws-sdk-1.11-autoconfigure](../instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure),<br>[opentelemetry-aws-sdk-2.2](../instrumentation/aws-sdk/aws-sdk-2.2/library),<br>[opentelemetry-aws-sdk-2.2-autoconfigure](../instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure) | [Messaging Spans], [Database Client Spans], [HTTP Client Spans] |
| [Azure Core](https://docs.microsoft.com/en-us/java/api/overview/azure/core-readme) | 1.14+ | N/A | Context propagation |
| [Cassandra Driver](https://github.com/datastax/java-driver) | 3.0+ | N/A | [Database Client Spans] |
| [Cassandra Driver](https://github.com/datastax/java-driver) | 3.0+ | [opentelemetry-cassandra-4.4](../instrumentation/cassandra/cassandra-4.4/library) | [Database Client Spans] |
| [Couchbase Client](https://github.com/couchbase/couchbase-java-client) | 2.0+ and 3.1+ | N/A | [Database Client Spans] |
| [c3p0](https://github.com/swaldman/c3p0) | 0.9.2+ | [opentelemetry-c3p0-0.9](../instrumentation/c3p0-0.9/library) | [Database Pool Metrics] |
| [Dropwizard Metrics](https://metrics.dropwizard.io/) | 4.0+ (disabled by default) | N/A | none |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ dependencies {
testInstrumentation(project(":instrumentation:guava-10.0:javaagent"))

latestDepTestLibrary("com.datastax.cassandra:cassandra-driver-core:3.+") // see cassandra-4.0 module

testInstrumentation(project(":instrumentation:cassandra:cassandra-4.0:javaagent"))
testInstrumentation(project(":instrumentation:cassandra:cassandra-4.4:javaagent"))
}

// Requires old Guava. Can't use enforcedPlatform since predates BOM
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
plugins {
id("otel.java-conventions")
}

dependencies {
api(project(":testing-common"))

implementation("org.testcontainers:testcontainers:1.17.5")
implementation("com.datastax.oss:java-driver-core:4.0.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.cassandra.v4.common;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL;
Expand All @@ -26,14 +28,12 @@
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -42,17 +42,20 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;

public class CassandraClientTest {

private static final Logger logger = LoggerFactory.getLogger(CassandraClientTest.class);
public abstract class AbstractCassandraTest {

@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraTest.class);

@SuppressWarnings("rawtypes")
private static GenericContainer cassandra;

private static int cassandraPort;
protected static int cassandraPort;

protected abstract InstrumentationExtension testing();

protected CqlSession wrap(CqlSession session) {
return session;
}

@BeforeAll
static void beforeAll() {
Expand All @@ -79,30 +82,33 @@ void syncTest(Parameter parameter) {

session.execute(parameter.statement);

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(parameter.spanName)
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
equalTo(NET_SOCK_PEER_NAME, "localhost"),
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
equalTo(DB_STATEMENT, parameter.expectedStatement),
equalTo(DB_OPERATION, parameter.operation),
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
satisfies(
DB_CASSANDRA_COORDINATOR_ID, val -> val.isInstanceOf(String.class)),
satisfies(
DB_CASSANDRA_IDEMPOTENCE, val -> val.isInstanceOf(Boolean.class)),
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
equalTo(DB_CASSANDRA_TABLE, parameter.table))));
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(parameter.spanName)
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
equalTo(NET_SOCK_PEER_NAME, "localhost"),
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
equalTo(DB_STATEMENT, parameter.expectedStatement),
equalTo(DB_OPERATION, parameter.operation),
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
satisfies(
DB_CASSANDRA_COORDINATOR_ID,
val -> val.isInstanceOf(String.class)),
satisfies(
DB_CASSANDRA_IDEMPOTENCE,
val -> val.isInstanceOf(Boolean.class)),
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
equalTo(DB_CASSANDRA_TABLE, parameter.table))));

session.close();
}
Expand All @@ -112,42 +118,48 @@ void syncTest(Parameter parameter) {
void asyncTest(Parameter parameter) throws Exception {
CqlSession session = getSession(parameter.keyspace);

testing.runWithSpan(
"parent",
() ->
session
.executeAsync(parameter.statement)
.toCompletableFuture()
.whenComplete((result, throwable) -> testing.runWithSpan("child", () -> {}))
.get());
testing()
.runWithSpan(
"parent",
() ->
session
.executeAsync(parameter.statement)
.toCompletableFuture()
.whenComplete((result, throwable) -> testing().runWithSpan("child", () -> {}))
.get());

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(parameter.spanName)
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
equalTo(NET_SOCK_PEER_NAME, "localhost"),
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
equalTo(DB_STATEMENT, parameter.expectedStatement),
equalTo(DB_OPERATION, parameter.operation),
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
satisfies(
DB_CASSANDRA_COORDINATOR_ID, val -> val.isInstanceOf(String.class)),
satisfies(
DB_CASSANDRA_IDEMPOTENCE, val -> val.isInstanceOf(Boolean.class)),
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
equalTo(DB_CASSANDRA_TABLE, parameter.table)),
span ->
span.hasName("child").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0))));
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(parameter.spanName)
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
equalTo(NET_SOCK_PEER_NAME, "localhost"),
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
equalTo(DB_STATEMENT, parameter.expectedStatement),
equalTo(DB_OPERATION, parameter.operation),
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
satisfies(
DB_CASSANDRA_COORDINATOR_ID,
val -> val.isInstanceOf(String.class)),
satisfies(
DB_CASSANDRA_IDEMPOTENCE,
val -> val.isInstanceOf(Boolean.class)),
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
equalTo(DB_CASSANDRA_TABLE, parameter.table)),
span ->
span.hasName("child")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));

session.close();
}
Expand Down Expand Up @@ -260,7 +272,7 @@ private static Stream<Arguments> provideAsyncParameters() {
"users"))));
}

private static class Parameter {
protected static class Parameter {
public final String keyspace;
public final String statement;
public final String expectedStatement;
Expand All @@ -284,16 +296,17 @@ public Parameter(
}
}

CqlSession getSession(String keyspace) {
protected CqlSession getSession(String keyspace) {
DriverConfigLoader configLoader =
DefaultDriverConfigLoader.builder()
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(0))
.build();
return CqlSession.builder()
.addContactPoint(new InetSocketAddress("localhost", cassandraPort))
.withConfigLoader(configLoader)
.withLocalDatacenter("datacenter1")
.withKeyspace(keyspace)
.build();
return wrap(
CqlSession.builder()
.addContactPoint(new InetSocketAddress("localhost", cassandraPort))
.withConfigLoader(configLoader)
.withLocalDatacenter("datacenter1")
.withKeyspace(keyspace)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ muzzle {
pass {
group.set("com.datastax.oss")
module.set("java-driver-core")
versions.set("[4.0,)")
versions.set("[4.0,4.4)")
assertInverse.set(true)
}
}
Expand All @@ -16,6 +16,11 @@ dependencies {

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

testImplementation(project(":instrumentation:cassandra:cassandra-4-common:testing"))

testInstrumentation(project(":instrumentation:cassandra:cassandra-3.0:javaagent"))
testInstrumentation(project(":instrumentation:cassandra:cassandra-4.4:javaagent"))
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static java.util.Collections.singletonList;
import static net.bytebuddy.matcher.ElementMatchers.not;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(InstrumentationModule.class)
public class CassandraClientInstrumentationModule extends InstrumentationModule {

public CassandraClientInstrumentationModule() {
super("cassandra", "cassandra-4.0");
}
Expand All @@ -22,4 +26,10 @@ public CassandraClientInstrumentationModule() {
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new SessionBuilderInstrumentation());
}

@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
// new public interface introduced in version 4.4
return not(hasClassesNamed("com.datastax.dse.driver.api.core.cql.reactive.ReactiveSession"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.cassandra.v4.common.AbstractCassandraTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;

public class CassandraTest extends AbstractCassandraTest {

@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

@Override
protected InstrumentationExtension testing() {
return testing;
}
}
Loading