Reducing surface area and addressing Static analysis problems #19011

Merged
@@ -1,2 +1,2 @@
com.azure.cosmos.spark.CosmosItemsDataSource
com.azure.cosmos.spark.CosmosChangeFeedItemsDataSource
com.azure.cosmos.spark.CosmosChangeFeedDataSource
@@ -6,8 +6,10 @@ package com.azure.cosmos.implementation
import com.azure.cosmos.CosmosClientBuilder
import com.azure.cosmos.implementation.ImplementationBridgeHelpers.CosmosClientBuilderHelper

object SparkBridgeInternal {
def setMetadataCacheSnapshot(cosmosClientBuilder: CosmosClientBuilder, metadataCache: CosmosClientMetadataCachesSnapshot): Unit = {
private[cosmos] object SparkBridgeInternal {
def setMetadataCacheSnapshot(cosmosClientBuilder: CosmosClientBuilder,
metadataCache: CosmosClientMetadataCachesSnapshot): Unit = {

val clientBuilderAccessor = CosmosClientBuilderHelper.getCosmosClientBuilderAccessor()
clientBuilderAccessor.setCosmosClientMetadataCachesSnapshot(cosmosClientBuilder, metadataCache)
}
@@ -1,17 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.models

// scalastyle:off underscore.import
import scala.collection.JavaConverters._
// scalastyle:on underscore.import

// SqlQuerySpec is not serializable we need a serializable wrapper
case class CosmosParametrizedQuery(queryTest: String,
parameterNames: List[String],
parameterValues: List[Any])
private[cosmos] case class CosmosParametrizedQuery(queryTest: String,
parameterNames: List[String],
parameterValues: List[Any])
extends Serializable {
def toSqlQuerySpec(): SqlQuerySpec = {
def toSqlQuerySpec: SqlQuerySpec = {
new SqlQuerySpec(queryTest, parameterNames.zip(parameterValues)
.map(param => new SqlParameter(param._1, param._2))
.asJava)
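
For context, a minimal sketch of how this serializable wrapper is used; the query text and parameter values are made up for illustration, and after this change the class is only reachable from inside the com.azure.cosmos package:

// illustrative values only - not part of this change
val query = CosmosParametrizedQuery(
  "SELECT * FROM c WHERE c.id = @id",
  parameterNames = List("@id"),
  parameterValues = List("someId"))
val querySpec: SqlQuerySpec = query.toSqlQuerySpec // zips names and values into SqlParameter instances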
@@ -11,7 +11,9 @@ import org.apache.spark.sql.sources.Filter
// then we can set partitionKeyValue in the CosmosQueryOption
// the benefit is that if the partitionKeyValue is set in the CosmosQueryOption
// the antlr query parsing support can eliminate the need for query plan fetch from GW
case class AnalyzedFilters(cosmosParametrizedQuery: CosmosParametrizedQuery,
filtersToBePushedDownToCosmos: Array[Filter],
filtersNotSupportedByCosmos: Array[Filter])
// partitionKeyValue would also be the only filter I would consider as an option for
// pushing down filters to change feed
private case class AnalyzedFilters(cosmosParametrizedQuery: CosmosParametrizedQuery,
filtersToBePushedDownToCosmos: Array[Filter],
filtersNotSupportedByCosmos: Array[Filter])
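
A hedged illustration of the split this case class captures; the filters and query text are hypothetical, and EqualTo / StringEndsWith are the standard org.apache.spark.sql.sources filter classes:

import org.apache.spark.sql.sources.{EqualTo, StringEndsWith}

// an analyzer could push the equality filter down to Cosmos as a parametrized query
// and leave the suffix match for Spark to evaluate after rows are returned
val analyzed = AnalyzedFilters(
  CosmosParametrizedQuery("SELECT * FROM r WHERE r.pk = @pk", List("@pk"), List("value1")),
  filtersToBePushedDownToCosmos = Array(EqualTo("pk", "value1")),
  filtersNotSupportedByCosmos = Array(StringEndsWith("name", "x")))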

@@ -8,7 +8,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType

private class CosmosChangeFeedBatch
private class ChangeFeedBatch
(
schema: StructType,
config: Map[String, String],
@@ -21,11 +21,10 @@ private class CosmosChangeFeedBatch
override def planInputPartitions(): Array[InputPartition] = {
// TODO: moderakh use get feed range?
// for now we are returning one partition hence only one spark task will be created.
Array(CosmosInputPartition(FeedRange.forFullRange.toString()))
Array(FeedRangeInputPartition(FeedRange.forFullRange.toString))
}

override def createReaderFactory(): PartitionReaderFactory = {
CosmosChangeFeedScanPartitionReaderFactory(config, schema, cosmosClientStateHandle)
ChangeFeedScanPartitionReaderFactory(config, schema, cosmosClientStateHandle)
}

}
@@ -15,7 +15,7 @@ import org.apache.spark.sql.types.StructType
// per spark task there will be one CosmosPartitionReader.
// This provides iterator to read from the assigned spark partition
// For now we are creating only one spark partition per physical partition
private case class CosmosChangeFeedPartitionReader
private case class ChangeFeedPartitionReader
(
config: Map[String, String],
readSchema: StructType,
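
The reader body is collapsed in this view; as a rough sketch of the contract such a reader fulfils (ExampleReader and the ??? bodies are placeholders, not the actual implementation; PartitionReader and InternalRow come from the Spark imports shown in the neighbouring files):

private case class ExampleReader(config: Map[String, String], readSchema: StructType)
  extends PartitionReader[InternalRow] {
  override def next(): Boolean = ???     // advance the underlying change feed iterator
  override def get(): InternalRow = ???  // convert the current item into an InternalRow
  override def close(): Unit = {}        // release the Cosmos client / iterator resources
}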
@@ -7,7 +7,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.connector.read.{Batch, Scan}
import org.apache.spark.sql.types.StructType

private case class CosmosChangeFeedScan
private case class ChangeFeedScan
(
schema: StructType,
config: Map[String, String],
@@ -39,6 +39,6 @@ private case class CosmosChangeFeedScan
* `TableCapability.BATCH_READ`
*/
override def toBatch: Batch = {
new CosmosChangeFeedBatch(schema, config, cosmosClientStateHandle)
new ChangeFeedBatch(schema, config, cosmosClientStateHandle)
}
}
@@ -19,7 +19,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import scala.collection.JavaConverters._
// scalastyle:on underscore.import

private case class CosmosChangeFeedScanBuilder
private case class ChangeFeedScanBuilder
(
config: CaseInsensitiveStringMap,
inputSchema: StructType,
@@ -50,7 +50,7 @@ private case class CosmosChangeFeedScanBuilder
}

override def build(): Scan = {
CosmosChangeFeedScan(
ChangeFeedScan(
inputSchema,
config.asScala.toMap,
cosmosClientStateHandle)
@@ -63,7 +63,7 @@ private case class CosmosChangeFeedScanBuilder
* also OK to do the pruning partially, e.g., a data source may not be able to prune nested
* fields, and only prune top-level columns.
*
* Note that, {@link Scan# readSchema ( )} implementation should take care of the column
* Note that, `Scan` implementation should take care of the column
* pruning applied here.
*/
override def pruneColumns(requiredSchema: StructType): Unit = {
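// (method body collapsed in this view) A common way to honor the contract described above,
// shown only as a sketch and not necessarily what this builder does: keep the pruned schema
// in a field such as `private var prunedSchema: StructType = inputSchema`, assign
// `prunedSchema = requiredSchema` here, and return it from the Scan's readSchema().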
@@ -9,7 +9,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType

private case class CosmosChangeFeedScanPartitionReaderFactory
private case class ChangeFeedScanPartitionReaderFactory
(
config: Map[String, String],
readSchema: StructType,
@@ -19,8 +19,6 @@ private case class CosmosChangeFeedScanPartitionReaderFactory
logTrace(s"Instantiated ${this.getClass.getSimpleName}")

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
CosmosChangeFeedPartitionReader(config,
readSchema,
cosmosClientStateHandle)
ChangeFeedPartitionReader(config, readSchema, cosmosClientStateHandle)
}
}
@@ -17,13 +17,13 @@ import java.util
import java.util.UUID

// scalastyle:off underscore.import
import com.azure.cosmos.spark.CosmosTableSchemaInferer._
import com.azure.cosmos.spark.CosmosTableSchemaInferrer._
import org.apache.spark.sql.types._

import scala.collection.JavaConverters._
// scalastyle:on underscore.import

private[spark] object CosmosChangeFeedTable {
private[spark] object ChangeFeedTable {

private[spark] val defaultIncrementalChangeFeedSchemaForInferenceDisabled = StructType(Seq(
StructField(RawJsonBodyAttributeName, StringType),
@@ -44,15 +44,15 @@ private[spark] object CosmosChangeFeedTable {
}

/**
* CosmosChangeFeedTable is the entry point for the change feed data source - this is registered in the spark
* ChangeFeedTable is the entry point for the change feed data source - this is registered in the spark
*
* @param transforms The specified table partitioning.
* @param userConfig The effective user configuration
* @param userProvidedSchema The user provided schema - can be null/none
*/
private class CosmosChangeFeedTable(val transforms: Array[Transform],
val userConfig: util.Map[String, String],
val userProvidedSchema: Option[StructType] = None)
private class ChangeFeedTable(val transforms: Array[Transform],
val userConfig: util.Map[String, String],
val userProvidedSchema: Option[StructType] = None)
extends Table
with SupportsRead
with CosmosLoggingTrait {
@@ -78,7 +78,7 @@ private class CosmosChangeFeedTable(val transforms: Array[Transform],
TableCapability.BATCH_READ).asJava

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
CosmosChangeFeedScanBuilder(new CaseInsensitiveStringMap(
ChangeFeedScanBuilder(new CaseInsensitiveStringMap(
CosmosConfig.getEffectiveConfig(options.asCaseSensitiveMap().asScala.toMap).asJava),
schema(),
containerStateHandle)
@@ -93,12 +93,12 @@ private class CosmosChangeFeedTable(val transforms: Array[Transform],

val defaultSchema: StructType = changeFeedConfig.changeFeedMode match {
case ChangeFeedModes.incremental =>
CosmosChangeFeedTable.defaultIncrementalChangeFeedSchemaForInferenceDisabled
ChangeFeedTable.defaultIncrementalChangeFeedSchemaForInferenceDisabled
case ChangeFeedModes.fullFidelity =>
CosmosChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled
ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled
}

CosmosTableSchemaInferer.inferSchema(client, userConfig, defaultSchema)
CosmosTableSchemaInferrer.inferSchema(client, userConfig, defaultSchema)
}

// This can be used only when databaseName and ContainerName are specified.
@@ -52,8 +52,7 @@ class CosmosCatalog

private var catalogName: String = _
private var client: CosmosAsyncClient = _

var tableOptions: Map[String, String] = _
private var tableOptions: Map[String, String] = _

/**
* Called to initialize configuration.
@@ -180,8 +179,6 @@ class CosmosCatalog
}
}

class CosmosCatalogException(msg: String) extends RuntimeException(msg)

@throws(classOf[UnsupportedOperationException])
override def alterNamespace(namespace: Array[String],
changes: NamespaceChange*): Unit = {
@@ -192,7 +189,7 @@ class CosmosCatalog

/**
* Drop a namespace from the catalog, recursively dropping all objects within the namespace.
* @param namespace - a multi-part namesp
* @param namespace - a multi-part namespace
* @return true if the namespace was dropped
*/
@throws(classOf[NoSuchNamespaceException])
@@ -238,9 +235,12 @@ class CosmosCatalog
val databaseName = toCosmosDatabaseName(ident.namespace().head)
val containerName = toCosmosContainerName(ident.name())
getContainerMetadata(ident, databaseName, containerName) // validates that table exists
// scalastyle:off null
new CosmosTable(Array[Transform](), Some(databaseName), Some(containerName), tableOptions.asJava, Option.empty)
// scalastyle:off on
new ItemsTable(
Array[Transform](),
Some(databaseName),
Some(containerName),
tableOptions.asJava,
Option.empty)
}

private def getContainerMetadata(ident: Identifier,
Expand Down Expand Up @@ -289,8 +289,12 @@ class CosmosCatalog
.block()
}

// TODO: moderakh this needs to be wired up against CosmosTable
new CosmosTable(partitions, Some(databaseName), Some(containerName), tableOptions.asJava, Option.apply(schema))
new ItemsTable(
partitions,
Some(databaseName),
Some(containerName),
tableOptions.asJava,
Option.apply(schema))
}

@throws(classOf[UnsupportedOperationException])
@@ -349,11 +353,11 @@ class CosmosCatalog
tableIdent
}

def toTableConfig(options: CaseInsensitiveStringMap): Map[String, String] = {
private def toTableConfig(options: CaseInsensitiveStringMap): Map[String, String] = {
options.asCaseSensitiveMap().asScala.toMap
}

object CosmosContainerProperties {
private object CosmosContainerProperties {
private val partitionKeyPath = "partitionKeyPath"
private val defaultPartitionKeyPath = "/id"
def getPartitionKeyPath(properties: Map[String, String]): String = {
@@ -363,9 +367,9 @@ class CosmosCatalog
// TODO: add support for other container properties, indexing policy?
}

object CosmosThroughputProperties {
private object CosmosThroughputProperties {
private val manualThroughputFieldName = "manualThroughput"
private val autoscaleMaxThroughputName = "autoscaleMaxThroughput"
private val autoScaleMaxThroughputName = "autoScaleMaxThroughput"

def tryGetThroughputProperties(
properties: Map[String, String]): Option[ThroughputProperties] = {
@@ -377,11 +381,11 @@ class CosmosCatalog
)
.orElse(
properties
.get(autoscaleMaxThroughputName)
.get(autoScaleMaxThroughputName)
.map(
autoscaleMaxThroughput =>
autoScaleMaxThroughput =>
ThroughputProperties.createAutoscaledThroughput(
autoscaleMaxThroughput.toInt)
autoScaleMaxThroughput.toInt)
)
)
}
@@ -393,9 +397,9 @@ class CosmosCatalog
if (manualThroughput != null) {
props.put(manualThroughputFieldName, manualThroughput.toString)
} else {
val autoscaleMaxThroughput =
val autoScaleMaxThroughput =
throughputProperties.getAutoscaleMaxThroughput
props.put(autoscaleMaxThroughputName, autoscaleMaxThroughput.toString)
props.put(autoScaleMaxThroughputName, autoScaleMaxThroughput.toString)
}
props.asScala.toMap
}
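
A hedged sketch of how the renamed option keys behave, based only on what is visible in this hunk (the manual-throughput branch is partly collapsed, so that mapping is presumed):

// inside the catalog, after this change (the object is now private)
CosmosThroughputProperties.tryGetThroughputProperties(Map("manualThroughput" -> "400"))
//   presumably Some(ThroughputProperties.createManualThroughput(400))
CosmosThroughputProperties.tryGetThroughputProperties(Map("autoScaleMaxThroughput" -> "4000"))
//   Some(ThroughputProperties.createAutoscaledThroughput(4000))
CosmosThroughputProperties.tryGetThroughputProperties(Map.empty[String, String])
//   None - no throughput options were supplied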
@@ -0,0 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.spark

class CosmosCatalogException(msg: String) extends RuntimeException(msg)
@@ -14,7 +14,7 @@ import java.util
import scala.collection.JavaConverters._
// scalastyle:on underscore.import

private class CosmosChangeFeedItemsDataSource
class CosmosChangeFeedDataSource
extends DataSourceRegister
with TableProvider
with CosmosLoggingTrait {
@@ -28,13 +28,13 @@ private class CosmosChangeFeedItemsDataSource
* @return StructType inferred schema
*/
override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
new CosmosChangeFeedTable(Array.empty, options).schema()
new ChangeFeedTable(Array.empty, options).schema()
}

/**
* Represents the format that this data source provider uses.
*/
override def shortName(): String = "cosmos.changeFeed.items"
override def shortName(): String = CosmosConstants.Names.ChangeFeedDataSourceShortName

/**
* Return a `Table` instance with the specified table schema, partitioning and properties
@@ -53,7 +53,7 @@ private class CosmosChangeFeedItemsDataSource
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
// getTable - This is used for loading table with user specified schema and other transformations.
new CosmosChangeFeedTable(
new ChangeFeedTable(
partitioning,
CosmosConfig.getEffectiveConfig(properties.asScala.toMap).asJava,
Option.apply(schema))
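
inferSchema and getTable above are the hooks Spark invokes when a user loads this source. A minimal usage sketch - it assumes CosmosConstants.Names.ChangeFeedDataSourceShortName still resolves to the previous literal "cosmos.changeFeed.items", and userConfig stands in for whatever endpoint/database/container settings CosmosConfig expects:

val df = spark.read
  .format("cosmos.changeFeed.items") // assumed value of the short-name constant
  .options(userConfig)               // placeholder Map[String, String] of connector settings
  .load()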