From 371f6c7d8d1bc43dd1236bca90e967b4c1598bf6 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 17 Sep 2024 14:27:42 -0700 Subject: [PATCH] add integration test on in-memory mock destination --- .../airbyte/cdk/command/MockCatalogFactory.kt | 6 +-- .../MockBasicFunctionalityIntegrationTest.kt | 19 +++++++ .../MockDestinationBackend.kt | 29 ++++++++++ .../MockDestinationCheck.kt | 9 ++++ .../MockDestinationWrite.kt | 53 +++++++++++++++++++ .../load/src/test/resources/metadata.yaml | 4 ++ 6 files changed, 117 insertions(+), 3 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationBackend.kt create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationCheck.kt create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationWrite.kt create mode 100644 airbyte-cdk/bulk/core/load/src/test/resources/metadata.yaml diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/command/MockCatalogFactory.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/command/MockCatalogFactory.kt index e0383e50293d..7ff9cdd2e859 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/command/MockCatalogFactory.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/command/MockCatalogFactory.kt @@ -14,9 +14,9 @@ import io.micronaut.context.annotation.Requires import jakarta.inject.Named import jakarta.inject.Singleton -@Factory -@Replaces(factory = DestinationCatalogFactory::class) -@Requires(env = ["test"]) +//@Factory +//@Replaces(factory = DestinationCatalogFactory::class) +//@Requires(env = ["test"]) class MockCatalogFactory : DestinationCatalogFactory { companion object { val stream1 = diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt new file mode 100644 index 000000000000..b91e2124d16c --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt @@ -0,0 +1,19 @@ +package io.airbyte.cdk.mock_integration_test + +import io.airbyte.cdk.command.ConfigurationJsonObjectBase +import io.airbyte.cdk.test.util.DestinationDataDumper +import io.airbyte.cdk.test.util.NoopDestinationCleaner +import io.airbyte.cdk.test.util.NoopExpectedRecordMapper +import io.airbyte.cdk.test.util.NoopNameMapper +import io.airbyte.cdk.test.write.BasicFunctionalityIntegrationTest + +class MockBasicFunctionalityIntegrationTest: BasicFunctionalityIntegrationTest( + object: ConfigurationJsonObjectBase() {}, + DestinationDataDumper { streamName, streamNamespace -> + MockDestinationBackend.readFile(MockStreamLoader.getFilename(streamNamespace, streamName)) + }, + NoopDestinationCleaner, + NoopExpectedRecordMapper, + NoopNameMapper +) { +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationBackend.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationBackend.kt new file mode 100644 index 000000000000..9d61c1c28a35 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationBackend.kt @@ -0,0 +1,29 @@ +package io.airbyte.cdk.mock_integration_test + +import io.airbyte.cdk.test.util.OutputRecord + +object MockDestinationBackend { + private val lock = Object() + private val files: MutableMap> = mutableMapOf() + + fun insert(filename: String, vararg records: OutputRecord) { + synchronized(lock) { + getFile(filename).addAll(records) + } + } + + fun readFile(filename: String): List { + synchronized(lock) { + return getFile(filename) + } + } + + private fun getFile(filename: String): MutableList { + synchronized(lock) { + if (!files.containsKey(filename)) { + files[filename] = mutableListOf() + } + return files[filename]!! + } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationCheck.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationCheck.kt new file mode 100644 index 000000000000..434eb750f324 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationCheck.kt @@ -0,0 +1,9 @@ +package io.airbyte.cdk.mock_integration_test + +import io.airbyte.cdk.check.DestinationCheck +import javax.inject.Singleton + +@Singleton +class MockDestinationCheck: DestinationCheck { + override fun check() {} +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationWrite.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationWrite.kt new file mode 100644 index 000000000000..a6828d777c6a --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/mock_integration_test/MockDestinationWrite.kt @@ -0,0 +1,53 @@ +package io.airbyte.cdk.mock_integration_test + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.command.DestinationStream +import io.airbyte.cdk.data.ObjectValue +import io.airbyte.cdk.message.Batch +import io.airbyte.cdk.message.DestinationRecord +import io.airbyte.cdk.message.SimpleBatch +import io.airbyte.cdk.test.util.OutputRecord +import io.airbyte.cdk.write.DestinationWrite +import io.airbyte.cdk.write.StreamLoader +import java.time.Instant +import java.util.UUID +import javax.inject.Singleton + +@Singleton +class MockDestinationWrite : DestinationWrite { + override fun getStreamLoader(stream: DestinationStream): StreamLoader { + return MockStreamLoader(stream) + } +} + +class MockStreamLoader(override val stream: DestinationStream) : StreamLoader { + override suspend fun processRecords( + records: Iterator, + totalSizeBytes: Long + ): Batch { + records.forEach { + MockDestinationBackend.insert( + getFilename(it.stream), + OutputRecord( + UUID.randomUUID(), + Instant.ofEpochMilli(it.emittedAtMs), + Instant.ofEpochMilli(System.currentTimeMillis()), + stream.generationId, + it.data as ObjectValue, + ObjectMapper().valueToTree(it.meta).also { metaNode -> + (metaNode as ObjectNode).put("sync_id", stream.syncId) + }, + ) + ) + } + return SimpleBatch(state = Batch.State.COMPLETE) + } + + companion object { + fun getFilename(stream: DestinationStream.Descriptor) = + getFilename(stream.namespace, stream.name) + fun getFilename(namespace: String?, name: String) = "(${namespace},${name})" + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/resources/metadata.yaml b/airbyte-cdk/bulk/core/load/src/test/resources/metadata.yaml new file mode 100644 index 000000000000..aff5a4b3c71c --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/resources/metadata.yaml @@ -0,0 +1,4 @@ +--- +data: + dockerRepository: "airbyte/fake-source" + documentationUrl: "https://docs.airbyte.com"