From ecde2550557d477457d603d17940f8e26f6f3987 Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Wed, 18 Sep 2024 07:27:03 -0700 Subject: [PATCH] destination-e2e: convert production code to kotlin --- .../destination-dev-null/metadata.yaml | 2 +- .../dev_null/DevNullDestination.java | 56 -------------- .../dev_null/DevNullDestination.kt | 53 +++++++++++++ .../destination-e2e-test/metadata.yaml | 2 +- .../e2e_test/FailAfterNDestination.java | 71 ----------------- .../e2e_test/LoggingDestination.java | 40 ---------- .../e2e_test/SilentDestination.java | 58 -------------- .../e2e_test/TestingDestinations.java | 76 ------------------ .../e2e_test/ThrottledDestination.java | 71 ----------------- .../e2e_test/logging/BaseLogger.java | 47 ----------- .../e2e_test/logging/EveryNthLogger.java | 37 --------- .../e2e_test/logging/FirstNLogger.java | 30 -------- .../e2e_test/logging/LoggingConsumer.java | 75 ------------------ .../logging/RandomSamplingLogger.java | 41 ---------- .../e2e_test/logging/TestingLogger.java | 19 ----- .../logging/TestingLoggerFactory.java | 52 ------------- .../e2e_test/FailAfterNDestination.kt | 64 +++++++++++++++ .../e2e_test/LoggingDestination.kt | 36 +++++++++ .../destination/e2e_test/SilentDestination.kt | 41 ++++++++++ .../e2e_test/TestingDestinations.kt | 77 +++++++++++++++++++ .../e2e_test/ThrottledDestination.kt | 63 +++++++++++++++ .../e2e_test/logging/BaseLogger.kt | 43 +++++++++++ .../e2e_test/logging/EveryNthLogger.kt | 33 ++++++++ .../e2e_test/logging/FirstNLogger.kt | 25 ++++++ .../e2e_test/logging/LoggingConsumer.kt | 58 ++++++++++++++ .../e2e_test/logging/RandomSamplingLogger.kt | 34 ++++++++ .../e2e_test/logging/TestingLogger.kt | 16 ++++ .../e2e_test/logging/TestingLoggerFactory.kt | 44 +++++++++++ docs/integrations/destinations/e2e-test.md | 1 + 29 files changed, 590 insertions(+), 675 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-dev-null/src/main/java/io/airbyte/integrations/destination/dev_null/DevNullDestination.java create mode 100644 airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullDestination.kt delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/LoggingDestination.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/SilentDestination.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.java delete mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/LoggingDestination.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/SilentDestination.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/TestingDestinations.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.kt create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.kt diff --git a/airbyte-integrations/connectors/destination-dev-null/metadata.yaml b/airbyte-integrations/connectors/destination-dev-null/metadata.yaml index e367ce3a5594..b2a61dd90fb6 100644 --- a/airbyte-integrations/connectors/destination-dev-null/metadata.yaml +++ b/airbyte-integrations/connectors/destination-dev-null/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: a7bcc9d8-13b3-4e49-b80d-d020b90045e3 - dockerImageTag: 0.3.3 + dockerImageTag: 0.4.0 dockerRepository: airbyte/destination-dev-null githubIssueLabel: destination-dev-null icon: airbyte.svg diff --git a/airbyte-integrations/connectors/destination-dev-null/src/main/java/io/airbyte/integrations/destination/dev_null/DevNullDestination.java b/airbyte-integrations/connectors/destination-dev-null/src/main/java/io/airbyte/integrations/destination/dev_null/DevNullDestination.java deleted file mode 100644 index ed077c41939b..000000000000 --- a/airbyte-integrations/connectors/destination-dev-null/src/main/java/io/airbyte/integrations/destination/dev_null/DevNullDestination.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.dev_null; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.cdk.integrations.base.IntegrationRunner; -import io.airbyte.cdk.integrations.base.spec_modification.SpecModifyingDestination; -import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.destination.e2e_test.TestingDestinations; -import io.airbyte.protocol.models.v0.ConnectorSpecification; -import java.util.Iterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DevNullDestination extends SpecModifyingDestination implements Destination { - - private static final Logger LOGGER = LoggerFactory.getLogger(DevNullDestination.class); - private static final String DEV_NULL_DESTINATION_TITLE = "E2E Test (/dev/null) Destination Spec"; - - public DevNullDestination() { - super(new TestingDestinations()); - } - - public static void main(final String[] args) throws Exception { - LOGGER.info("Starting destination: {}", DevNullDestination.class); - new IntegrationRunner(new DevNullDestination()).run(args); - LOGGER.info("Completed destination: {}", DevNullDestination.class); - } - - /** - * 1. Update the title. 2. Only keep the "silent" mode. - */ - @Override - public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) { - final ConnectorSpecification spec = Jsons.clone(originalSpec); - - ((ObjectNode) spec.getConnectionSpecification()).put("title", DEV_NULL_DESTINATION_TITLE); - - final ObjectNode properties = (ObjectNode) spec.getConnectionSpecification().get("properties").get("test_destination"); - final ArrayNode types = (ArrayNode) properties.get("oneOf"); - final Iterator typesIterator = types.elements(); - while (typesIterator.hasNext()) { - final JsonNode typeNode = typesIterator.next(); - if (!typeNode.get("properties").get("test_destination_type").get("const").asText().equalsIgnoreCase("silent")) { - typesIterator.remove(); - } - } - return spec; - } - -} diff --git a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullDestination.kt b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullDestination.kt new file mode 100644 index 000000000000..9f0ccbaca6b4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullDestination.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.dev_null + +import com.fasterxml.jackson.databind.node.ArrayNode +import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.cdk.integrations.base.IntegrationRunner +import io.airbyte.cdk.integrations.base.spec_modification.SpecModifyingDestination +import io.airbyte.commons.json.Jsons +import io.airbyte.integrations.destination.e2e_test.TestingDestinations +import io.airbyte.protocol.models.v0.ConnectorSpecification +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class DevNullDestination : SpecModifyingDestination(TestingDestinations()), Destination { + /** 1. Update the title. 2. Only keep the "silent" mode. */ + override fun modifySpec(originalSpec: ConnectorSpecification): ConnectorSpecification { + val spec = Jsons.clone(originalSpec) + + (spec.connectionSpecification as ObjectNode).put("title", DEV_NULL_DESTINATION_TITLE) + + val properties = + spec.connectionSpecification["properties"]["test_destination"] as ObjectNode + val types = properties["oneOf"] as ArrayNode + val typesIterator = types.elements() + while (typesIterator.hasNext()) { + val typeNode = typesIterator.next() + if ( + !typeNode["properties"]["test_destination_type"]["const"] + .asText() + .equals("silent", ignoreCase = true) + ) { + typesIterator.remove() + } + } + return spec + } + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(DevNullDestination::class.java) + private const val DEV_NULL_DESTINATION_TITLE = "E2E Test (/dev/null) Destination Spec" + + @Throws(Exception::class) + @JvmStatic + fun main(args: Array) { + LOGGER.info("Starting destination: {}", DevNullDestination::class.java) + IntegrationRunner(DevNullDestination()).run(args) + LOGGER.info("Completed destination: {}", DevNullDestination::class.java) + } + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/metadata.yaml b/airbyte-integrations/connectors/destination-e2e-test/metadata.yaml index ff7331f83803..fd331e2b4caa 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/metadata.yaml +++ b/airbyte-integrations/connectors/destination-e2e-test/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: unknown connectorType: destination definitionId: 2eb65e87-983a-4fd7-b3e3-9d9dc6eb8537 - dockerImageTag: 0.3.6 + dockerImageTag: 0.4.0 dockerRepository: airbyte/destination-e2e-test githubIssueLabel: destination-e2e-test icon: airbyte.svg diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java deleted file mode 100644 index 8146b5159c23..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.cdk.integrations.BaseConnector; -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; -import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteMessage.Type; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FailAfterNDestination extends BaseConnector implements Destination { - - private static final Logger LOGGER = LoggerFactory.getLogger(FailAfterNDestination.class); - - @Override - public AirbyteConnectionStatus check(final JsonNode config) { - return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - } - - @Override - public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { - return new FailAfterNConsumer(config.get("test_destination").get("num_messages").asLong(), outputRecordCollector); - } - - public static class FailAfterNConsumer implements AirbyteMessageConsumer { - - private final Consumer outputRecordCollector; - private final long numMessagesAfterWhichToFail; - private long numMessagesSoFar; - - public FailAfterNConsumer(final long numMessagesAfterWhichToFail, final Consumer outputRecordCollector) { - this.numMessagesAfterWhichToFail = numMessagesAfterWhichToFail; - this.outputRecordCollector = outputRecordCollector; - this.numMessagesSoFar = 0; - LOGGER.info("Will fail after {} messages", numMessagesAfterWhichToFail); - } - - @Override - public void start() {} - - @Override - public void accept(final AirbyteMessage message) { - numMessagesSoFar += 1; - - if (numMessagesSoFar > numMessagesAfterWhichToFail) { - throw new IllegalStateException("Forcing a fail after processing " + numMessagesAfterWhichToFail + " messages."); - } - - if (message.getType() == Type.STATE) { - LOGGER.info("Emitting state: {}", message); - outputRecordCollector.accept(message); - } - } - - @Override - public void close() {} - - } - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/LoggingDestination.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/LoggingDestination.java deleted file mode 100644 index f608fc3258c6..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/LoggingDestination.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.cdk.integrations.BaseConnector; -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; -import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.integrations.destination.e2e_test.logging.LoggingConsumer; -import io.airbyte.integrations.destination.e2e_test.logging.TestingLoggerFactory; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This destination logs each record it receives. - */ -public class LoggingDestination extends BaseConnector implements Destination { - - private static final Logger LOGGER = LoggerFactory.getLogger(LoggingDestination.class); - - @Override - public AirbyteConnectionStatus check(final JsonNode config) { - return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - } - - @Override - public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { - return new LoggingConsumer(new TestingLoggerFactory(config), catalog, outputRecordCollector); - } - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/SilentDestination.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/SilentDestination.java deleted file mode 100644 index bd041f53fafc..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/SilentDestination.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.cdk.integrations.BaseConnector; -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; -import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteMessage.Type; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import java.util.function.Consumer; - -/** - * This destination silently receives records. - */ -public class SilentDestination extends BaseConnector implements Destination { - - @Override - public AirbyteConnectionStatus check(final JsonNode config) { - return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - } - - @Override - public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { - return new SilentDestination.RecordConsumer(outputRecordCollector); - } - - public static class RecordConsumer implements AirbyteMessageConsumer { - - private final Consumer outputRecordCollector; - - public RecordConsumer(final Consumer outputRecordCollector) { - this.outputRecordCollector = outputRecordCollector; - } - - @Override - public void start() {} - - @Override - public void accept(final AirbyteMessage message) { - if (message.getType() == Type.STATE) { - outputRecordCollector.accept(message); - } - } - - @Override - public void close() {} - - } - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java deleted file mode 100644 index ea5a31f84048..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.cdk.integrations.BaseConnector; -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; -import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.cdk.integrations.base.IntegrationRunner; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import java.util.Map; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestingDestinations extends BaseConnector implements Destination { - - private static final Logger LOGGER = LoggerFactory.getLogger(TestingDestinations.class); - - private final Map destinationMap; - - public enum TestDestinationType { - LOGGING, - THROTTLED, - SILENT, - FAILING - } - - public TestingDestinations() { - this(ImmutableMap.builder() - .put(TestDestinationType.LOGGING, new LoggingDestination()) - .put(TestDestinationType.THROTTLED, new ThrottledDestination()) - .put(TestDestinationType.SILENT, new SilentDestination()) - .put(TestDestinationType.FAILING, new FailAfterNDestination()) - .build()); - } - - public TestingDestinations(final Map destinationMap) { - this.destinationMap = destinationMap; - } - - private Destination selectDestination(final JsonNode config) { - return destinationMap.get(TestDestinationType.valueOf(config.get("test_destination").get("test_destination_type").asText())); - } - - @Override - public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) - throws Exception { - return selectDestination(config).getConsumer(config, catalog, outputRecordCollector); - } - - @Override - public AirbyteConnectionStatus check(final JsonNode config) throws Exception { - try { - return selectDestination(config).check(config); - } catch (final Exception e) { - return new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage(e.getMessage()); - } - } - - public static void main(final String[] args) throws Exception { - final Destination destination = new TestingDestinations(); - LOGGER.info("starting destination: {}", TestingDestinations.class); - new IntegrationRunner(destination).run(args); - LOGGER.info("completed destination: {}", TestingDestinations.class); - } - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.java deleted file mode 100644 index c027d460b55c..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test; - -import static java.lang.Thread.sleep; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.cdk.integrations.BaseConnector; -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; -import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteMessage.Type; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This destination logs each record it receives. It sleeps for millis_per_record between accepting - * each record. Useful for simulating backpressure / slow destination writes. - */ -public class ThrottledDestination extends BaseConnector implements Destination { - - private static final Logger LOGGER = LoggerFactory.getLogger(ThrottledDestination.class); - - @Override - public AirbyteConnectionStatus check(final JsonNode config) { - return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - } - - @Override - public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { - return new ThrottledConsumer(config.get("test_destination").get("millis_per_record").asLong(), outputRecordCollector); - } - - public static class ThrottledConsumer implements AirbyteMessageConsumer { - - private final Consumer outputRecordCollector; - private final long millisPerRecord; - - public ThrottledConsumer(final long millisPerRecord, final Consumer outputRecordCollector) { - this.millisPerRecord = millisPerRecord; - this.outputRecordCollector = outputRecordCollector; - LOGGER.info("Will sleep {} millis before processing every record", millisPerRecord); - } - - @Override - public void start() {} - - @Override - public void accept(final AirbyteMessage message) throws Exception { - sleep(millisPerRecord); - - if (message.getType() == Type.STATE) { - LOGGER.info("Emitting state: {}", message); - outputRecordCollector.accept(message); - } - } - - @Override - public void close() {} - - } - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java deleted file mode 100644 index 7dea9eadc5ca..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test.logging; - -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; - -public abstract class BaseLogger implements TestingLogger { - - protected final AirbyteStreamNameNamespacePair streamNamePair; - protected final int maxEntryCount; - protected int loggedEntryCount = 0; - - public BaseLogger(final AirbyteStreamNameNamespacePair streamNamePair, final int maxEntryCount) { - this.streamNamePair = streamNamePair; - this.maxEntryCount = maxEntryCount; - } - - protected String entryMessage(final AirbyteRecordMessage recordMessage) { - return String.format("[%s] %s #%04d: %s", - emissionTimestamp(recordMessage.getEmittedAt()), - streamName(streamNamePair), - loggedEntryCount, - recordMessage.getData()); - } - - protected static String streamName(final AirbyteStreamNameNamespacePair pair) { - if (pair.getNamespace() == null) { - return pair.getName(); - } else { - return String.format("%s.%s", pair.getNamespace(), pair.getName()); - } - } - - protected static String emissionTimestamp(final long emittedAt) { - return OffsetDateTime - .ofInstant(Instant.ofEpochMilli(emittedAt), ZoneId.systemDefault()) - .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); - } - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java deleted file mode 100644 index e597b086a27d..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test.logging; - -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EveryNthLogger extends BaseLogger implements TestingLogger { - - private static final Logger LOGGER = LoggerFactory.getLogger(EveryNthLogger.class); - - private final int nthEntryToLog; - private int currentEntry = 0; - - public EveryNthLogger(final AirbyteStreamNameNamespacePair streamNamePair, final int nthEntryToLog, final int maxEntryCount) { - super(streamNamePair, maxEntryCount); - this.nthEntryToLog = nthEntryToLog; - } - - @Override - public void log(final AirbyteRecordMessage recordMessage) { - if (loggedEntryCount >= maxEntryCount) { - return; - } - - currentEntry += 1; - if (currentEntry % nthEntryToLog == 0) { - loggedEntryCount += 1; - LOGGER.info(entryMessage(recordMessage)); - } - } - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java deleted file mode 100644 index 4fed7adbe49a..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test.logging; - -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FirstNLogger extends BaseLogger implements TestingLogger { - - private static final Logger LOGGER = LoggerFactory.getLogger(FirstNLogger.class); - - public FirstNLogger(final AirbyteStreamNameNamespacePair streamNamePair, final int maxEntryCount) { - super(streamNamePair, maxEntryCount); - } - - @Override - public void log(final AirbyteRecordMessage recordMessage) { - if (loggedEntryCount >= maxEntryCount) { - return; - } - - loggedEntryCount += 1; - LOGGER.info(entryMessage(recordMessage)); - } - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java deleted file mode 100644 index f63c4699446f..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test.logging; - -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; -import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteMessage.Type; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LoggingConsumer implements AirbyteMessageConsumer { - - private static final Logger LOGGER = LoggerFactory.getLogger(LoggingConsumer.class); - - private final TestingLoggerFactory loggerFactory; - private final ConfiguredAirbyteCatalog configuredCatalog; - private final Consumer outputRecordCollector; - private final Map loggers; - - public LoggingConsumer(final TestingLoggerFactory loggerFactory, - final ConfiguredAirbyteCatalog configuredCatalog, - final Consumer outputRecordCollector) { - this.loggerFactory = loggerFactory; - this.configuredCatalog = configuredCatalog; - this.outputRecordCollector = outputRecordCollector; - this.loggers = new HashMap<>(); - } - - @Override - public void start() { - for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { - final AirbyteStream stream = configuredStream.getStream(); - final AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair.fromAirbyteStream(stream); - final TestingLogger logger = loggerFactory.create(streamNamePair); - loggers.put(streamNamePair, logger); - } - } - - @Override - public void accept(final AirbyteMessage message) { - if (message.getType() == Type.STATE) { - LOGGER.info("Emitting state: {}", message); - outputRecordCollector.accept(message); - } else if (message.getType() == Type.TRACE) { - LOGGER.info("Received a trace: {}", message); - } else if (message.getType() == Type.RECORD) { - final AirbyteRecordMessage recordMessage = message.getRecord(); - final AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage); - - if (!loggers.containsKey(pair)) { - throw new IllegalArgumentException( - String.format( - "Message contained record from a stream that was not in the catalog.\n Catalog: %s\n Message: %s", - Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); - } - - loggers.get(pair).log(recordMessage); - } - } - - @Override - public void close() {} - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java deleted file mode 100644 index 657ac6ab5a1d..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test.logging; - -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import java.util.Random; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RandomSamplingLogger extends BaseLogger implements TestingLogger { - - private static final Logger LOGGER = LoggerFactory.getLogger(RandomSamplingLogger.class); - - private final double samplingRatio; - private final Random random; - - public RandomSamplingLogger(final AirbyteStreamNameNamespacePair streamNamePair, - final double samplingRatio, - final long seed, - final int maxEntryCount) { - super(streamNamePair, maxEntryCount); - this.samplingRatio = samplingRatio; - this.random = new Random(seed); - } - - @Override - public void log(final AirbyteRecordMessage recordMessage) { - if (loggedEntryCount >= maxEntryCount) { - return; - } - - if (random.nextDouble() < samplingRatio) { - loggedEntryCount += 1; - LOGGER.info(entryMessage(recordMessage)); - } - } - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.java deleted file mode 100644 index 86e507ea032b..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test.logging; - -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; - -public interface TestingLogger { - - enum LoggingType { - FirstN, - EveryNth, - RandomSampling - } - - void log(AirbyteRecordMessage recordMessage); - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java deleted file mode 100644 index 8c2bf6d8ffeb..000000000000 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.e2e_test.logging; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.integrations.destination.e2e_test.logging.TestingLogger.LoggingType; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; - -public class TestingLoggerFactory { - - private final JsonNode config; - - public TestingLoggerFactory(final JsonNode config) { - this.config = config; - } - - public TestingLogger create(final AirbyteStreamNameNamespacePair streamNamePair) { - if (!config.has("test_destination")) { - throw new IllegalArgumentException("Property test_destination is required, but not found"); - } - - final JsonNode testDestinationConfig = config.get("test_destination"); - - if (!testDestinationConfig.has("logging_config")) { - throw new IllegalArgumentException("Property logging_config is required, but not found"); - } - - final JsonNode logConfig = testDestinationConfig.get("logging_config"); - final LoggingType loggingType = LoggingType.valueOf(logConfig.get("logging_type").asText()); - switch (loggingType) { - case FirstN -> { - final int maxEntryCount = logConfig.get("max_entry_count").asInt(); - return new FirstNLogger(streamNamePair, maxEntryCount); - } - case EveryNth -> { - final int nthEntryToLog = logConfig.get("nth_entry_to_log").asInt(); - final int maxEntryCount = logConfig.get("max_entry_count").asInt(); - return new EveryNthLogger(streamNamePair, nthEntryToLog, maxEntryCount); - } - case RandomSampling -> { - final double samplingRatio = logConfig.get("sampling_ratio").asDouble(); - final long seed = logConfig.has("seed") ? logConfig.get("seed").asLong() : System.currentTimeMillis(); - final int maxEntryCount = logConfig.get("max_entry_count").asInt(); - return new RandomSamplingLogger(streamNamePair, samplingRatio, seed, maxEntryCount); - } - default -> throw new IllegalArgumentException("Unexpected logging type: " + loggingType); - } - } - -} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.kt new file mode 100644 index 000000000000..d9903442d189 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.integrations.BaseConnector +import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer +import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import java.util.function.Consumer +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class FailAfterNDestination : BaseConnector(), Destination { + override fun check(config: JsonNode): AirbyteConnectionStatus { + return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED) + } + + override fun getConsumer( + config: JsonNode, + catalog: ConfiguredAirbyteCatalog, + outputRecordCollector: Consumer + ): AirbyteMessageConsumer { + return FailAfterNConsumer( + config["test_destination"]["num_messages"].asLong(), + outputRecordCollector + ) + } + + class FailAfterNConsumer( + private val numMessagesAfterWhichToFail: Long, + private val outputRecordCollector: Consumer + ) : AirbyteMessageConsumer { + private var numMessagesSoFar: Long = 0 + + init { + LOGGER.info("Will fail after {} messages", numMessagesAfterWhichToFail) + } + + override fun start() {} + + override fun accept(message: AirbyteMessage) { + numMessagesSoFar += 1 + + check(numMessagesSoFar <= numMessagesAfterWhichToFail) { + "Forcing a fail after processing $numMessagesAfterWhichToFail messages." + } + + if (message.type == AirbyteMessage.Type.STATE) { + LOGGER.info("Emitting state: {}", message) + outputRecordCollector.accept(message) + } + } + + override fun close() {} + } + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(FailAfterNDestination::class.java) + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/LoggingDestination.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/LoggingDestination.kt new file mode 100644 index 000000000000..eee5e5077947 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/LoggingDestination.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.integrations.BaseConnector +import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer +import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.integrations.destination.e2e_test.logging.LoggingConsumer +import io.airbyte.integrations.destination.e2e_test.logging.TestingLoggerFactory +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import java.util.function.Consumer +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** This destination logs each record it receives. */ +class LoggingDestination : BaseConnector(), Destination { + override fun check(config: JsonNode): AirbyteConnectionStatus { + return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED) + } + + override fun getConsumer( + config: JsonNode, + catalog: ConfiguredAirbyteCatalog, + outputRecordCollector: Consumer + ): AirbyteMessageConsumer { + return LoggingConsumer(TestingLoggerFactory(config), catalog, outputRecordCollector) + } + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(LoggingDestination::class.java) + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/SilentDestination.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/SilentDestination.kt new file mode 100644 index 000000000000..74181da0b1c9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/SilentDestination.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.integrations.BaseConnector +import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer +import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import java.util.function.Consumer + +/** This destination silently receives records. */ +class SilentDestination : BaseConnector(), Destination { + override fun check(config: JsonNode): AirbyteConnectionStatus { + return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED) + } + + override fun getConsumer( + config: JsonNode, + catalog: ConfiguredAirbyteCatalog, + outputRecordCollector: Consumer + ): AirbyteMessageConsumer { + return RecordConsumer(outputRecordCollector) + } + + class RecordConsumer(private val outputRecordCollector: Consumer) : + AirbyteMessageConsumer { + override fun start() {} + + override fun accept(message: AirbyteMessage) { + if (message.type == AirbyteMessage.Type.STATE) { + outputRecordCollector.accept(message) + } + } + + override fun close() {} + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/TestingDestinations.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/TestingDestinations.kt new file mode 100644 index 000000000000..bcd3d5cc18c1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/TestingDestinations.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test + +import com.fasterxml.jackson.databind.JsonNode +import com.google.common.collect.ImmutableMap +import io.airbyte.cdk.integrations.BaseConnector +import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer +import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.cdk.integrations.base.IntegrationRunner +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import java.util.function.Consumer +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class TestingDestinations +@JvmOverloads +constructor( + private val destinationMap: Map = + ImmutableMap.builder() + .put(TestDestinationType.LOGGING, LoggingDestination()) + .put(TestDestinationType.THROTTLED, ThrottledDestination()) + .put(TestDestinationType.SILENT, SilentDestination()) + .put(TestDestinationType.FAILING, FailAfterNDestination()) + .build() +) : BaseConnector(), Destination { + enum class TestDestinationType { + LOGGING, + THROTTLED, + SILENT, + FAILING + } + + private fun selectDestination(config: JsonNode): Destination? { + return destinationMap[ + TestDestinationType.valueOf( + config["test_destination"]["test_destination_type"].asText() + ) + ] + } + + @Throws(Exception::class) + override fun getConsumer( + config: JsonNode, + catalog: ConfiguredAirbyteCatalog, + outputRecordCollector: Consumer + ): AirbyteMessageConsumer { + return selectDestination(config)!!.getConsumer(config, catalog, outputRecordCollector) + } + + @Throws(Exception::class) + override fun check(config: JsonNode): AirbyteConnectionStatus { + return try { + selectDestination(config)!!.check(config) + } catch (e: Exception) { + AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage(e.message) + } + } + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(TestingDestinations::class.java) + + @Throws(Exception::class) + @JvmStatic + fun main(args: Array) { + val destination: Destination = TestingDestinations() + LOGGER.info("starting destination: {}", TestingDestinations::class.java) + IntegrationRunner(destination).run(args) + LOGGER.info("completed destination: {}", TestingDestinations::class.java) + } + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.kt new file mode 100644 index 000000000000..2ccf3688236b --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.integrations.BaseConnector +import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer +import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import java.util.function.Consumer +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** + * This destination logs each record it receives. It sleeps for millis_per_record between accepting + * each record. Useful for simulating backpressure / slow destination writes. + */ +class ThrottledDestination : BaseConnector(), Destination { + override fun check(config: JsonNode): AirbyteConnectionStatus { + return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED) + } + + override fun getConsumer( + config: JsonNode, + catalog: ConfiguredAirbyteCatalog?, + outputRecordCollector: Consumer + ): AirbyteMessageConsumer { + return ThrottledConsumer( + config["test_destination"]["millis_per_record"].asLong(), + outputRecordCollector + ) + } + + class ThrottledConsumer( + private val millisPerRecord: Long, + private val outputRecordCollector: Consumer + ) : AirbyteMessageConsumer { + init { + LOGGER.info("Will sleep {} millis before processing every record", millisPerRecord) + } + + override fun start() {} + + @Throws(Exception::class) + override fun accept(message: AirbyteMessage) { + Thread.sleep(millisPerRecord) + + if (message.type == AirbyteMessage.Type.STATE) { + LOGGER.info("Emitting state: {}", message) + outputRecordCollector.accept(message) + } + } + + override fun close() {} + } + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(ThrottledDestination::class.java) + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.kt new file mode 100644 index 000000000000..2d45dc801d8b --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test.logging + +import io.airbyte.protocol.models.v0.AirbyteRecordMessage +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair +import java.time.Instant +import java.time.OffsetDateTime +import java.time.ZoneId +import java.time.format.DateTimeFormatter + +abstract class BaseLogger( + protected val streamNamePair: AirbyteStreamNameNamespacePair, + protected val maxEntryCount: Int +) : TestingLogger { + protected var loggedEntryCount: Int = 0 + + protected fun entryMessage(recordMessage: AirbyteRecordMessage): String { + return String.format( + "[%s] %s #%04d: %s", + emissionTimestamp(recordMessage.emittedAt), + streamName(streamNamePair), + loggedEntryCount, + recordMessage.data + ) + } + + companion object { + protected fun streamName(pair: AirbyteStreamNameNamespacePair): String { + return if (pair.namespace == null) { + pair.name + } else { + String.format("%s.%s", pair.namespace, pair.name) + } + } + + protected fun emissionTimestamp(emittedAt: Long): String { + return OffsetDateTime.ofInstant(Instant.ofEpochMilli(emittedAt), ZoneId.systemDefault()) + .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME) + } + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.kt new file mode 100644 index 000000000000..b63a3682385e --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test.logging + +import io.airbyte.protocol.models.v0.AirbyteRecordMessage +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class EveryNthLogger( + streamNamePair: AirbyteStreamNameNamespacePair, + private val nthEntryToLog: Int, + maxEntryCount: Int +) : BaseLogger(streamNamePair, maxEntryCount), TestingLogger { + private var currentEntry = 0 + + override fun log(recordMessage: AirbyteRecordMessage?) { + if (loggedEntryCount >= maxEntryCount) { + return + } + + currentEntry += 1 + if (currentEntry % nthEntryToLog == 0) { + loggedEntryCount += 1 + LOGGER.info(entryMessage(recordMessage!!)) + } + } + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(EveryNthLogger::class.java) + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.kt new file mode 100644 index 000000000000..15e1ecbe002d --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.kt @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test.logging + +import io.airbyte.protocol.models.v0.AirbyteRecordMessage +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class FirstNLogger(streamNamePair: AirbyteStreamNameNamespacePair, maxEntryCount: Int) : + BaseLogger(streamNamePair, maxEntryCount), TestingLogger { + override fun log(recordMessage: AirbyteRecordMessage?) { + if (loggedEntryCount >= maxEntryCount) { + return + } + + loggedEntryCount += 1 + LOGGER.info(entryMessage(recordMessage!!)) + } + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(FirstNLogger::class.java) + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.kt new file mode 100644 index 000000000000..e821a417dc0c --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test.logging + +import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer +import io.airbyte.commons.json.Jsons +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import java.util.function.Consumer +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class LoggingConsumer( + private val loggerFactory: TestingLoggerFactory, + private val configuredCatalog: ConfiguredAirbyteCatalog, + private val outputRecordCollector: Consumer +) : AirbyteMessageConsumer { + private val loggers: MutableMap = HashMap() + + override fun start() { + for (configuredStream in configuredCatalog.streams) { + val stream = configuredStream.stream + val streamNamePair = AirbyteStreamNameNamespacePair.fromAirbyteStream(stream) + val logger = loggerFactory.create(streamNamePair) + loggers[streamNamePair] = logger + } + } + + override fun accept(message: AirbyteMessage) { + if (message.type == AirbyteMessage.Type.STATE) { + LOGGER.info("Emitting state: {}", message) + outputRecordCollector.accept(message) + } else if (message.type == AirbyteMessage.Type.TRACE) { + LOGGER.info("Received a trace: {}", message) + } else if (message.type == AirbyteMessage.Type.RECORD) { + val recordMessage = message.record + val pair = AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage) + + require(loggers.containsKey(pair)) { + String.format( + "Message contained record from a stream that was not in the catalog.\n Catalog: %s\n Message: %s", + Jsons.serialize(configuredCatalog), + Jsons.serialize(recordMessage) + ) + } + + loggers[pair]!!.log(recordMessage) + } + } + + override fun close() {} + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(LoggingConsumer::class.java) + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.kt new file mode 100644 index 000000000000..fa38b1e708b7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test.logging + +import io.airbyte.protocol.models.v0.AirbyteRecordMessage +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair +import java.util.* +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class RandomSamplingLogger( + streamNamePair: AirbyteStreamNameNamespacePair, + private val samplingRatio: Double, + seed: Long, + maxEntryCount: Int +) : BaseLogger(streamNamePair, maxEntryCount), TestingLogger { + private val random = Random(seed) + + override fun log(recordMessage: AirbyteRecordMessage?) { + if (loggedEntryCount >= maxEntryCount) { + return + } + + if (random.nextDouble() < samplingRatio) { + loggedEntryCount += 1 + LOGGER.info(entryMessage(recordMessage!!)) + } + } + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(RandomSamplingLogger::class.java) + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.kt new file mode 100644 index 000000000000..9be54f648411 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.kt @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test.logging + +import io.airbyte.protocol.models.v0.AirbyteRecordMessage + +interface TestingLogger { + enum class LoggingType { + FirstN, + EveryNth, + RandomSampling + } + + fun log(recordMessage: AirbyteRecordMessage?) +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.kt b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.kt new file mode 100644 index 000000000000..be91958e2d47 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/kotlin/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.e2e_test.logging + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair + +class TestingLoggerFactory(private val config: JsonNode) { + fun create(streamNamePair: AirbyteStreamNameNamespacePair): TestingLogger { + require(config.has("test_destination")) { + "Property test_destination is required, but not found" + } + + val testDestinationConfig = config["test_destination"] + + require(testDestinationConfig.has("logging_config")) { + "Property logging_config is required, but not found" + } + + val logConfig = testDestinationConfig["logging_config"] + val loggingType = TestingLogger.LoggingType.valueOf(logConfig["logging_type"].asText()) + when (loggingType) { + TestingLogger.LoggingType.FirstN -> { + val maxEntryCount = logConfig["max_entry_count"].asInt() + return FirstNLogger(streamNamePair, maxEntryCount) + } + TestingLogger.LoggingType.EveryNth -> { + val nthEntryToLog = logConfig["nth_entry_to_log"].asInt() + val maxEntryCount = logConfig["max_entry_count"].asInt() + return EveryNthLogger(streamNamePair, nthEntryToLog, maxEntryCount) + } + TestingLogger.LoggingType.RandomSampling -> { + val samplingRatio = logConfig["sampling_ratio"].asDouble() + val seed = + if (logConfig.has("seed")) logConfig["seed"].asLong() + else System.currentTimeMillis() + val maxEntryCount = logConfig["max_entry_count"].asInt() + return RandomSamplingLogger(streamNamePair, samplingRatio, seed, maxEntryCount) + } + else -> throw IllegalArgumentException("Unexpected logging type: $loggingType") + } + } +} diff --git a/docs/integrations/destinations/e2e-test.md b/docs/integrations/destinations/e2e-test.md index d38630f92457..8005ad8e03ac 100644 --- a/docs/integrations/destinations/e2e-test.md +++ b/docs/integrations/destinations/e2e-test.md @@ -49,6 +49,7 @@ The OSS and Cloud variants have the same version number starting from version `0 | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------- | +| 0.4.0 | 2024-09-18 | [45648](https://github.com/airbytehq/airbyte/pull/45648) | convert production code to kotlin | | 0.3.6 | 2024-05-09 | [38097](https://github.com/airbytehq/airbyte/pull/38097) | Support dedup | | 0.3.5 | 2024-04-29 | [37366](https://github.com/airbytehq/airbyte/pull/37366) | Support refreshes | | 0.3.4 | 2024-04-16 | [37366](https://github.com/airbytehq/airbyte/pull/37366) | Fix NPE |