Skip to content

Commit

Permalink
Merge pull request #1936 from beyonnex-io/wot-tm-validation
Browse files Browse the repository at this point in the history
#1650 provide WoT TM validation
  • Loading branch information
thjaeckle committed Aug 26, 2024
2 parents 1c12c09 + 7aadc82 commit e4ce12e
Show file tree
Hide file tree
Showing 237 changed files with 11,357 additions and 1,698 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,6 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
getFilter().ifPresent(theFilter -> jsonObjectBuilder.set(JsonFields.FILTER, theFilter));
}

@Override
public String getTypePrefix() {
return TYPE_PREFIX;
}

@Override
public SubscribeForPersistedEvents setDittoHeaders(final DittoHeaders dittoHeaders) {
return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
Expand Down
4 changes: 4 additions & 0 deletions base/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-metrics</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-metrics-service</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-tracing</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.eclipse.ditto.internal.utils.config.raw.RawConfigSupplier;
import org.eclipse.ditto.internal.utils.health.status.StatusSupplierActor;
import org.eclipse.ditto.internal.utils.metrics.config.MetricsConfig;
import org.eclipse.ditto.internal.utils.metrics.prometheus.PrometheusReporterRoute;
import org.eclipse.ditto.internal.utils.metrics.service.prometheus.PrometheusReporterRoute;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
49 changes: 43 additions & 6 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

<properties>
<scala.version>2.13</scala.version> <!-- for scala libraries the scala version is used in their artifactId -->
<scala.full.version>2.13.12</scala.full.version>
<scala.full.version>2.13.14</scala.full.version>
<scala-parser-combinators.version>1.1.2</scala-parser-combinators.version>
<scala-java8-compat.version>1.0.2</scala-java8-compat.version>

Expand All @@ -36,7 +36,8 @@

<!-- ### Compile dependencies versions -->
<minimal-json.version>0.9.5</minimal-json.version>
<jackson-bom.version>2.16.1</jackson-bom.version>
<jackson-bom.version>2.17.1</jackson-bom.version>
<json-schema-validator.version>1.5.1</json-schema-validator.version>
<typesafe-config.version>1.4.3</typesafe-config.version>
<ssl-config-core.version>0.6.1</ssl-config-core.version>
<kafka-client.version>3.6.1</kafka-client.version>
Expand All @@ -45,8 +46,8 @@
<eddsa.version>0.3.0</eddsa.version>
<lz4-java.version>1.8.0</lz4-java.version>

<pekko-bom.version>1.0.2</pekko-bom.version>
<pekko-http-bom.version>1.0.0</pekko-http-bom.version>
<pekko-bom.version>1.0.3</pekko-bom.version>
<pekko-http-bom.version>1.0.1</pekko-http-bom.version>
<pekko-persistence-mongodb.version>1.2.0</pekko-persistence-mongodb.version>
<pekko-persistence-inmemory.version>1.0.0</pekko-persistence-inmemory.version>
<pekko-management.version>1.0.0</pekko-management.version>
Expand All @@ -62,7 +63,7 @@
<!-- AWS SDK version needed for MongoDB AWS IAM authentication -->
<awssdk.version>2.26.21</awssdk.version>

<jjwt.version>0.12.5</jjwt.version>
<jjwt.version>0.12.6</jjwt.version>
<asm.version>9.2</asm.version>
<qpid-jms-client.version>1.11.0</qpid-jms-client.version>
<pjfanning-pekko-rabbitmq.version>7.0.0</pjfanning-pekko-rabbitmq.version>
Expand All @@ -78,7 +79,7 @@
<janino.version>3.1.11</janino.version>

<!-- ### Metrics and Tracing -->
<kamon.version>2.7.0</kamon.version>
<kamon.version>2.7.1</kamon.version>

<jsr305.version>3.0.2</jsr305.version>

Expand Down Expand Up @@ -133,6 +134,22 @@
<scope>import</scope>
</dependency>

<dependency>
<groupId>com.networknt</groupId>
<artifactId>json-schema-validator</artifactId>
<version>${json-schema-validator.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
Expand Down Expand Up @@ -540,11 +557,21 @@
<artifactId>ditto-rql-query</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-wot-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-wot-model</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-wot-validation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-wot-integration</artifactId>
Expand Down Expand Up @@ -618,6 +645,11 @@
<artifactId>ditto-internal-utils-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-json</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-jwt</artifactId>
Expand Down Expand Up @@ -668,6 +700,11 @@
<artifactId>ditto-internal-utils-metrics</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-metrics-service</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-extension</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class ConnectionsAmountIllegalException extends DittoRuntimeExcepti
public static final String ERROR_CODE = ERROR_CODE_PREFIX + "connections.amount.illegal";

private static final String MESSAGE_TEMPLATE =
"The amount of requested exceptions exceeds the limit of ''{0}''.";
"The amount of requested connections exceeds the limit of ''{0}''.";

private static final String DEFAULT_DESCRIPTION = "Please request less connection ids.";

Expand Down
10 changes: 9 additions & 1 deletion connectivity/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>ditto-connectivity</artifactId>
<groupId>org.eclipse.ditto</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>ditto-connectivity-service</artifactId>
<name>Eclipse Ditto :: Connectivity :: Service</name>
Expand Down Expand Up @@ -55,6 +55,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-wot-model</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-wot-validation</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
Expand All @@ -69,6 +73,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-signalenrichment</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-http</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-persistence</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.http.DefaultHttpProxyConfig;
import org.eclipse.ditto.base.service.config.http.HttpProxyConfig;
import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.http.config.DefaultHttpProxyConfig;
import org.eclipse.ditto.internal.utils.http.config.HttpProxyConfig;

import com.typesafe.config.Config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import java.util.List;
import java.util.Map;

import org.eclipse.ditto.base.service.config.http.HttpProxyConfig;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;
import org.eclipse.ditto.internal.utils.http.config.HttpProxyConfig;

import com.typesafe.config.Config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@

import javax.annotation.Nullable;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
Expand Down Expand Up @@ -73,9 +77,9 @@
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
import org.eclipse.ditto.internal.utils.tracing.span.SpanTagKey;
Expand All @@ -86,11 +90,6 @@
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionEvent;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.japi.pf.ReceiveBuilder;

/**
* Base class for publisher actors. Holds the map of configured targets.
*
Expand Down Expand Up @@ -474,10 +473,13 @@ private SendingOrDropped publishToGenericTarget(final ExpressionResolver resolve
final ExternalMessage mappedMessage = applyHeaderMapping(resolver, outbound, headerMapping);
final var startedSpan = DittoTracing.newPreparedSpan(
mappedMessage.getHeaders(),
SpanOperationName.of(connection.getConnectionType() + "_publish")
SpanOperationName.of("publish " + connection.getConnectionType() + " " +
outboundSource.getType()
)
)
.connectionId(connection.getId())
.tag(SpanTagKey.CONNECTION_TYPE.getTagForValue(connection.getConnectionType()))
.tag(SpanTagKey.CONNECTION_TARGET.getTagForValue(publishTarget.toString()))
.start();
final var mappedMessageWithTraceContext =
mappedMessage.withHeaders(startedSpan.propagateContext(mappedMessage.getHeaders()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
import java.util.Map;
import java.util.function.Supplier;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
Expand All @@ -43,7 +46,7 @@ final class MappingTimer {

private static final Logger LOGGER = LoggerFactory.getLogger(MappingTimer.class);

private static final String TIMER_NAME = "connectivity_message_mapping";
private static final String TIMER_NAME = "message_mapping";
private static final String INBOUND = "inbound";
private static final String OUTBOUND = "outbound";
private static final String PAYLOAD_SEGMENT_NAME = "payload";
Expand Down Expand Up @@ -128,34 +131,42 @@ <T> T overall(final Supplier<T> supplier) {
* The current span context is attached to each resulting external messages.
*
* @param mapper ID of the used mapper.
* @param outboundSignal the outbound signal which was mapped.
* @param supplier the supplier which is invoked and measured.
* @return the result of the supplier.
*/
List<ExternalMessage> outboundPayload(final String mapper, final Supplier<List<ExternalMessage>> supplier) {
final var startedTimer = startNewTimerSegment(mapper);
startedSpan = spawnChildSpanFromStartedTimer(startedTimer);
List<ExternalMessage> outboundPayload(final String mapper,
final OutboundSignal.Mappable outboundSignal,
final Supplier<List<ExternalMessage>> supplier
) {
final var startedTimer = timer.startNewSegment(PAYLOAD_SEGMENT_NAME).tag(MAPPER_TAG_NAME, mapper);
startedSpan = spawnChildSpanFromStartedTimer(startedTimer, PAYLOAD_SEGMENT_NAME + " " + mapper);
return timed(
startedTimer,
() -> {
final var externalMessages = supplier.get();
return externalMessages.stream()
.map(this::propagateContextToExternalMessage)
.toList();
.map(externalMessage ->
propagateContextToExternalMessage(externalMessage,
outboundSignal.getSource().getDittoHeaders().getTraceParent().orElse(null))
).toList();
}
);
}

private StartedTimer startNewTimerSegment(final String mapperName) {
return timer.startNewSegment(PAYLOAD_SEGMENT_NAME).tag(MAPPER_TAG_NAME, mapperName);
}

private StartedSpan spawnChildSpanFromStartedTimer(final StartedTimer startedTimer) {
final var preparedSpan = startedSpan.spawnChild(SpanOperationName.of(startedTimer.getName()));
private StartedSpan spawnChildSpanFromStartedTimer(final StartedTimer startedTimer, final String segmentName) {
final var preparedSpan =
startedSpan.spawnChild(SpanOperationName.of(startedTimer.getName() + " " + segmentName));
return preparedSpan.startBy(startedTimer);
}

private ExternalMessage propagateContextToExternalMessage(final ExternalMessage externalMessage) {
return externalMessage.withHeaders(startedSpan.propagateContext(externalMessage.getHeaders()));
private ExternalMessage propagateContextToExternalMessage(final ExternalMessage externalMessage,
@Nullable final String formerTraceParent) {
final DittoHeadersBuilder<?, ?> dittoHeadersBuilder = DittoHeaders.newBuilder(externalMessage.getHeaders());
if (formerTraceParent != null) {
dittoHeadersBuilder.traceparent(formerTraceParent);
}
return externalMessage.withHeaders(startedSpan.propagateContext(dittoHeadersBuilder.build()));
}

/**
Expand All @@ -167,8 +178,8 @@ private ExternalMessage propagateContextToExternalMessage(final ExternalMessage
* @return the list of mapped adaptables
*/
List<Adaptable> inboundPayload(final String mapper, final Supplier<List<Adaptable>> supplier) {
final var startedTimer = startNewTimerSegment(mapper);
startedSpan = spawnChildSpanFromStartedTimer(startedTimer);
final var startedTimer = timer.startNewSegment(PAYLOAD_SEGMENT_NAME).tag(MAPPER_TAG_NAME, mapper);
startedSpan = spawnChildSpanFromStartedTimer(startedTimer, PAYLOAD_SEGMENT_NAME + " " + mapper);
return timed(
startedTimer,
() -> {
Expand All @@ -193,7 +204,7 @@ private Adaptable propagateContextToAdaptable(final Adaptable adaptable) {
*/
Signal<?> inboundProtocol(final Supplier<Signal<?>> supplier) {
final var startedTimer = timer.startNewSegment(PROTOCOL_SEGMENT_NAME);
startedSpan = spawnChildSpanFromStartedTimer(startedTimer);
startedSpan = spawnChildSpanFromStartedTimer(startedTimer, PROTOCOL_SEGMENT_NAME);
return timed(startedTimer, () -> propagateContextToSignalDittoHeaders(supplier.get()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import javax.annotation.Nullable;

import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand All @@ -47,9 +49,6 @@
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;

import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;

/**
* Processes outgoing {@link Signal}s to {@link ExternalMessage}s.
* Encapsulates the message processing logic from the message mapping processor actor.
Expand Down Expand Up @@ -229,7 +228,7 @@ private Stream<MappingOutcome<OutboundSignal.Mapped>> runMapper(final OutboundSi
.debug("Applying mapper <{}> to message <{}>", mapper.getId(), adaptable);

final List<ExternalMessage> messages =
timer.outboundPayload(mapper.getId(), () -> checkForNull(mapper.map(adaptable)));
timer.outboundPayload(mapper.getId(), outboundSignal, () -> checkForNull(mapper.map(adaptable)));

logger.withCorrelationId(adaptable)
.debug("Mapping <{}> produced <{}> messages.", mapper.getId(), messages.size());
Expand Down
Loading

0 comments on commit e4ce12e

Please sign in to comment.