[HUDI-6891] Fix RO queries with RLI and record key predicate
codope committed Sep 21, 2024
1 parent ee7d3a7 commit c16f9ec
Showing 8 changed files with 102 additions and 13 deletions.
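A minimal sketch of the gating logic this commit introduces (simplified from the diff below; only the names that appear in the diff are real, the rest is stubbed for illustration): each index support class declares which query types it can serve, and data skipping consults that flag before pruning files. Because the record-level index reflects the latest snapshot state (including not-yet-compacted log files), a read-optimized query must not prune base files through it.

// Sketch only: simplified from the diff below, not the full Hudi classes.
import org.apache.hudi.common.model.HoodieTableQueryType

abstract class SparkBaseIndexSupport {
  def isIndexAvailable: Boolean
  // By default an index is assumed to serve every query type ...
  def supportsQueryType(queryType: HoodieTableQueryType): Boolean = true
}

class RecordLevelIndexSupport extends SparkBaseIndexSupport {
  override def isIndexAvailable: Boolean = true // stubbed for the sketch
  // ... but RLI (like the bloom filter and secondary indexes) only reflects snapshot state.
  override def supportsQueryType(queryType: HoodieTableQueryType): Boolean =
    queryType == HoodieTableQueryType.SNAPSHOT
}

// In HoodieFileIndex, the new guard skips any index that cannot serve the current query:
//   if (indexSupport.supportsQueryType(getQueryType) && indexSupport.isIndexAvailable) { ... }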
@@ -195,6 +195,10 @@ public void close() throws Exception {
resetTableMetadata(null);
}

public HoodieTableQueryType getQueryType() {
return queryType;
}

protected String[] getPartitionColumns() {
return partitionColumns;
}
@@ -21,7 +21,7 @@ package org.apache.hudi

import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.metadata.HoodieTableMetadataUtil
import org.apache.spark.sql.SparkSession
@@ -78,6 +78,13 @@ class BloomFiltersIndexSupport(spark: SparkSession,
metadataConfig.isEnabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)
}

/**
* Returns true if the query type is supported by the index.
*/
override def supportsQueryType(queryType: HoodieTableQueryType): Boolean = {
queryType == HoodieTableQueryType.SNAPSHOT
}

override def invalidateCaches(): Unit = {
// no caches for this index type, do nothing
}
@@ -348,10 +348,10 @@ class ColumnStatsIndexSupport(spark: SparkSession,
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
// NOTE: Explicit conversion is required for Scala 2.11
metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
toScalaOption(record.getData.getInsertValue(null, null))
.map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
.orNull
}))
toScalaOption(record.getData.getInsertValue(null, null))
.map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
.orNull
}))
.filter(JFunction.toJavaSerializableFunction(columnStatsRecord => columnStatsRecord != null))

columnStatsRecords
@@ -400,7 +400,7 @@ case class HoodieFileIndex(spark: SparkSession,
lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
if (isDataSkippingEnabled) {
for(indexSupport: SparkBaseIndexSupport <- indicesSupport) {
if (indexSupport.isIndexAvailable) {
if (indexSupport.supportsQueryType(getQueryType) && indexSupport.isIndexAvailable) {
val prunedFileNames = indexSupport.computeCandidateIsStrict(spark, this, queryFilters, queryReferencedColumns,
prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)
if (prunedFileNames.nonEmpty) {
@@ -20,7 +20,7 @@ package org.apache.hudi
import org.apache.hudi.RecordLevelIndexSupport.getPrunedStoragePaths
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.metadata.HoodieTableMetadataUtil
@@ -91,6 +91,13 @@ class RecordLevelIndexSupport(spark: SparkSession,
def isIndexAvailable: Boolean = {
metadataConfig.isEnabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
}

/**
* Returns true if the query type is supported by the index.
*/
override def supportsQueryType(queryType: HoodieTableQueryType): Boolean = {
queryType == HoodieTableQueryType.SNAPSHOT
}
}

object RecordLevelIndexSupport {
@@ -23,7 +23,7 @@ import org.apache.hudi.RecordLevelIndexSupport.{filterQueryWithRecordKey, getPru
import org.apache.hudi.SecondaryIndexSupport.filterQueriesWithSecondaryKey
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX
import org.apache.hudi.storage.StoragePath
@@ -69,6 +69,13 @@ class SecondaryIndexSupport(spark: SparkSession,
metadataConfig.isEnabled && metaClient.getIndexMetadata.isPresent && !metaClient.getIndexMetadata.get().getIndexDefinitions.isEmpty
}

/**
* Returns true if the query type is supported by the index.
*/
override def supportsQueryType(queryType: HoodieTableQueryType): Boolean = {
queryType == HoodieTableQueryType.SNAPSHOT
}

/**
* Returns the list of candidate files which store the provided record keys based on Metadata Table Secondary Index
* and Metadata Table Record Index.
@@ -21,11 +21,10 @@ package org.apache.hudi
import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.hudi.util.JFunction

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.expressions.{And, Expression}
import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
@@ -45,6 +44,8 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,

def isIndexAvailable: Boolean

def supportsQueryType(queryType: HoodieTableQueryType): Boolean = true

def computeCandidateIsStrict(spark: SparkSession,
fileIndex: HoodieFileIndex,
queryFilters: Seq[Expression],
@@ -35,9 +35,9 @@ import org.apache.hudi.storage.StoragePath
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase}
import org.apache.hudi.util.JFunction
import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, DefaultSparkRecordMerger, SparkDatasetMixin}

import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieDataSourceHelpers, SparkDatasetMixin}
import org.apache.hadoop.fs.Path
import org.apache.hudi.QuickstartUtils.convertToStringList
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -49,7 +49,6 @@ import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
import org.slf4j.LoggerFactory

import java.util.function.Consumer

import scala.collection.JavaConverters._

/**
@@ -1418,4 +1417,68 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
metaClient = createMetaClient(spark, basePath)
assertEquals(metaClient.getTableConfig.getRecordMergerStrategy, mergerStrategyName)
}

/**
* Test Read-Optimized query on MOR table with RECORD_INDEX enabled.
*/
@Test
def testReadOptimizedQueryWithRecordIndex(): Unit = {
var (writeOpts, readOpts) = getWriterReaderOpts()
writeOpts = writeOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
HoodieCompactionConfig.INLINE_COMPACT.key -> "false",
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
HoodieIndexConfig.INDEX_TYPE.key -> IndexType.RECORD_INDEX.name()
)
readOpts = readOpts ++ Map(
HoodieMetadataConfig.ENABLE.key -> "true",
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
)
// Create a MOR table and add three records to the table.
val records = recordsToStrings(dataGen.generateInserts("000", 3)).asScala.toSeq
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
inputDF.write.format("org.apache.hudi")
.options(writeOpts)
.mode(SaveMode.Overwrite)
.save(basePath)

var roDf = spark.read.format("hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath)
// assert count
assertEquals(3, roDf.count())

// choose a record to delete
val deleteRecord = recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 1)).asScala.toSeq
// extract the record key from the delete record
val recordKey = deleteRecord.head.split(",")(1).split(":")(1).trim.replace("\"", "")
// delete the record
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(deleteRecord, 1))
inputDF2.write.format("org.apache.hudi")
.options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
// load RO view again with data skipping enabled
roDf = spark.read.format("hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath)

// There should still be 3 records in RO view
assertEquals(3, roDf.count())
// deleted record should still show in RO view
assertEquals(1, roDf.where(s"_row_key = '$recordKey'").count())

// load snapshot view
val snapshotDF = spark.read.format("hudi")
.options(readOpts)
.load(basePath)
// There should be only 2 records in snapshot view
assertEquals(2, snapshotDF.count())
// deleted record should NOT show in snapshot view
assertEquals(0, snapshotDF.where(s"_row_key = '$recordKey'").count())
}
}
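For reference, a hedged usage sketch of the scenario the test above covers (the option keys are taken from the test; basePath and recordKey are placeholders): with this fix, a read-optimized query that filters on the record key no longer prunes files through the record-level index, so a delete that exists only in uncompacted log files stays invisible to the RO view.

// Sketch: read-optimized query with metadata table and data skipping enabled (placeholders: basePath, recordKey).
val roDf = spark.read.format("hudi")
  .option(HoodieMetadataConfig.ENABLE.key, "true")
  .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(basePath)
// The record deleted only in log files is still visible to the RO view.
assert(roDf.where(s"_row_key = '$recordKey'").count() == 1)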
