Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signals processing #1014

Merged
merged 20 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
7f9a765
Creating processors subproject
LikeTheSalad Aug 23, 2023
27a8f2c
Creating the interceptor interface and interceptable base class
LikeTheSalad Aug 23, 2023
f86d71f
Created and interceptable exporter for spans
LikeTheSalad Aug 23, 2023
238080c
Created and interceptable exporter for metrics
LikeTheSalad Aug 23, 2023
e683579
Created and interceptable exporter for logs
LikeTheSalad Aug 23, 2023
91a3c25
Creating README file
LikeTheSalad Aug 23, 2023
8a48848
Updating component_owners.yml
LikeTheSalad Aug 23, 2023
1af3867
Running spotlessApply
LikeTheSalad Aug 23, 2023
68f84f8
Removing inheritance for interceptable exporters
LikeTheSalad Aug 24, 2023
5d7aaff
Adding breedx-splk as component owner
LikeTheSalad Aug 24, 2023
4f1014e
Adding "interceptAll" as Interceptor default method
LikeTheSalad Aug 24, 2023
9f5c6d9
Update processors/src/main/java/io/opentelemetry/contrib/interceptor/…
LikeTheSalad Aug 25, 2023
266472f
Update processors/src/main/java/io/opentelemetry/contrib/interceptor/…
LikeTheSalad Aug 25, 2023
33443fe
Update processors/src/main/java/io/opentelemetry/contrib/interceptor/…
LikeTheSalad Aug 25, 2023
0d6edb3
Update processors/src/main/java/io/opentelemetry/contrib/interceptor/…
LikeTheSalad Aug 25, 2023
8f61209
Update processors/src/test/java/io/opentelemetry/contrib/interceptor/…
LikeTheSalad Aug 25, 2023
1faffe9
Using CopyOnWriteArrayList in ComposableInterceptor
LikeTheSalad Aug 25, 2023
90fbc9d
Making exporters final
LikeTheSalad Aug 25, 2023
8c93d6c
Adding nullable imports
LikeTheSalad Aug 25, 2023
e044d7c
Making test functions package private
LikeTheSalad Aug 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/component_owners.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ components:
- HaloFour
noop-api:
- jack-berg
processors:
- LikeTheSalad
- breedx-splk
prometheus-collector:
- jkwatson
resource-providers:
Expand Down
10 changes: 10 additions & 0 deletions processors/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Processors

This module provides tools to intercept and process signals globally.

## Component owners

- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic
- [Jason Plumb](https://github.com/breedx-splk), Splunk

Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).
17 changes: 17 additions & 0 deletions processors/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
}

description = "Tools to intercept and process signals globally."
otelJava.moduleName.set("io.opentelemetry.contrib.processors")

java {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

dependencies {
api("io.opentelemetry:opentelemetry-sdk")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import io.opentelemetry.contrib.interceptor.api.Interceptor;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.util.Collection;

/** Intercepts logs before delegating them to the real exporter. */
public final class InterceptableLogRecordExporter implements LogRecordExporter {
private final LogRecordExporter delegate;
private final Interceptor<LogRecordData> interceptor;

public InterceptableLogRecordExporter(
LogRecordExporter delegate, Interceptor<LogRecordData> interceptor) {
this.delegate = delegate;
this.interceptor = interceptor;
}

@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
return delegate.export(interceptor.interceptAll(logs));
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import io.opentelemetry.contrib.interceptor.api.Interceptor;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;

/** Intercepts metrics before delegating them to the real exporter. */
public final class InterceptableMetricExporter implements MetricExporter {
private final MetricExporter delegate;
private final Interceptor<MetricData> interceptor;

public InterceptableMetricExporter(MetricExporter delegate, Interceptor<MetricData> interceptor) {
this.delegate = delegate;
this.interceptor = interceptor;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
return delegate.export(interceptor.interceptAll(metrics));
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}

@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return delegate.getAggregationTemporality(instrumentType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import io.opentelemetry.contrib.interceptor.api.Interceptor;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;

/** Intercepts spans before delegating them to the real exporter. */
public final class InterceptableSpanExporter implements SpanExporter {
private final SpanExporter delegate;
private final Interceptor<SpanData> interceptor;

public InterceptableSpanExporter(SpanExporter delegate, Interceptor<SpanData> interceptor) {
this.delegate = delegate;
this.interceptor = interceptor;
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return delegate.export(interceptor.interceptAll(spans));
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor.api;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;

/**
* Intercepts a signal before it gets exported. The signal can get updated and/or filtered out based
* on each interceptor implementation.
*/
public interface Interceptor<T> {

/**
* Intercepts a signal.
*
* @param item The signal object.
* @return The received signal modified (or null for excluding this signal from getting exported).
* If there's no operation needed to be done for a specific signal, it should be returned as
* is.
*/
@Nullable
T intercept(T item);
LikeTheSalad marked this conversation as resolved.
Show resolved Hide resolved

/** Intercepts a collection of signals. */
default Collection<T> interceptAll(Collection<T> items) {
List<T> result = new ArrayList<>();

for (T item : items) {
T intercepted = intercept(item);
if (intercepted != null) {
result.add(intercepted);
}
}

return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor.common;

import io.opentelemetry.contrib.interceptor.api.Interceptor;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.Nullable;

/** Allows to run an item through a list of interceptors in the order they were added. */
public final class ComposableInterceptor<T> implements Interceptor<T> {
private final CopyOnWriteArrayList<Interceptor<T>> interceptors = new CopyOnWriteArrayList<>();

public void add(Interceptor<T> interceptor) {
interceptors.addIfAbsent(interceptor);
}

@Nullable
@Override
public T intercept(T item) {
T intercepted = item;
for (Interceptor<T> interceptor : interceptors) {
intercepted = interceptor.intercept(intercepted);
if (intercepted == null) {
break;
}
}
return intercepted;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.contrib.interceptor.common.ComposableInterceptor;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.data.Body;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter;
import java.util.List;
import javax.annotation.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class InterceptableLogRecordExporterTest {
private InMemoryLogRecordExporter memoryLogRecordExporter;
private Logger logger;
private ComposableInterceptor<LogRecordData> interceptor;

@BeforeEach
void setUp() {
memoryLogRecordExporter = InMemoryLogRecordExporter.create();
interceptor = new ComposableInterceptor<>();
logger =
SdkLoggerProvider.builder()
.addLogRecordProcessor(
SimpleLogRecordProcessor.create(
new InterceptableLogRecordExporter(memoryLogRecordExporter, interceptor)))
.build()
.get("TestScope");
}

@Test
void verifyLogModification() {
interceptor.add(
item -> {
ModifiableLogRecordData modified = new ModifiableLogRecordData(item);
modified.attributes.put("global.attr", "from interceptor");
return modified;
});

logger
.logRecordBuilder()
.setBody("One log")
.setAttribute(AttributeKey.stringKey("local.attr"), "local")
.emit();

List<LogRecordData> finishedLogRecordItems =
memoryLogRecordExporter.getFinishedLogRecordItems();
assertEquals(1, finishedLogRecordItems.size());
LogRecordData logRecordData = finishedLogRecordItems.get(0);
assertEquals(2, logRecordData.getAttributes().size());
assertEquals(
"from interceptor",
logRecordData.getAttributes().get(AttributeKey.stringKey("global.attr")));
assertEquals("local", logRecordData.getAttributes().get(AttributeKey.stringKey("local.attr")));
}

@Test
void verifyLogFiltering() {
interceptor.add(
item -> {
if (item.getBody().asString().contains("deleted")) {
return null;
}
return item;
});

logger.logRecordBuilder().setBody("One log").emit();
logger.logRecordBuilder().setBody("This log will be deleted").emit();
logger.logRecordBuilder().setBody("Another log").emit();

List<LogRecordData> finishedLogRecordItems =
memoryLogRecordExporter.getFinishedLogRecordItems();
assertEquals(2, finishedLogRecordItems.size());
assertEquals("One log", finishedLogRecordItems.get(0).getBody().asString());
assertEquals("Another log", finishedLogRecordItems.get(1).getBody().asString());
}

private static class ModifiableLogRecordData implements LogRecordData {
private final LogRecordData delegate;
private final AttributesBuilder attributes = Attributes.builder();

private ModifiableLogRecordData(LogRecordData delegate) {
this.delegate = delegate;
}

@Override
public Resource getResource() {
return delegate.getResource();
}

@Override
public InstrumentationScopeInfo getInstrumentationScopeInfo() {
return delegate.getInstrumentationScopeInfo();
}

@Override
public long getTimestampEpochNanos() {
return delegate.getTimestampEpochNanos();
}

@Override
public long getObservedTimestampEpochNanos() {
return delegate.getObservedTimestampEpochNanos();
}

@Override
public SpanContext getSpanContext() {
return delegate.getSpanContext();
}

@Override
public Severity getSeverity() {
return delegate.getSeverity();
}

@Nullable
@Override
public String getSeverityText() {
return delegate.getSeverityText();
}

@Override
public Body getBody() {
return delegate.getBody();
}

@Override
public Attributes getAttributes() {
return attributes.putAll(delegate.getAttributes()).build();
}

@Override
public int getTotalAttributeCount() {
return delegate.getTotalAttributeCount();
}
}
}
Loading
Loading