Skip to content

Commit

Permalink
add integration test on in-memory mock destination
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Sep 19, 2024
1 parent 7c7d04d commit f576bce
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import io.micronaut.context.annotation.Requires
import jakarta.inject.Named
import jakarta.inject.Singleton

@Factory
@Replaces(factory = DestinationCatalogFactory::class)
@Requires(env = ["test"])
//@Factory
//@Replaces(factory = DestinationCatalogFactory::class)
//@Requires(env = ["test"])
class MockCatalogFactory : DestinationCatalogFactory {
companion object {
val stream1 =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.command.ConfigurationJsonObjectBase
import io.airbyte.cdk.test.util.DestinationDataDumper
import io.airbyte.cdk.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.test.util.NoopNameMapper
import io.airbyte.cdk.test.write.BasicFunctionalityIntegrationTest

class MockBasicFunctionalityIntegrationTest: BasicFunctionalityIntegrationTest(
object: ConfigurationJsonObjectBase() {},
DestinationDataDumper { streamName, streamNamespace ->
MockDestinationBackend.readFile(MockStreamLoader.getFilename(streamNamespace, streamName))
},
NoopDestinationCleaner,
NoopExpectedRecordMapper,
NoopNameMapper
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.test.util.OutputRecord

object MockDestinationBackend {
private val lock = Object()
private val files: MutableMap<String, MutableList<OutputRecord>> = mutableMapOf()

fun insert(filename: String, vararg records: OutputRecord) {
synchronized(lock) {
getFile(filename).addAll(records)
}
}

fun readFile(filename: String): List<OutputRecord> {
synchronized(lock) {
return getFile(filename)
}
}

private fun getFile(filename: String): MutableList<OutputRecord> {
synchronized(lock) {
if (!files.containsKey(filename)) {
files[filename] = mutableListOf()
}
return files[filename]!!
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.airbyte.cdk.mock_integration_test

import io.airbyte.cdk.check.DestinationCheck
import javax.inject.Singleton

@Singleton
class MockDestinationCheck: DestinationCheck {
override fun check() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.airbyte.cdk.mock_integration_test

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.data.ObjectValue
import io.airbyte.cdk.message.Batch
import io.airbyte.cdk.message.DestinationRecord
import io.airbyte.cdk.message.SimpleBatch
import io.airbyte.cdk.test.util.OutputRecord
import io.airbyte.cdk.write.DestinationWrite
import io.airbyte.cdk.write.StreamLoader
import java.time.Instant
import java.util.UUID
import javax.inject.Singleton

@Singleton
class MockDestinationWrite : DestinationWrite {
override fun getStreamLoader(stream: DestinationStream): StreamLoader {
return MockStreamLoader(stream)
}
}

class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
override suspend fun processRecords(
records: Iterator<DestinationRecord>,
totalSizeBytes: Long
): Batch {
records.forEach {
MockDestinationBackend.insert(
getFilename(it.stream),
OutputRecord(
UUID.randomUUID(),
Instant.ofEpochMilli(it.emittedAtMs),
Instant.ofEpochMilli(System.currentTimeMillis()),
stream.generationId,
it.data as ObjectValue,
ObjectMapper().valueToTree<JsonNode?>(it.meta).also { metaNode ->
(metaNode as ObjectNode).put("sync_id", stream.syncId)
},
)
)
}
return SimpleBatch(state = Batch.State.COMPLETE)
}

companion object {
fun getFilename(stream: DestinationStream.Descriptor) =
getFilename(stream.namespace, stream.name)
fun getFilename(namespace: String?, name: String) = "(${namespace},${name})"
}
}
4 changes: 4 additions & 0 deletions airbyte-cdk/bulk/core/load/src/test/resources/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---