Skip to content

Commit

Permalink
source-mysql-v2: downstream changes
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar committed Sep 19, 2024
1 parent beb4a47 commit 18ba731
Showing 1 changed file with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.check.JdbcCheckQueries
import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.discover.Field
Expand All @@ -10,6 +11,7 @@ import io.airbyte.cdk.discover.TableName
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
import io.airbyte.cdk.jdbc.JdbcConnectionFactory
import io.airbyte.cdk.read.SelectQueryGenerator
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Primary
import jakarta.inject.Singleton
Expand All @@ -23,11 +25,8 @@ class MysqlSourceMetadataQuerier(
val base: JdbcMetadataQuerier,
) : MetadataQuerier by base {

override fun fields(
streamName: String,
streamNamespace: String?,
): List<Field> {
val table: TableName = findTableName(streamName, streamNamespace) ?: return listOf()
override fun fields(streamID: StreamIdentifier): List<Field> {
val table: TableName = findTableName(streamID) ?: return listOf()
if (table !in base.memoizedColumnMetadata) return listOf()
return base.memoizedColumnMetadata[table]!!.map {
Field(it.label, base.fieldTypeMapper.toFieldType(it))
Expand All @@ -36,17 +35,17 @@ class MysqlSourceMetadataQuerier(

override fun streamNamespaces(): List<String> = base.config.namespaces.toList()

override fun streamNames(streamNamespace: String?): List<String> =
override fun streamNames(streamNamespace: String?): List<StreamIdentifier> =
base.memoizedTableNames
.filter { (it.schema ?: it.catalog) == streamNamespace }
.map { it.name }
.map { StreamDescriptor().withName(it.name).withNamespace(streamNamespace) }
.map(StreamIdentifier::from)

fun findTableName(
streamName: String,
streamNamespace: String?,
streamID: StreamIdentifier,
): TableName? =
base.memoizedTableNames.find {
it.name == streamName && (it.schema ?: it.catalog) == streamNamespace
it.name == streamID.name && (it.schema ?: it.catalog) == streamID.namespace
}

val memoizedPrimaryKeys: Map<TableName, List<List<String>>> by lazy {
Expand All @@ -73,7 +72,13 @@ class MysqlSourceMetadataQuerier(
}
log.info { "Discovered all primary keys in ${schemas.size} Mysql schema(s)." }
return@lazy results
.groupBy { findTableName(it.tableName, "public") }
.groupBy {
findTableName(
StreamIdentifier.from(
StreamDescriptor().withName(it.tableName).withNamespace("public")
)
)
}
.mapNotNull { (table, rowsByTable) ->
if (table == null) return@mapNotNull null
val pkRows: List<AllPrimaryKeysRow> =
Expand All @@ -99,10 +104,9 @@ class MysqlSourceMetadataQuerier(
}

override fun primaryKey(
streamName: String,
streamNamespace: String?,
streamID: StreamIdentifier,
): List<List<String>> {
val table: TableName = findTableName(streamName, streamNamespace) ?: return listOf()
val table: TableName = findTableName(streamID) ?: return listOf()
return memoizedPrimaryKeys[table] ?: listOf()
}

Expand Down

0 comments on commit 18ba731

Please sign in to comment.