From 5daf229ee130d090107caec5affdf607b0103a29 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Thu, 19 Sep 2024 10:35:10 -0700 Subject: [PATCH] bulk-cdk: StreamIdentifier (#45680) --- .../kotlin/io/airbyte/cdk/StreamIdentifier.kt | 35 ++++++++++++++ .../kotlin/io/airbyte/cdk/StreamNamePair.kt | 24 ---------- .../io/airbyte/cdk/command/InputState.kt | 8 ++-- .../airbyte/cdk/command/InputStateFactory.kt | 15 +++--- .../io/airbyte/cdk/command/InputStateTest.kt | 22 +++++---- .../io/airbyte/cdk/check/CheckOperation.kt | 10 ++-- .../cdk/discover/AirbyteStreamFactory.kt | 4 +- .../airbyte/cdk/discover/DiscoverOperation.kt | 10 ++-- .../airbyte/cdk/discover/DiscoveredStream.kt | 5 +- .../airbyte/cdk/discover/MetadataQuerier.kt | 13 ++--- .../output/CatalogValidationFailureHandler.kt | 37 ++++++--------- .../main/kotlin/io/airbyte/cdk/read/Feed.kt | 14 +++--- .../kotlin/io/airbyte/cdk/read/FeedReader.kt | 2 +- .../io/airbyte/cdk/read/StateManager.kt | 23 +++++---- .../airbyte/cdk/read/StateManagerFactory.kt | 47 +++++++++---------- .../cdk/read/RootReaderIntegrationTest.kt | 5 +- .../cdk/read/StateManagerGlobalStatesTest.kt | 2 +- .../cdk/read/StateManagerStreamStatesTest.kt | 12 ++++- .../ResourceDrivenMetadataQuerierFactory.kt | 32 ++++++------- .../cdk/discover/JdbcMetadataQuerier.kt | 32 ++++++------- .../cdk/read/DefaultJdbcPartitionFactory.kt | 16 +++---- .../cdk/discover/JdbcMetadataQuerierTest.kt | 13 +++-- .../read/DefaultJdbcPartitionFactoryTest.kt | 8 ++-- .../io/airbyte/cdk/read/TestFixtures.kt | 5 +- .../expected-messages-stream-bad-catalog.json | 4 +- .../connectors/source-mysql-v2/metadata.yaml | 2 +- .../mysql/MysqlSourceMetadataQuerier.kt | 32 +++++++------ 27 files changed, 224 insertions(+), 208 deletions(-) create mode 100644 airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/StreamIdentifier.kt delete mode 100644 airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/StreamNamePair.kt diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/StreamIdentifier.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/StreamIdentifier.kt new file mode 100644 index 000000000000..4ffb056e3b59 --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/StreamIdentifier.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk + +import io.airbyte.protocol.models.v0.AirbyteStream +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair +import io.airbyte.protocol.models.v0.StreamDescriptor + +/** + * [StreamIdentifier] is equivalent to [AirbyteStreamNameNamespacePair]. + * + * This exists to avoid coupling the Bulk CDK code too closely to the Airbyte Protocol objects. + */ +data class StreamIdentifier +private constructor( + val namespace: String?, + val name: String, +) { + companion object { + fun from(desc: StreamDescriptor): StreamIdentifier = + StreamIdentifier(desc.namespace, desc.name) + + fun from(stream: AirbyteStream): StreamIdentifier = + StreamIdentifier(stream.namespace, stream.name) + } + + override fun toString(): String { + return if (namespace == null) name else "${namespace}.${name}" + } +} + +fun StreamIdentifier.asProtocolStreamDescriptor(): StreamDescriptor = + StreamDescriptor().withName(name).withNamespace(namespace) diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/StreamNamePair.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/StreamNamePair.kt deleted file mode 100644 index 38e16697c4f1..000000000000 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/StreamNamePair.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk - -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair -import io.airbyte.protocol.models.v0.StreamDescriptor - -/** - * [StreamNamePair] is equivalent to [AirbyteStreamNameNamespacePair]. - * - * This exists to avoid coupling the Bulk CDK code too closely to the Airbyte Protocol objects. - */ -data class StreamNamePair(val name: String, val namespace: String?) { - override fun toString(): String { - return if (namespace == null) name else "${namespace}_${name}" - } -} - -fun StreamNamePair.asProtocolStreamDescriptor(): StreamDescriptor = - StreamDescriptor().withName(name).withNamespace(namespace) - -fun StreamDescriptor.asStreamNamePair(): StreamNamePair = StreamNamePair(name, namespace) diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/InputState.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/InputState.kt index b54561f5b5a7..dad6e6326c2b 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/InputState.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/InputState.kt @@ -2,7 +2,7 @@ package io.airbyte.cdk.command import com.fasterxml.jackson.databind.JsonNode -import io.airbyte.cdk.StreamNamePair +import io.airbyte.cdk.StreamIdentifier /** Union type of the state passed as input to a READ for a source connector. */ sealed interface InputState @@ -11,13 +11,13 @@ data object EmptyInputState : InputState data class GlobalInputState( val global: OpaqueStateValue, - val globalStreams: Map, + val globalStreams: Map, /** Conceivably, some streams may undergo a full refresh alongside independently of the rest. */ - val nonGlobalStreams: Map, + val nonGlobalStreams: Map, ) : InputState data class StreamInputState( - val streams: Map, + val streams: Map, ) : InputState /** State values are opaque for the CDK, the schema is owned by the connector. */ diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/InputStateFactory.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/InputStateFactory.kt index f475c7c9ea55..562c891386f4 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/InputStateFactory.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/InputStateFactory.kt @@ -3,8 +3,7 @@ package io.airbyte.cdk.command import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.ConfigErrorException -import io.airbyte.cdk.StreamNamePair -import io.airbyte.cdk.asStreamNamePair +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.util.Jsons import io.airbyte.cdk.util.ResourceUtils import io.airbyte.protocol.models.v0.AirbyteGlobalState @@ -49,7 +48,7 @@ class InputStateFactory { if (msg.stream == null) { msg.type.toString() } else { - msg.stream.streamDescriptor.asStreamNamePair().toString() + StreamIdentifier.from(msg.stream.streamDescriptor).toString() } } .mapNotNull { (groupKey, groupValues) -> @@ -61,7 +60,7 @@ class InputStateFactory { } groupValues.last() } - val nonGlobalStreams: Map = + val nonGlobalStreams: Map = streamStates(deduped.mapNotNull { it.stream }) val globalState: AirbyteGlobalState? = deduped.find { it.type == AirbyteStateMessage.AirbyteStateType.GLOBAL }?.global @@ -73,18 +72,18 @@ class InputStateFactory { globalState.sharedState, OpaqueStateValue::class.java, ) - val globalStreams: Map = + val globalStreams: Map = streamStates(globalState.streamStates) return GlobalInputState(globalStateValue, globalStreams, nonGlobalStreams) } private fun streamStates( streamStates: List?, - ): Map = + ): Map = (streamStates ?: listOf()).associate { msg: AirbyteStreamState -> - val key: StreamNamePair = msg.streamDescriptor.asStreamNamePair() + val streamID: StreamIdentifier = StreamIdentifier.from(msg.streamDescriptor) val jsonValue: JsonNode = msg.streamState ?: Jsons.objectNode() - key to ValidatedJsonUtils.parseUnvalidated(jsonValue, OpaqueStateValue::class.java) + streamID to ValidatedJsonUtils.parseUnvalidated(jsonValue, OpaqueStateValue::class.java) } private fun validateStateMessage(message: AirbyteStateMessage) { diff --git a/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/command/InputStateTest.kt b/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/command/InputStateTest.kt index ab1264ada1c6..2dc31ce6399a 100644 --- a/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/command/InputStateTest.kt +++ b/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/command/InputStateTest.kt @@ -1,8 +1,9 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.command -import io.airbyte.cdk.StreamNamePair +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.util.Jsons +import io.airbyte.protocol.models.v0.StreamDescriptor import io.micronaut.context.annotation.Property import io.micronaut.test.extensions.junit5.annotation.MicronautTest import jakarta.inject.Inject @@ -27,9 +28,9 @@ class InputStateTest { val expected = StreamInputState( mapOf( - StreamNamePair("bar", "foo") to + streamID("foo", "bar") to Jsons.readTree("{\"primary_key\":{\"k1\":10,\"k2\":20}}"), - StreamNamePair("baz", "foo") to Jsons.readTree("{\"cursors\":{\"c\":30}}"), + streamID("foo", "baz") to Jsons.readTree("{\"cursors\":{\"c\":30}}"), ), ) Assertions.assertEquals( @@ -49,7 +50,7 @@ class InputStateTest { global = Jsons.readTree("{\"cdc\":{}}"), globalStreams = mapOf( - StreamNamePair("bar", "foo") to + streamID("foo", "bar") to Jsons.readTree("{\"primary_key\":{\"k1\":10,\"k2\":20}}"), ), nonGlobalStreams = mapOf(), @@ -71,13 +72,12 @@ class InputStateTest { global = Jsons.readTree("{\"cdc\":{}}"), globalStreams = mapOf( - StreamNamePair("bar", "foo") to + streamID("foo", "bar") to Jsons.readTree("{\"primary_key\":{\"k1\":10,\"k2\":20}}"), ), nonGlobalStreams = mapOf( - StreamNamePair("baz", "foo") to - Jsons.readTree("{\"primary_key\":{\"k\":1}}"), + streamID("foo", "baz") to Jsons.readTree("{\"primary_key\":{\"k\":1}}"), ), ) Assertions.assertEquals( @@ -94,13 +94,12 @@ class InputStateTest { global = Jsons.readTree("{\"cdc\":{}}"), globalStreams = mapOf( - StreamNamePair("bar", "foo") to + streamID("foo", "bar") to Jsons.readTree("{\"primary_key\":{\"k1\":10,\"k2\":20}}"), ), nonGlobalStreams = mapOf( - StreamNamePair("baz", "foo") to - Jsons.readTree("{\"primary_key\":{\"k\":10}}"), + streamID("foo", "baz") to Jsons.readTree("{\"primary_key\":{\"k\":10}}"), ), ) Assertions.assertEquals( @@ -108,4 +107,7 @@ class InputStateTest { Jsons.writeValueAsString(actual), ) } + + fun streamID(namespace: String, name: String): StreamIdentifier = + StreamIdentifier.from(StreamDescriptor().withName(name).withNamespace(namespace)) } diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt index 37b08b868881..046618f189d7 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt @@ -64,17 +64,19 @@ class CheckOperation( var n = 0 val namespaces: List = listOf(null) + metadataQuerier.streamNamespaces() for (namespace in namespaces) { - for (name in metadataQuerier.streamNames(namespace)) { + for (streamID in metadataQuerier.streamNames(namespace)) { try { - metadataQuerier.fields(name, namespace) + metadataQuerier.fields(streamID) } catch (e: Exception) { log.info(e) { - "Query failed on stream '$name' in '${namespace ?: ""}': ${e.message}" + "Query failed on stream '${streamID.name}' in '${namespace ?: ""}': ${e.message}" } n++ continue } - log.info { "Query successful on stream '$name' in '${namespace ?: ""}'." } + log.info { + "Query successful on stream '${streamID.name}' in '${namespace ?: ""}'." + } return } } diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/AirbyteStreamFactory.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/AirbyteStreamFactory.kt index 93705c4d2512..4dd2b568225a 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/AirbyteStreamFactory.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/AirbyteStreamFactory.kt @@ -17,8 +17,8 @@ interface AirbyteStreamFactory { fun createAirbyteStream(discoveredStream: DiscoveredStream): AirbyteStream = CatalogHelpers.createAirbyteStream( - discoveredStream.name, - discoveredStream.namespace, + discoveredStream.id.name, + discoveredStream.id.namespace, discoveredStream.columns.map { AirbyteField.of(it.id, it.type.airbyteType.asJsonSchemaType()) }, diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoverOperation.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoverOperation.kt index 90f1732e6dd9..a0b34e5e3999 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoverOperation.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoverOperation.kt @@ -27,16 +27,16 @@ class DiscoverOperation( val namespaces: List = listOf(null) + metadataQuerier.streamNamespaces() for (namespace in namespaces) { - for (name in metadataQuerier.streamNames(namespace)) { - val fields: List = metadataQuerier.fields(name, namespace) + for (streamID in metadataQuerier.streamNames(namespace)) { + val fields: List = metadataQuerier.fields(streamID) if (fields.isEmpty()) { log.info { - "Ignoring stream '$name' in '${namespace ?: ""}' because no fields were discovered." + "Ignoring stream '${streamID.name}' in '${namespace ?: ""}' because no fields were discovered." } continue } - val primaryKey: List> = metadataQuerier.primaryKey(name, namespace) - val discoveredStream = DiscoveredStream(name, namespace, fields, primaryKey) + val primaryKey: List> = metadataQuerier.primaryKey(streamID) + val discoveredStream = DiscoveredStream(streamID, fields, primaryKey) val airbyteStream: AirbyteStream = if (config.global) { airbyteStreamFactory.createGlobal(discoveredStream) diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoveredStream.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoveredStream.kt index 57453f29b29f..c932074ca245 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoveredStream.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoveredStream.kt @@ -4,9 +4,10 @@ package io.airbyte.cdk.discover +import io.airbyte.cdk.StreamIdentifier + data class DiscoveredStream( - val name: String, - val namespace: String?, + val id: StreamIdentifier, val columns: List, val primaryKeyColumnIDs: List>, ) diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetadataQuerier.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetadataQuerier.kt index 3fea5fddef38..875ea5b28a6a 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetadataQuerier.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetadataQuerier.kt @@ -1,6 +1,7 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.discover +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.command.SourceConfiguration /** An abstraction for a catalog discovery session. */ @@ -10,19 +11,13 @@ interface MetadataQuerier : AutoCloseable { fun streamNamespaces(): List /** Returns all available stream names in the given namespace. */ - fun streamNames(streamNamespace: String?): List + fun streamNames(streamNamespace: String?): List /** Returns all available fields in the given stream. */ - fun fields( - streamName: String, - streamNamespace: String?, - ): List + fun fields(streamID: StreamIdentifier): List /** Returns the primary key for the given stream, if it exists; empty list otherwise. */ - fun primaryKey( - streamName: String, - streamNamespace: String?, - ): List> + fun primaryKey(streamID: StreamIdentifier): List> /** Executes extra checks which throw a [io.airbyte.cdk.ConfigErrorException] on failure. */ fun extraChecks() diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/output/CatalogValidationFailureHandler.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/output/CatalogValidationFailureHandler.kt index a7afc3328e8f..5771b07934d2 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/output/CatalogValidationFailureHandler.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/output/CatalogValidationFailureHandler.kt @@ -1,6 +1,7 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.output +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.data.AirbyteType import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.DefaultImplementation @@ -19,59 +20,49 @@ interface CatalogValidationFailureHandler : Consumer /** Union type for all validation failures. */ sealed interface CatalogValidationFailure { - val streamName: String - val streamNamespace: String? + val streamID: StreamIdentifier } data class StreamNotFound( - override val streamName: String, - override val streamNamespace: String?, + override val streamID: StreamIdentifier, ) : CatalogValidationFailure data class MultipleStreamsFound( - override val streamName: String, - override val streamNamespace: String?, + override val streamID: StreamIdentifier, ) : CatalogValidationFailure data class StreamHasNoFields( - override val streamName: String, - override val streamNamespace: String?, + override val streamID: StreamIdentifier, ) : CatalogValidationFailure data class FieldNotFound( - override val streamName: String, - override val streamNamespace: String?, + override val streamID: StreamIdentifier, val fieldName: String, ) : CatalogValidationFailure data class FieldTypeMismatch( - override val streamName: String, - override val streamNamespace: String?, + override val streamID: StreamIdentifier, val fieldName: String, val expected: AirbyteType, val actual: AirbyteType, ) : CatalogValidationFailure data class InvalidPrimaryKey( - override val streamName: String, - override val streamNamespace: String?, + override val streamID: StreamIdentifier, val primaryKey: List, ) : CatalogValidationFailure data class InvalidCursor( - override val streamName: String, - override val streamNamespace: String?, + override val streamID: StreamIdentifier, val cursor: String, ) : CatalogValidationFailure data class InvalidIncrementalSyncMode( - override val streamName: String, - override val streamNamespace: String?, + override val streamID: StreamIdentifier, ) : CatalogValidationFailure data class ResetStream( - override val streamName: String, - override val streamNamespace: String?, + override val streamID: StreamIdentifier, ) : CatalogValidationFailure private val log = KotlinLogging.logger {} @@ -102,9 +93,9 @@ private class LoggingCatalogValidationFailureHandler : CatalogValidationFailureH } private fun CatalogValidationFailure.prettyName(): String = - if (streamNamespace == null) { - "'$streamName' in unspecified namespace" + if (streamID.namespace == null) { + "'${streamID.name}' in unspecified namespace" } else { - "'$streamName' in namespace '$streamNamespace'" + "'${streamID.name}' in namespace '${streamID.namespace}'" } } diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Feed.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Feed.kt index 6aaa39cc6957..396e9c7b5d49 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Feed.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Feed.kt @@ -1,7 +1,7 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.read -import io.airbyte.cdk.StreamNamePair +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.discover.Field import io.airbyte.cdk.discover.FieldOrMetaField @@ -29,16 +29,18 @@ data class Global( * Roughly equivalent to a [io.airbyte.protocol.models.v0.ConfiguredAirbyteStream]. */ data class Stream( - val name: String, - val namespace: String?, + val id: StreamIdentifier, val fields: List, val configuredSyncMode: ConfiguredSyncMode, val configuredPrimaryKey: List?, val configuredCursor: FieldOrMetaField?, ) : Feed { - val namePair: StreamNamePair - get() = StreamNamePair(name, namespace) + val name: String + get() = id.name + + val namespace: String? + get() = id.namespace override val label: String - get() = namePair.toString() + get() = id.toString() } diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedReader.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedReader.kt index 2d623dd970d4..c5cfeaef102b 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedReader.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedReader.kt @@ -306,7 +306,7 @@ class FeedReader( if (feed is Stream) { root.outputConsumer.accept( AirbyteStreamStatusTraceMessage() - .withStreamDescriptor(feed.namePair.asProtocolStreamDescriptor()) + .withStreamDescriptor(feed.id.asProtocolStreamDescriptor()) .withStatus(status), ) } diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt index 09a820c865f1..fb311d55b2d3 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt @@ -1,7 +1,7 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.read -import io.airbyte.cdk.StreamNamePair +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.asProtocolStreamDescriptor import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.protocol.models.v0.AirbyteGlobalState @@ -25,7 +25,7 @@ class StateManager( initialStreamStates: Map = mapOf(), ) : StateQuerier { private val global: GlobalStateManager? - private val nonGlobal: Map + private val nonGlobal: Map init { if (global == null) { @@ -33,7 +33,7 @@ class StateManager( nonGlobal = initialStreamStates .mapValues { NonGlobalStreamStateManager(it.key, it.value) } - .mapKeys { it.key.namePair } + .mapKeys { it.key.id } } else { val globalStreams: Map = global.streams.associateWith { initialStreamStates[it] } @@ -47,7 +47,7 @@ class StateManager( initialStreamStates .filterKeys { !globalStreams.containsKey(it) } .mapValues { NonGlobalStreamStateManager(it.key, it.value) } - .mapKeys { it.key.namePair } + .mapKeys { it.key.id } } } @@ -62,9 +62,8 @@ class StateManager( fun scoped(feed: Feed): StateManagerScopedToFeed = when (feed) { is Global -> global ?: throw IllegalArgumentException("unknown global key") - is Stream -> global?.streamStateManagers?.get(feed.namePair) - ?: nonGlobal[feed.namePair] - ?: throw IllegalArgumentException("unknown stream key") + is Stream -> global?.streamStateManagers?.get(feed.id) + ?: nonGlobal[feed.id] ?: throw IllegalArgumentException("unknown stream key") } interface StateManagerScopedToFeed { @@ -142,10 +141,10 @@ class StateManager( initialGlobalState: OpaqueStateValue?, initialStreamStates: Map, ) : BaseStateManager(global, initialGlobalState) { - val streamStateManagers: Map = + val streamStateManagers: Map = initialStreamStates .mapValues { GlobalStreamStateManager(it.key, it.value) } - .mapKeys { it.key.namePair } + .mapKeys { it.key.id } fun checkpoint(): AirbyteStateMessage? { var numSwapped = 0 @@ -166,10 +165,10 @@ class StateManager( streamStateValue = globalStreamSwapped.first totalNumRecords += globalStreamSwapped.second } - val namePair: StreamNamePair = streamStateManager.feed.namePair + val streamID: StreamIdentifier = streamStateManager.feed.id streamStates.add( AirbyteStreamState() - .withStreamDescriptor(namePair.asProtocolStreamDescriptor()) + .withStreamDescriptor(streamID.asProtocolStreamDescriptor()) .withStreamState(streamStateValue), ) } @@ -197,7 +196,7 @@ class StateManager( val (opaqueStateValue: OpaqueStateValue?, numRecords: Long) = swap() ?: return null val airbyteStreamState = AirbyteStreamState() - .withStreamDescriptor(feed.namePair.asProtocolStreamDescriptor()) + .withStreamDescriptor(feed.id.asProtocolStreamDescriptor()) .withStreamState(opaqueStateValue) return AirbyteStateMessage() .withType(AirbyteStateMessage.AirbyteStateType.STREAM) diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt index d8b5208524ae..c142c92d307a 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt @@ -3,7 +3,7 @@ package io.airbyte.cdk.read import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.ConfigErrorException -import io.airbyte.cdk.StreamNamePair +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.asProtocolStreamDescriptor import io.airbyte.cdk.command.EmptyInputState import io.airbyte.cdk.command.GlobalInputState @@ -82,10 +82,9 @@ class StateManagerFactory( initialStreamStates = streams.associateWith { stream: Stream -> when (stream.configuredSyncMode) { - ConfiguredSyncMode.INCREMENTAL -> - inputState?.globalStreams?.get(stream.namePair) + ConfiguredSyncMode.INCREMENTAL -> inputState?.globalStreams?.get(stream.id) ConfiguredSyncMode.FULL_REFRESH -> - inputState?.nonGlobalStreams?.get(stream.namePair) + inputState?.nonGlobalStreams?.get(stream.id) } }, ) @@ -96,9 +95,7 @@ class StateManagerFactory( ) = StateManager( initialStreamStates = - streams.associateWith { stream: Stream -> - inputState?.streams?.get(stream.namePair) - }, + streams.associateWith { stream: Stream -> inputState?.streams?.get(stream.id) }, ) private fun toStream( @@ -107,16 +104,16 @@ class StateManagerFactory( ): Stream? { val stream: AirbyteStream = configuredStream.stream val jsonSchemaProperties: JsonNode = stream.jsonSchema["properties"] - val name: String = stream.name!! - val namespace: String? = stream.namespace - val streamNamePair = StreamNamePair(name, namespace) - val streamLabel: String = streamNamePair.toString() - when (metadataQuerier.streamNames(namespace).filter { it == name }.size) { + val streamID: StreamIdentifier = StreamIdentifier.from(configuredStream.stream) + val name: String = streamID.name + val namespace: String? = streamID.namespace + val streamLabel: String = streamID.toString() + when (metadataQuerier.streamNames(namespace).filter { it.name == name }.size) { 0 -> { - handler.accept(StreamNotFound(name, namespace)) + handler.accept(StreamNotFound(streamID)) outputConsumer.accept( AirbyteErrorTraceMessage() - .withStreamDescriptor(streamNamePair.asProtocolStreamDescriptor()) + .withStreamDescriptor(streamID.asProtocolStreamDescriptor()) .withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR) .withMessage("Stream '$streamLabel' not found or not accessible in source.") ) @@ -124,10 +121,10 @@ class StateManagerFactory( } 1 -> Unit else -> { - handler.accept(MultipleStreamsFound(name, namespace)) + handler.accept(MultipleStreamsFound(streamID)) outputConsumer.accept( AirbyteErrorTraceMessage() - .withStreamDescriptor(streamNamePair.asProtocolStreamDescriptor()) + .withStreamDescriptor(streamID.asProtocolStreamDescriptor()) .withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR) .withMessage("Multiple streams '$streamLabel' found in source.") ) @@ -140,7 +137,7 @@ class StateManagerFactory( id to airbyteTypeFromJsonSchema(schema) } val actualDataColumns: Map = - metadataQuerier.fields(name, namespace).associateBy { it.id } + metadataQuerier.fields(streamID).associateBy { it.id } fun dataColumnOrNull(id: String): Field? { if (MetaField.isMetaFieldID(id)) { @@ -150,7 +147,7 @@ class StateManagerFactory( } val actualColumn: Field? = actualDataColumns[id] if (actualColumn == null) { - handler.accept(FieldNotFound(name, namespace, id)) + handler.accept(FieldNotFound(streamID, id)) return null } val expectedAirbyteType: AirbyteType = expectedSchema[id] ?: return null @@ -158,8 +155,7 @@ class StateManagerFactory( if (expectedAirbyteType != actualAirbyteType) { handler.accept( FieldTypeMismatch( - name, - namespace, + streamID, id, expectedAirbyteType, actualAirbyteType, @@ -174,10 +170,10 @@ class StateManagerFactory( dataColumnOrNull(it) ?: return@toStream null } if (streamFields.isEmpty()) { - handler.accept(StreamHasNoFields(name, namespace)) + handler.accept(StreamHasNoFields(streamID)) outputConsumer.accept( AirbyteErrorTraceMessage() - .withStreamDescriptor(streamNamePair.asProtocolStreamDescriptor()) + .withStreamDescriptor(streamID.asProtocolStreamDescriptor()) .withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR) .withMessage("Stream '$streamLabel' has no accessible fields.") ) @@ -192,7 +188,7 @@ class StateManagerFactory( pkColumnIDComponents.map { it.joinToString(separator = ".") } val pk: List = pkColumnIDs.mapNotNull(::dataColumnOrNull) if (pk.size < pkColumnIDComponents.size) { - handler.accept(InvalidPrimaryKey(name, namespace, pkColumnIDs)) + handler.accept(InvalidPrimaryKey(streamID, pkColumnIDs)) return null } return pk @@ -216,7 +212,7 @@ class StateManagerFactory( when (configuredStream.syncMode) { SyncMode.INCREMENTAL -> if (configuredCursor == null) { - handler.accept(InvalidIncrementalSyncMode(name, namespace)) + handler.accept(InvalidIncrementalSyncMode(streamID)) ConfiguredSyncMode.FULL_REFRESH } else { ConfiguredSyncMode.INCREMENTAL @@ -224,8 +220,7 @@ class StateManagerFactory( else -> ConfiguredSyncMode.FULL_REFRESH } return Stream( - name, - namespace, + streamID, streamFields, configuredSyncMode, configuredPrimaryKey, diff --git a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/RootReaderIntegrationTest.kt b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/RootReaderIntegrationTest.kt index c3d03ea8a852..41fc41e8d8d3 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/RootReaderIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/RootReaderIntegrationTest.kt @@ -4,6 +4,7 @@ package io.airbyte.cdk.read import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ArrayNode import io.airbyte.cdk.ClockFactory +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.output.BufferingOutputConsumer import io.airbyte.cdk.util.Jsons @@ -11,6 +12,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateMessage import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage import io.airbyte.protocol.models.v0.AirbyteTraceMessage +import io.airbyte.protocol.models.v0.StreamDescriptor import io.github.oshai.kotlinlogging.KotlinLogging import java.lang.RuntimeException import java.time.Duration @@ -206,8 +208,7 @@ data class TestCase( val stream: Stream = Stream( - name = name, - namespace = "test", + id = StreamIdentifier.from(StreamDescriptor().withName(name).withNamespace("test")), fields = listOf(), configuredSyncMode = ConfiguredSyncMode.FULL_REFRESH, configuredPrimaryKey = null, diff --git a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerGlobalStatesTest.kt b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerGlobalStatesTest.kt index c43b15e0542b..5a2abd651c27 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerGlobalStatesTest.kt +++ b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerGlobalStatesTest.kt @@ -205,7 +205,7 @@ class StateManagerGlobalStatesTest { Assertions.assertEquals(listOf("V", "K"), kv.fields.map { it.id }) Assertions.assertEquals(listOf("K"), kv.configuredPrimaryKey?.map { it.id }) Assertions.assertEquals(ConfiguredSyncMode.INCREMENTAL, kv.configuredSyncMode) - val events: Stream = streams.filter { it.namePair != kv.namePair }.first() + val events: Stream = streams.filter { it.id != kv.id }.first() Assertions.assertEquals("EVENTS", events.name) Assertions.assertEquals(listOf("MSG", "ID", "TS"), events.fields.map { it.id }) Assertions.assertEquals(listOf("ID"), events.configuredPrimaryKey?.map { it.id }) diff --git a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerStreamStatesTest.kt b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerStreamStatesTest.kt index ae1c7730acd1..5cd9465d7b7a 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerStreamStatesTest.kt +++ b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerStreamStatesTest.kt @@ -2,6 +2,7 @@ package io.airbyte.cdk.read import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.command.InputState import io.airbyte.cdk.command.SourceConfiguration import io.airbyte.cdk.output.BufferingCatalogValidationFailureHandler @@ -11,6 +12,7 @@ import io.airbyte.cdk.output.StreamNotFound import io.airbyte.cdk.util.Jsons import io.airbyte.protocol.models.v0.AirbyteStateMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import io.airbyte.protocol.models.v0.StreamDescriptor import io.micronaut.context.annotation.Property import io.micronaut.test.extensions.junit5.annotation.MicronautTest import jakarta.inject.Inject @@ -58,7 +60,7 @@ class StateManagerStreamStatesTest { fun testBadStreamName() { // test current state Assertions.assertEquals(listOf(), stateManager.feeds) - Assertions.assertEquals(listOf(StreamNotFound("BLAH", "PUBLIC")), handler.get()) + Assertions.assertEquals(listOf(StreamNotFound(streamID("BLAH"))), handler.get()) } @Test @@ -82,7 +84,10 @@ class StateManagerStreamStatesTest { fun testBadSchema() { // test current state Assertions.assertEquals(listOf(), stateManager.feeds) - Assertions.assertEquals(listOf(StreamHasNoFields("EVENTS", "PUBLIC")), handler.get()) + Assertions.assertEquals( + listOf(StreamHasNoFields(streamID("EVENTS"))), + handler.get(), + ) } @Test @@ -193,6 +198,9 @@ class StateManagerStreamStatesTest { return eventsStream } + private fun streamID(name: String): StreamIdentifier = + StreamIdentifier.from(StreamDescriptor().withName(name).withNamespace("PUBLIC")) + companion object { const val STREAM = """ diff --git a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt index 35556e67f9b9..3c7074a9b479 100644 --- a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt +++ b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt @@ -1,9 +1,11 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.discover +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.command.SourceConfiguration import io.airbyte.cdk.command.ValidatedJsonUtils import io.airbyte.cdk.util.ResourceUtils +import io.airbyte.protocol.models.v0.StreamDescriptor import io.micronaut.context.annotation.Replaces import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Value @@ -20,14 +22,16 @@ import java.sql.SQLException class ResourceDrivenMetadataQuerierFactory( @Value("\${metadata.resource}") resource: String? = null, ) : MetadataQuerier.Factory { - val metadata: Map> + val metadata: Map init { val json: String? = resource?.let { ResourceUtils.readResource(it) } val level0: List = ValidatedJsonUtils.parseList(Level1::class.java, json) - metadata = level0.map { it.namespace }.distinct().associateWith { mutableMapOf() } + val map = mutableMapOf() for (level1 in level0) { - metadata[level1.namespace]!![level1.name] = null + val desc = StreamDescriptor().withName(level1.name).withNamespace(level1.namespace) + val streamID: StreamIdentifier = StreamIdentifier.from(desc) + map[streamID] = null val level2: Level2 = level1.metadata ?: continue val columns: List = level2.columns.map { (id: String, fullyQualifiedClassName: String) -> @@ -35,9 +39,9 @@ class ResourceDrivenMetadataQuerierFactory( Class.forName(fullyQualifiedClassName).kotlin.objectInstance as FieldType Field(id, fieldType) } - metadata[level1.namespace]!![level1.name] = - TestStreamMetadata(columns, level2.primaryKeys) + map[streamID] = TestStreamMetadata(columns, level2.primaryKeys) } + metadata = map } override fun session(config: SourceConfiguration): MetadataQuerier = @@ -46,30 +50,26 @@ class ResourceDrivenMetadataQuerierFactory( override fun streamNamespaces(): List { if (isClosed) throw IllegalStateException() - return metadata.keys.filterNotNull() + return metadata.keys.mapNotNull { it.namespace }.distinct() } - override fun streamNames(streamNamespace: String?): List { + override fun streamNames(streamNamespace: String?): List { if (isClosed) throw IllegalStateException() - return metadata[streamNamespace]?.keys?.toList() ?: listOf() + return metadata.keys.filter { it.namespace == streamNamespace } } override fun fields( - streamName: String, - streamNamespace: String?, + streamID: StreamIdentifier, ): List { if (isClosed) throw IllegalStateException() - return metadata[streamNamespace]?.get(streamName)?.fields - ?: throw SQLException("query failed", "tbl") + return metadata[streamID]?.fields ?: throw SQLException("query failed", "tbl") } override fun primaryKey( - streamName: String, - streamNamespace: String?, + streamID: StreamIdentifier, ): List> { if (isClosed) throw IllegalStateException() - return metadata[streamNamespace]?.get(streamName)?.primaryKeys - ?: throw SQLException("query failed", "tbl") + return metadata[streamID]?.primaryKeys ?: throw SQLException("query failed", "tbl") } override fun extraChecks() {} diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt index 6b42ab04e296..9340c86e31f2 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt @@ -1,7 +1,7 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.discover -import io.airbyte.cdk.StreamNamePair +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.check.JdbcCheckQueries import io.airbyte.cdk.command.JdbcSourceConfiguration import io.airbyte.cdk.jdbc.DefaultJdbcConstants @@ -14,6 +14,7 @@ import io.airbyte.cdk.read.SelectColumns import io.airbyte.cdk.read.SelectQueryGenerator import io.airbyte.cdk.read.SelectQuerySpec import io.airbyte.cdk.read.optimize +import io.airbyte.protocol.models.v0.StreamDescriptor import io.github.oshai.kotlinlogging.KotlinLogging import jakarta.inject.Singleton import java.sql.Connection @@ -46,8 +47,11 @@ class JdbcMetadataQuerier( override fun streamNamespaces(): List = memoizedTableNames.mapNotNull { it.namespace() }.distinct() - override fun streamNames(streamNamespace: String?): List = - memoizedTableNames.filter { it.namespace() == streamNamespace }.map { it.name } + override fun streamNames(streamNamespace: String?): List = + memoizedTableNames + .filter { it.namespace() == streamNamespace } + .map { StreamDescriptor().withName(it.name).withNamespace(it.namespace()) } + .map(StreamIdentifier::from) fun swallow(supplier: () -> T): T? { try { @@ -90,11 +94,8 @@ class JdbcMetadataQuerier( } } - fun findTableName( - streamName: String, - streamNamespace: String?, - ): TableName? = - memoizedTableNames.find { it.name == streamName && it.namespace() == streamNamespace } + fun findTableName(streamID: StreamIdentifier): TableName? = + memoizedTableNames.find { it.name == streamID.name && it.namespace() == streamID.namespace } val memoizedColumnMetadata: Map> by lazy { val joinMap: Map = @@ -200,10 +201,9 @@ class JdbcMetadataQuerier( } override fun fields( - streamName: String, - streamNamespace: String?, + streamID: StreamIdentifier, ): List { - val table: TableName = findTableName(streamName, streamNamespace) ?: return listOf() + val table: TableName = findTableName(streamID) ?: return listOf() return columnMetadata(table).map { Field(it.label, fieldTypeMapper.toFieldType(it)) } } @@ -285,15 +285,13 @@ class JdbcMetadataQuerier( val memoizedPrimaryKeys = mutableMapOf>>() override fun primaryKey( - streamName: String, - streamNamespace: String?, + streamID: StreamIdentifier, ): List> { - val table: TableName = findTableName(streamName, streamNamespace) ?: return listOf() + val table: TableName = findTableName(streamID) ?: return listOf() val memoized: List>? = memoizedPrimaryKeys[table] if (memoized != null) return memoized val results = mutableListOf() - val streamPair = StreamNamePair(streamName, streamNamespace) - log.info { "Querying primary keys in '$streamPair' for catalog discovery." } + log.info { "Querying primary keys in '$streamID' for catalog discovery." } try { val dbmd: DatabaseMetaData = conn.metaData dbmd.getPrimaryKeys(table.catalog, table.schema, table.name).use { rs: ResultSet -> @@ -307,7 +305,7 @@ class JdbcMetadataQuerier( ) } } - log.info { "Discovered all primary keys in '$streamPair'." } + log.info { "Discovered all primary keys in '$streamID'." } } catch (e: Exception) { throw RuntimeException("Primary key discovery query failed: ${e.message}", e) } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactory.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactory.kt index 682072327c60..c2883b507078 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactory.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactory.kt @@ -49,7 +49,7 @@ class DefaultJdbcPartitionFactory( val pkMap: Map = sv.pkMap(stream) ?: run { - handler.accept(ResetStream(stream.name, stream.namespace)) + handler.accept(ResetStream(stream.id)) streamState.reset() return coldStart(streamState) } @@ -59,7 +59,7 @@ class DefaultJdbcPartitionFactory( } else { sv.cursorPair(stream) ?: run { - handler.accept(ResetStream(stream.name, stream.namespace)) + handler.accept(ResetStream(stream.id)) streamState.reset() return coldStart(streamState) } @@ -70,7 +70,7 @@ class DefaultJdbcPartitionFactory( return if (cursorPair == null) { if (isCursorBasedIncremental) { - handler.accept(ResetStream(stream.name, stream.namespace)) + handler.accept(ResetStream(stream.id)) streamState.reset() coldStart(streamState) } else if (pkMap.isEmpty()) { @@ -89,7 +89,7 @@ class DefaultJdbcPartitionFactory( } else { val (cursor: Field, cursorCheckpoint: JsonNode) = cursorPair if (!isCursorBasedIncremental) { - handler.accept(ResetStream(stream.name, stream.namespace)) + handler.accept(ResetStream(stream.id)) streamState.reset() coldStart(streamState) } else if (pkMap.isNotEmpty()) { @@ -127,7 +127,7 @@ class DefaultJdbcPartitionFactory( val fields: List = stream.configuredPrimaryKey ?: listOf() if (primaryKey.keys != fields.map { it.id }.toSet()) { handler.accept( - InvalidPrimaryKey(stream.name, stream.namespace, primaryKey.keys.toList()), + InvalidPrimaryKey(stream.id, primaryKey.keys.toList()), ) return null } @@ -137,7 +137,7 @@ class DefaultJdbcPartitionFactory( private fun DefaultJdbcStreamStateValue.cursorPair(stream: Stream): Pair? { if (cursors.size > 1) { handler.accept( - InvalidCursor(stream.name, stream.namespace, cursors.keys.toString()), + InvalidCursor(stream.id, cursors.keys.toString()), ) return null } @@ -145,13 +145,13 @@ class DefaultJdbcPartitionFactory( val cursor: FieldOrMetaField? = stream.fields.find { it.id == cursorLabel } if (cursor !is Field) { handler.accept( - InvalidCursor(stream.name, stream.namespace, cursorLabel), + InvalidCursor(stream.id, cursorLabel), ) return null } if (stream.configuredCursor != cursor) { handler.accept( - InvalidCursor(stream.name, stream.namespace, cursorLabel), + InvalidCursor(stream.id, cursorLabel), ) return null } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerierTest.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerierTest.kt index 93e9c94a85c0..92dd530e0ff5 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerierTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerierTest.kt @@ -1,6 +1,7 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.discover +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.check.JdbcCheckQueries import io.airbyte.cdk.h2.H2TestFixture import io.airbyte.cdk.h2source.H2SourceConfiguration @@ -8,6 +9,7 @@ import io.airbyte.cdk.h2source.H2SourceConfigurationFactory import io.airbyte.cdk.h2source.H2SourceConfigurationJsonObject import io.airbyte.cdk.h2source.H2SourceOperations import io.airbyte.cdk.jdbc.DefaultJdbcConstants +import io.airbyte.protocol.models.v0.StreamDescriptor import java.sql.JDBCType import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @@ -37,7 +39,10 @@ class JdbcMetadataQuerierTest { val config: H2SourceConfiguration = H2SourceConfigurationFactory().make(configPojo) factory.session(config).use { mdq: MetadataQuerier -> Assertions.assertEquals(listOf("PUBLIC"), mdq.streamNamespaces()) - Assertions.assertEquals(listOf("KV"), mdq.streamNames("PUBLIC")) + Assertions.assertEquals( + listOf("PUBLIC.KV"), + mdq.streamNames("PUBLIC").map { it.toString() }, + ) val expectedColumnMetadata: List = listOf( JdbcMetadataQuerier.ColumnMetadata( @@ -77,10 +82,12 @@ class JdbcMetadataQuerierTest { nullable = true, ), ) - val tableName = (mdq as JdbcMetadataQuerier).findTableName("KV", "PUBLIC") + val desc = StreamDescriptor().withNamespace("PUBLIC").withName("KV") + val streamID: StreamIdentifier = StreamIdentifier.from(desc) + val tableName = (mdq as JdbcMetadataQuerier).findTableName(streamID) Assertions.assertNotNull(tableName) Assertions.assertEquals(expectedColumnMetadata, mdq.columnMetadata(tableName!!)) - Assertions.assertEquals(listOf(listOf("K")), mdq.primaryKey("KV", "PUBLIC")) + Assertions.assertEquals(listOf(listOf("K")), mdq.primaryKey(streamID)) } } } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactoryTest.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactoryTest.kt index 10d2e667d624..46d1f1d14a5e 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactoryTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactoryTest.kt @@ -241,8 +241,8 @@ class DefaultJdbcPartitionFactoryTest { val factory = sharedState().factory() val result = factory.create(stream, opaqueStateValue(pk = 22)) factory.assertFailures( - InvalidPrimaryKey(stream.name, stream.namespace, listOf(id.id)), - ResetStream(stream.name, stream.namespace), + InvalidPrimaryKey(stream.id, listOf(id.id)), + ResetStream(stream.id), ) Assertions.assertTrue(result is DefaultJdbcUnsplittableSnapshotPartition) val partition = result as DefaultJdbcUnsplittableSnapshotPartition @@ -256,8 +256,8 @@ class DefaultJdbcPartitionFactoryTest { val factory = sharedState().factory() val result = factory.create(stream, opaqueStateValue(cursor = cursorValue)) factory.assertFailures( - InvalidCursor(stream.name, stream.namespace, ts.id), - ResetStream(stream.name, stream.namespace), + InvalidCursor(stream.id, ts.id), + ResetStream(stream.id), ) Assertions.assertTrue(result is DefaultJdbcSplittableSnapshotPartition) val partition = result as DefaultJdbcSplittableSnapshotPartition diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt index 014785e3a76b..fd1939c756ca 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt @@ -7,6 +7,7 @@ package io.airbyte.cdk.read import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.cdk.ClockFactory +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.command.JdbcSourceConfiguration import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.discover.Field @@ -20,6 +21,7 @@ import io.airbyte.cdk.output.CatalogValidationFailure import io.airbyte.cdk.ssh.SshConnectionOptions import io.airbyte.cdk.ssh.SshTunnelMethodConfiguration import io.airbyte.cdk.util.Jsons +import io.airbyte.protocol.models.v0.StreamDescriptor import java.time.Duration import java.time.LocalDate import org.junit.jupiter.api.Assertions @@ -35,8 +37,7 @@ object TestFixtures { withCursor: Boolean = true, ) = Stream( - name = "events", - namespace = "test", + id = StreamIdentifier.from(StreamDescriptor().withNamespace("test").withName("events")), fields = listOf(id, ts, msg), configuredSyncMode = if (withCursor) ConfiguredSyncMode.INCREMENTAL else ConfiguredSyncMode.FULL_REFRESH, diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-bad-catalog.json b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-bad-catalog.json index 7a887e33eee0..b962d18eef84 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-bad-catalog.json +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-bad-catalog.json @@ -3,7 +3,7 @@ "type": "LOG", "log": { "level": "WARN", - "message": "StreamNotFound(streamName=FOO, streamNamespace=PUBLIC)" + "message": "StreamNotFound(streamID=PUBLIC.FOO)" } }, { @@ -16,7 +16,7 @@ "name": "FOO", "namespace": "PUBLIC" }, - "message": "Stream 'PUBLIC_FOO' not found or not accessible in source.", + "message": "Stream 'PUBLIC.FOO' not found or not accessible in source.", "failure_type": "config_error" } } diff --git a/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml b/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml index 389cae03ccdd..48ca33541b51 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c - dockerImageTag: 0.0.4 + dockerImageTag: 0.0.5 dockerRepository: airbyte/source-mysql-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql-v2 diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt index 0e3cd9d231ee..b4792ee35b48 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt @@ -1,6 +1,7 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.integrations.source.mysql +import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.check.JdbcCheckQueries import io.airbyte.cdk.command.SourceConfiguration import io.airbyte.cdk.discover.Field @@ -10,6 +11,7 @@ import io.airbyte.cdk.discover.TableName import io.airbyte.cdk.jdbc.DefaultJdbcConstants import io.airbyte.cdk.jdbc.JdbcConnectionFactory import io.airbyte.cdk.read.SelectQueryGenerator +import io.airbyte.protocol.models.v0.StreamDescriptor import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Primary import jakarta.inject.Singleton @@ -23,11 +25,8 @@ class MysqlSourceMetadataQuerier( val base: JdbcMetadataQuerier, ) : MetadataQuerier by base { - override fun fields( - streamName: String, - streamNamespace: String?, - ): List { - val table: TableName = findTableName(streamName, streamNamespace) ?: return listOf() + override fun fields(streamID: StreamIdentifier): List { + val table: TableName = findTableName(streamID) ?: return listOf() if (table !in base.memoizedColumnMetadata) return listOf() return base.memoizedColumnMetadata[table]!!.map { Field(it.label, base.fieldTypeMapper.toFieldType(it)) @@ -36,17 +35,17 @@ class MysqlSourceMetadataQuerier( override fun streamNamespaces(): List = base.config.namespaces.toList() - override fun streamNames(streamNamespace: String?): List = + override fun streamNames(streamNamespace: String?): List = base.memoizedTableNames .filter { (it.schema ?: it.catalog) == streamNamespace } - .map { it.name } + .map { StreamDescriptor().withName(it.name).withNamespace(streamNamespace) } + .map(StreamIdentifier::from) fun findTableName( - streamName: String, - streamNamespace: String?, + streamID: StreamIdentifier, ): TableName? = base.memoizedTableNames.find { - it.name == streamName && (it.schema ?: it.catalog) == streamNamespace + it.name == streamID.name && (it.schema ?: it.catalog) == streamID.namespace } val memoizedPrimaryKeys: Map>> by lazy { @@ -73,7 +72,13 @@ class MysqlSourceMetadataQuerier( } log.info { "Discovered all primary keys in ${schemas.size} Mysql schema(s)." } return@lazy results - .groupBy { findTableName(it.tableName, "public") } + .groupBy { + findTableName( + StreamIdentifier.from( + StreamDescriptor().withName(it.tableName).withNamespace("public") + ) + ) + } .mapNotNull { (table, rowsByTable) -> if (table == null) return@mapNotNull null val pkRows: List = @@ -99,10 +104,9 @@ class MysqlSourceMetadataQuerier( } override fun primaryKey( - streamName: String, - streamNamespace: String?, + streamID: StreamIdentifier, ): List> { - val table: TableName = findTableName(streamName, streamNamespace) ?: return listOf() + val table: TableName = findTableName(streamID) ?: return listOf() return memoizedPrimaryKeys[table] ?: listOf() }