
Commit

[HUDI-7807] Fixing spark-sql for pk less tables (#11354)
nsivabalan authored and yihua committed May 29, 2024
1 parent 88d057f · commit fe08b6f
Showing 5 changed files with 69 additions and 49 deletions.
@@ -268,6 +268,8 @@ public static List<String> getRecordKeyFields(TypedProperties props) {
   * @return true if record keys need to be auto generated. false otherwise.
   */
  public static boolean isAutoGeneratedRecordKeysEnabled(TypedProperties props) {
- return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+ return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+     || props.getProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).equals(StringUtils.EMPTY_STRING);
+ // spark-sql sets record key config to empty string for update, and couple of other statements.
  }
  }
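The change above widens the auto-key check: record keys are auto-generated not only when the record-key config is missing, but also when spark-sql has set it to an empty string (as it does for UPDATE, DELETE and a few other statements). Below is a minimal sketch of the resulting behavior; the enclosing utility class is not shown in the hunk, so the check is inlined into a hypothetical AutoKeySketch class rather than called by its real name.

// Illustrative sketch, not part of the commit: the patched check inlined as a helper.
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

public class AutoKeySketch {

  // Mirrors the patched logic: auto-generate keys when the record-key config is absent OR empty.
  static boolean autoKeysEnabled(TypedProperties props) {
    String recordKeyProp = KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key();
    return !props.containsKey(recordKeyProp) || props.getProperty(recordKeyProp).isEmpty();
  }

  public static void main(String[] args) {
    String recordKeyProp = KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key();

    TypedProperties absent = new TypedProperties();    // pk-less write path, config never set
    TypedProperties empty = new TypedProperties();
    empty.setProperty(recordKeyProp, "");              // what spark-sql passes for UPDATE/DELETE
    TypedProperties explicit = new TypedProperties();
    explicit.setProperty(recordKeyProp, "id");         // user-defined record key

    System.out.println(autoKeysEnabled(absent));       // true
    System.out.println(autoKeysEnabled(empty));        // true (the behavior this fix adds)
    System.out.println(autoKeysEnabled(explicit));     // false
  }
}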
@@ -88,6 +88,9 @@ public static KeyGenerator createKeyGenerator(String keyGeneratorClass, TypedPro
  //Need to prevent overwriting the keygen for spark sql merge into because we need to extract
  //the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed.
  && !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
+ if (autoRecordKeyGen) {
+   props.remove(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+ }
  KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
  if (autoRecordKeyGen) {
    return new AutoRecordGenWrapperKeyGenerator(props, (BuiltinKeyGenerator) keyGenerator);
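The factory change above drops the (possibly empty) record-key property from the write props before the configured key generator is instantiated via reflection, then wraps the result so record keys are generated automatically. Below is a hedged paraphrase of that flow as a standalone helper; the PkLessKeyGenSketch class and createForAutoKeys name are illustrative, the import paths are assumed from the usual Hudi packages, and only the individual calls are taken verbatim from the hunk.

// Illustrative paraphrase of the patched flow (not the factory method itself).
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

public class PkLessKeyGenSketch {

  // Assumed helper name; the calls inside mirror the lines added in the hunk above.
  static KeyGenerator createForAutoKeys(String keyGeneratorClass, TypedProperties props) {
    // spark-sql may have left the record-key config as an empty string; drop it so the
    // reflected key generator does not try to resolve a record-key field that is not there.
    props.remove(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
    KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
    // Wrap it so record keys are generated automatically for the pk-less table.
    return new AutoRecordGenWrapperKeyGenerator(props, (BuiltinKeyGenerator) keyGenerator);
  }
}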
@@ -228,8 +228,8 @@ class HoodieSparkSqlWriterInternal {
  originKeyGeneratorClassName, paramsWithoutDefaults)

  // Validate datasource and tableconfig keygen are the same
- validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
- validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite);
+ validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig)
+ validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

  asyncCompactionTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get
  asyncClusteringTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get
@@ -80,28 +80,35 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
  test("Test Delete Table Without Primary Key") {
  withTempDir { tmp =>
  Seq("cow", "mor").foreach { tableType =>
+ Seq (true, false).foreach { isPartitioned =>
  val tableName = generateTableName
+ val partitionedClause = if (isPartitioned) {
+   "PARTITIONED BY (name)"
+ } else {
+   ""
+ }
  // create table
  spark.sql(
  s"""
  |create table $tableName (
  | id int,
- | name string,
  | price double,
- | ts long
+ | ts long,
+ | name string
  |) using hudi
  | location '${tmp.getCanonicalPath}/$tableName'
  | tblproperties (
  | type = '$tableType',
  | preCombineField = 'ts'
  | )
+ | $partitionedClause
  """.stripMargin)

  // test with optimized sql writes enabled.
  spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")

  // insert data to table
- spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 1, 10, 1000, 'a1'")
  checkAnswer(s"select id, name, price, ts from $tableName")(
  Seq(1, "a1", 10.0, 1000)
  )
@@ -112,7 +119,7 @@
  Seq(0)
  )

- spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 10, 1000, 'a2'")
  spark.sql(s"delete from $tableName where id = 1")
  checkAnswer(s"select id, name, price, ts from $tableName")(
  Seq(2, "a2", 10.0, 1000)
@@ -124,6 +131,7 @@
  )
+ }
  }
  }
  }

  test("Test Delete Table On Non-PK Condition") {
@@ -77,54 +77,61 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
test("Test Update Table Without Primary Key") {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = '$tableType',
| preCombineField = 'ts'
| )
""".stripMargin)

// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)
Seq(true, false).foreach { isPartitioned =>
val tableName = generateTableName
val partitionedClause = if (isPartitioned) {
"PARTITIONED BY (name)"
} else {
""
}
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| price double,
| ts long,
| name string
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = '$tableType',
| preCombineField = 'ts'
| )
| $partitionedClause
""".stripMargin)

// test with optimized sql writes enabled.
spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
// insert data to table
spark.sql(s"insert into $tableName select 1,10, 1000, 'a1'")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)

// update data
spark.sql(s"update $tableName set price = 20 where id = 1")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 20.0, 1000)
)
// test with optimized sql writes enabled.
spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")

// update data
spark.sql(s"update $tableName set price = price * 2 where id = 1")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 40.0, 1000)
)
// update data
spark.sql(s"update $tableName set price = 20 where id = 1")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 20.0, 1000)
)

// verify default compaction w/ MOR
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
spark.sql(s"update $tableName set price = price * 2 where id = 1")
spark.sql(s"update $tableName set price = price * 2 where id = 1")
// update data
spark.sql(s"update $tableName set price = price * 2 where id = 1")
// verify compaction is complete
val metaClient = createMetaClient(spark, tmp.getCanonicalPath + "/" + tableName)
assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction, "commit")
}
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 40.0, 1000)
)

// verify default compaction w/ MOR
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
spark.sql(s"update $tableName set price = price * 2 where id = 1")
spark.sql(s"update $tableName set price = price * 2 where id = 1")
spark.sql(s"update $tableName set price = price * 2 where id = 1")
// verify compaction is complete
val metaClient = createMetaClient(spark, tmp.getCanonicalPath + "/" + tableName)
assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction, "commit")
}
}
}
})
}
