From c16f9ec0b427a73f849eb33f59d506c57610d721 Mon Sep 17 00:00:00 2001
From: Sagar Sumit
Date: Fri, 20 Sep 2024 14:24:52 +0530
Subject: [PATCH] [HUDI-6891] Fix RO queries with RLI and record key predicate

---
 .../apache/hudi/BaseHoodieTableFileIndex.java |  4 ++
 .../hudi/BloomFiltersIndexSupport.scala       |  9 ++-
 .../apache/hudi/ColumnStatsIndexSupport.scala |  8 +--
 .../org/apache/hudi/HoodieFileIndex.scala     |  2 +-
 .../apache/hudi/RecordLevelIndexSupport.scala |  9 ++-
 .../apache/hudi/SecondaryIndexSupport.scala   |  9 ++-
 .../apache/hudi/SparkBaseIndexSupport.scala   |  5 +-
 .../hudi/functional/TestMORDataSource.scala   | 69 ++++++++++++++++++-
 8 files changed, 102 insertions(+), 13 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 5a0fd79fcc4a..4688c160d69e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -195,6 +195,10 @@ public void close() throws Exception {
     resetTableMetadata(null);
   }
 
+  public HoodieTableQueryType getQueryType() {
+    return queryType;
+  }
+
   protected String[] getPartitionColumns() {
     return partitionColumns;
  }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BloomFiltersIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BloomFiltersIndexSupport.scala
index 4684a58966a9..e13869895ea5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BloomFiltersIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BloomFiltersIndexSupport.scala
@@ -21,7 +21,7 @@ package org.apache.hudi
 
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.metadata.HoodieTableMetadataUtil
 import org.apache.spark.sql.SparkSession
@@ -78,6 +78,13 @@ class BloomFiltersIndexSupport(spark: SparkSession,
     metadataConfig.isEnabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)
   }
 
+  /**
+   * Returns true if the given query type is supported by the index. Only snapshot queries can use this index.
+   */
+  override def supportsQueryType(queryType: HoodieTableQueryType): Boolean = {
+    queryType == HoodieTableQueryType.SNAPSHOT
+  }
+
   override def invalidateCaches(): Unit = {
     // no caches for this index type, do nothing
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index c3b5228d195d..c471649d9a6c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -348,10 +348,10 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
     // NOTE: Explicit conversion is required for Scala 2.11
       metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
-      toScalaOption(record.getData.getInsertValue(null, null))
-        .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
-        .orNull
-      }))
+        toScalaOption(record.getData.getInsertValue(null, null))
+          .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
+          .orNull
+        }))
         .filter(JFunction.toJavaSerializableFunction(columnStatsRecord => columnStatsRecord != null))
 
     columnStatsRecords
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index ccca13256f9b..df54c4653555 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -400,7 +400,7 @@ case class HoodieFileIndex(spark: SparkSession,
     lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
     if (isDataSkippingEnabled) {
       for(indexSupport: SparkBaseIndexSupport <- indicesSupport) {
-        if (indexSupport.isIndexAvailable) {
+        if (indexSupport.supportsQueryType(getQueryType) && indexSupport.isIndexAvailable) {
           val prunedFileNames = indexSupport.computeCandidateIsStrict(spark, this, queryFilters, queryReferencedColumns,
             prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)
           if (prunedFileNames.nonEmpty) {
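The one-line guard above is the heart of the fix: an index now participates in data skipping only when it is both built and valid for the current query type. A simplified, self-contained Scala sketch of that dispatch follows; the stand-in types are illustrative only and are not the actual Hudi classes, which live in HoodieFileIndex and SparkBaseIndexSupport.

  // Illustrative stand-ins for the gating pattern this patch introduces.
  sealed trait QueryType
  case object Snapshot extends QueryType
  case object ReadOptimized extends QueryType

  trait IndexSupport {
    def isIndexAvailable: Boolean
    // Defaults to true, mirroring SparkBaseIndexSupport; the record-level,
    // bloom-filter and secondary-index supports override this to accept
    // only snapshot queries.
    def supportsQueryType(queryType: QueryType): Boolean = true
    def prune(candidateFiles: Set[String]): Set[String]
  }

  def lookupCandidateFiles(indices: Seq[IndexSupport],
                           queryType: QueryType,
                           allFiles: Set[String]): Set[String] =
    indices.foldLeft(allFiles) { (files, index) =>
      // An index is consulted only when it is built AND valid for this query
      // type, so a read-optimized query never prunes via the key-based indexes.
      if (index.supportsQueryType(queryType) && index.isIndexAvailable) index.prune(files)
      else files
    }

Defaulting to true in the base class keeps column-stats-based skipping available to all query types; only the key-based indexes opt out below.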
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index f37beef07270..0202b35aa610 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -20,7 +20,7 @@ package org.apache.hudi
 import org.apache.hudi.RecordLevelIndexSupport.getPrunedStoragePaths
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.metadata.HoodieTableMetadataUtil
@@ -91,6 +91,13 @@ class RecordLevelIndexSupport(spark: SparkSession,
   def isIndexAvailable: Boolean = {
     metadataConfig.isEnabled &&
       metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
   }
+
+  /**
+   * Returns true if the given query type is supported by the index. Only snapshot queries can use this index.
+   */
+  override def supportsQueryType(queryType: HoodieTableQueryType): Boolean = {
+    queryType == HoodieTableQueryType.SNAPSHOT
+  }
 }
 object RecordLevelIndexSupport {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
index 39ea4b28c8c0..8b3403cba2e5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.RecordLevelIndexSupport.{filterQueryWithRecordKey, getPru
 import org.apache.hudi.SecondaryIndexSupport.filterQueriesWithSecondaryKey
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX
 import org.apache.hudi.storage.StoragePath
@@ -69,6 +69,13 @@ class SecondaryIndexSupport(spark: SparkSession,
     metadataConfig.isEnabled && metaClient.getIndexMetadata.isPresent && !metaClient.getIndexMetadata.get().getIndexDefinitions.isEmpty
   }
 
+  /**
+   * Returns true if the given query type is supported by the index. Only snapshot queries can use this index.
+   */
+  override def supportsQueryType(queryType: HoodieTableQueryType): Boolean = {
+    queryType == HoodieTableQueryType.SNAPSHOT
+  }
+
   /**
    * Returns the list of candidate files which store the provided record keys based on Metadata Table Secondary Index
    * and Metadata Table Record Index.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index 2371e4b066e4..d3157c398291 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -21,11 +21,10 @@ package org.apache.hudi
 import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
 import org.apache.hudi.util.JFunction
-
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.expressions.{And, Expression}
 import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
@@ -45,6 +44,8 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
 
   def isIndexAvailable: Boolean
 
+  def supportsQueryType(queryType: HoodieTableQueryType): Boolean = true
+
   def computeCandidateIsStrict(spark: SparkSession,
                                fileIndex: HoodieFileIndex,
                                queryFilters: Seq[Expression],
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 48fecc6cd951..bd504d7ccb47 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -35,9 +35,9 @@ import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase}
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, DefaultSparkRecordMerger, SparkDatasetMixin}
-
+import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieDataSourceHelpers, SparkDatasetMixin}
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.QuickstartUtils.convertToStringList
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -49,7 +49,6 @@ import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 import org.slf4j.LoggerFactory
 
 import java.util.function.Consumer
-
 import scala.collection.JavaConverters._
 
 /**
@@ -1418,4 +1417,68 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
     metaClient = createMetaClient(spark, basePath)
     assertEquals(metaClient.getTableConfig.getRecordMergerStrategy, mergerStrategyName)
   }
+
+  /**
+   * Test that a read-optimized query on a MOR table with RECORD_INDEX enabled does not prune files via the record-level index.
+   */
+  @Test
+  def testReadOptimizedQueryWithRecordIndex(): Unit = {
+    var (writeOpts, readOpts) = getWriterReaderOpts()
+    writeOpts = writeOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      HoodieCompactionConfig.INLINE_COMPACT.key -> "false",
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
+      HoodieIndexConfig.INDEX_TYPE.key -> IndexType.RECORD_INDEX.name()
+    )
+    readOpts = readOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+    )
+    // Create a MOR table and add three records to the table.
+    val records = recordsToStrings(dataGen.generateInserts("000", 3)).asScala.toSeq
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    var roDf = spark.read.format("hudi")
+      .options(readOpts)
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath)
+    // all three inserted records should be visible in the read-optimized view
+    assertEquals(3, roDf.count())
+
+    // choose a record to delete
+    val deleteRecord = recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 1)).asScala.toSeq
+    // extract the record key from the generated delete record
+    val recordKey = deleteRecord.head.split(",")(1).split(":")(1).trim.replace("\"", "")
+    // delete the record
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(deleteRecord, 1))
+    inputDF2.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    // load the RO view again with data skipping enabled
+    roDf = spark.read.format("hudi")
+      .options(readOpts)
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath)
+
+    // There should still be 3 records in the RO view
+    assertEquals(3, roDf.count())
+    // the deleted record should still appear in the RO view, since the delete only exists in a log file
+    assertEquals(1, roDf.where(s"_row_key = '$recordKey'").count())
+
+    // load the snapshot view
+    val snapshotDF = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+    // There should be only 2 records in the snapshot view
+    assertEquals(2, snapshotDF.count())
+    // the deleted record should NOT appear in the snapshot view
+    assertEquals(0, snapshotDF.where(s"_row_key = '$recordKey'").count())
+  }
 }
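As a usage illustration of the behavior the test above asserts (a minimal sketch, not part of the patch): given an existing MOR table at basePath with the record-level index built, the snapshot and read-optimized views of a record-key predicate can be compared as follows. spark, basePath and recordKey are assumed to be supplied by the caller; the option constants are the same ones used in the test.

  import org.apache.hudi.DataSourceReadOptions
  import org.apache.hudi.common.config.HoodieMetadataConfig
  import org.apache.spark.sql.SparkSession

  def compareViews(spark: SparkSession, basePath: String, recordKey: String): Unit = {
    // Snapshot query: the record-level index may prune files for the key predicate.
    val snapshotDF = spark.read.format("hudi")
      .option(HoodieMetadataConfig.ENABLE.key, "true")
      .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
      .load(basePath)
    println(snapshotDF.where(s"_row_key = '$recordKey'").count())

    // Read-optimized query: with this fix, record-level-index pruning is skipped,
    // so a key whose delete still sits in an unmerged log file is read from the base file.
    val roDF = spark.read.format("hudi")
      .option(HoodieMetadataConfig.ENABLE.key, "true")
      .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .load(basePath)
    println(roDF.where(s"_row_key = '$recordKey'").count())
  }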