Update checkpoint location on alter path (#616)

* Update checkpoint location on alter path

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Resolve comments

Signed-off-by: Louis Chu <clingzhi@amazon.com>

---------

Signed-off-by: Louis Chu <clingzhi@amazon.com>
noCharger committed Sep 9, 2024
1 parent 88ad15f commit 9ac50a7

Showing 6 changed files with 204 additions and 27 deletions.
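
In short, this change moves default checkpoint location resolution into FlintSparkIndexOptions so that the alter (update) path applies the same default as the create path: an explicit checkpoint_location option always wins; otherwise, if FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR is configured, a location of the form rootDir/indexName/randomUUID is derived so a deleted-and-recreated index never reuses old checkpoint data. A minimal standalone sketch of that rule (plain Scala with illustrative names, not the committed code verbatim):

import java.util.UUID

// Resolution rule: an explicit location wins; otherwise derive a
// UUID-isolated default under the configured root dir, if any.
def resolveCheckpointLocation(
    explicitLocation: Option[String],
    rootDir: Option[String],
    indexName: String): Option[String] =
  explicitLocation.orElse(rootDir.map { dir =>
    s"${dir.stripSuffix("/")}/$indexName/${UUID.randomUUID()}"
  })

// resolveCheckpointLocation(None, Some("s3://bucket/ckpts/"), "flint_idx")
//   => Some("s3://bucket/ckpts/flint_idx/<uuid>")
// resolveCheckpointLocation(Some("/my/ckpt"), Some("s3://bucket/ckpts"), "flint_idx")
//   => Some("/my/ckpt")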
FlintSparkIndexBuilder.scala
@@ -5,7 +5,7 @@

package org.opensearch.flint.spark

-import java.util.{Collections, UUID}
+import java.util.Collections

import scala.collection.JavaConverters.mapAsJavaMapConverter

@@ -81,8 +81,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
index: FlintSparkIndex,
updateOptions: FlintSparkIndexOptions): FlintSparkIndex = {
val originalOptions = index.options
-val updatedOptions =
-originalOptions.copy(options = originalOptions.options ++ updateOptions.options)
+val updatedOptions = updateOptionWithDefaultCheckpointLocation(
+index.name(),
+originalOptions.copy(options = originalOptions.options ++ updateOptions.options))
val updatedMetadata = index
.metadata()
.copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
@@ -159,22 +160,13 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
indexName: String,
options: FlintSparkIndexOptions): FlintSparkIndexOptions = {

-val checkpointLocationRootDirOption = new FlintSparkConf(
-Collections.emptyMap[String, String]).checkpointLocationRootDir
-
-if (options.checkpointLocation().isEmpty) {
-checkpointLocationRootDirOption match {
-case Some(checkpointLocationRootDir) =>
-// Currently, deleting and recreating the flint index will enter same checkpoint dir.
-// Use a UUID to isolate checkpoint data.
-val checkpointLocation =
-s"${checkpointLocationRootDir.stripSuffix("/")}/$indexName/${UUID.randomUUID().toString}"
-FlintSparkIndexOptions(
-options.options + (CHECKPOINT_LOCATION.toString -> checkpointLocation))
-case None => options
-}
-} else {
-options
+val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
+val checkpointLocation = options.checkpointLocation(indexName, flintSparkConf)
+
+checkpointLocation match {
+case Some(location) =>
+FlintSparkIndexOptions(options.options + (CHECKPOINT_LOCATION.toString -> location))
+case None => options
}
}
}
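
Note the ordering above: caller-supplied update options are merged over the originals first, and only then is the default checkpoint location filled in, so an explicit checkpoint_location from either side is preserved. A toy model of that flow over plain maps (the helper below is illustrative, not Flint API; the key string mirrors Flint's checkpoint_location option name):

import java.util.UUID

// Toy model of the alter path: merge update options over the originals,
// then fill in a default checkpoint location only if none is present.
def mergeWithDefaultCheckpoint(
    original: Map[String, String],
    update: Map[String, String],
    indexName: String,
    rootDir: Option[String]): Map[String, String] = {
  val merged = original ++ update // right-hand side wins on key conflicts
  if (merged.contains("checkpoint_location")) merged
  else
    rootDir.fold(merged) { dir =>
      merged + ("checkpoint_location" ->
        s"${dir.stripSuffix("/")}/$indexName/${UUID.randomUUID()}")
    }
}

For example, merging Map("auto_refresh" -> "true") over empty originals with a configured root dir of /tmp/ckpt yields a checkpoint_location under /tmp/ckpt/indexName/uuid, which is exactly what the integration tests below assert.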
FlintSparkIndexOptions.scala
@@ -5,13 +5,17 @@

package org.opensearch.flint.spark

+import java.util.UUID
+
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode

+import org.apache.spark.sql.flint.config.FlintSparkConf
+
/**
* Flint Spark index configurable options.
*
@@ -165,6 +169,18 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
case None => // no action needed if modeStr is empty
}
}
+
+def checkpointLocation(indexName: String, flintSparkConf: FlintSparkConf): Option[String] = {
+options.get(CHECKPOINT_LOCATION.toString) match {
+case Some(location) => Some(location)
+case None =>
+// Currently, deleting and recreating the flint index will enter the same checkpoint dir.
+// Use a UUID to isolate checkpoint data.
+flintSparkConf.checkpointLocationRootDir.map { rootDir =>
+s"${rootDir.stripSuffix("/")}/$indexName/${UUID.randomUUID().toString}"
+}
+}
+}
}

object FlintSparkIndexOptions {
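A hedged usage sketch of the new helper: when no explicit checkpoint_location option is set, resolution depends entirely on whether the conf carries a checkpoint root dir. The construction below assumes the map passed to FlintSparkConf backs that lookup, as the empty-map construction in this diff suggests; the index name is made up:

import java.util.Collections
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.spark.FlintSparkIndexOptions
import org.apache.spark.sql.flint.config.FlintSparkConf

val options = FlintSparkIndexOptions(Map("auto_refresh" -> "true"))

// No root dir configured: the location stays undefined.
val emptyConf = new FlintSparkConf(Collections.emptyMap[String, String])
options.checkpointLocation("flint_test_index", emptyConf) // => None

// Root dir configured (assumption: constructor properties feed the conf lookup):
val conf = new FlintSparkConf(
  Map(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key -> "s3://bucket/ckpts").asJava)
options.checkpointLocation("flint_test_index", conf)
// => Some("s3://bucket/ckpts/flint_test_index/<random-uuid>")
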
FlintSparkIndexBuilderSuite.scala
@@ -58,7 +58,7 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
}

test("indexOptions should not override existing checkpoint location with conf") {
-conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)
+setFlintSparkConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR, testCheckpointLocation)
assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))

val options =
@@ -70,7 +70,7 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
}

test("indexOptions should have default checkpoint location with conf") {
-conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)
+setFlintSparkConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR, testCheckpointLocation)
assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))

val options = FlintSparkIndexOptions(Map.empty)
FlintSparkCoveringIndexITSuite.scala
@@ -126,8 +126,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {

test("create covering index with default checkpoint location successfully") {
withTempDir { checkpointDir =>
-conf.setConfString(
-FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
+setFlintSparkConf(
+FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
checkpointDir.getAbsolutePath)
flint
.coveringIndex()
@@ -213,6 +213,59 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
}

test("update covering index successfully with custom checkpoint location") {
withTempDir { checkpointDir =>
// 1. Create an full refresh CV
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.options(FlintSparkIndexOptions.empty, testFlintIndex)
.create()
var indexData = flint.queryIndex(testFlintIndex)
checkAnswer(indexData, Seq())

var index = flint.describeIndex(testFlintIndex)
var checkpointLocation = index.get.options.checkpointLocation()
assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")

// 2. Update the spark conf with a custom checkpoint location
setFlintSparkConf(
FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
checkpointDir.getAbsolutePath)

index = flint.describeIndex(testFlintIndex)
checkpointLocation = index.get.options.checkpointLocation()
assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")

// 3. Update index to auto refresh
val updatedIndex = flint
.coveringIndex()
.copyWithUpdate(index.get, FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
val jobId = flint.updateIndex(updatedIndex)
jobId shouldBe defined

val job = spark.streams.get(jobId.get)
failAfter(streamingTimeout) {
job.processAllAvailable()
}

indexData = flint.queryIndex(testFlintIndex)
checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))

index = flint.describeIndex(testFlintIndex)

checkpointLocation = index.get.options.checkpointLocation()
assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
assert(
checkpointLocation.get.contains(testFlintIndex),
s"Checkpoint location dir should contain ${testFlintIndex}")

conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
}
}

test("can have multiple covering indexes on a table") {
flint
.coveringIndex()
FlintSparkMaterializedViewITSuite.scala
@@ -103,8 +103,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {

test("create materialized view with default checkpoint location successfully") {
withTempDir { checkpointDir =>
-conf.setConfString(
-FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
+setFlintSparkConf(
+FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
checkpointDir.getAbsolutePath)

val indexOptions =
@@ -268,6 +268,70 @@
}
}

test("update materialized view successfully with custom checkpoint location") {
withTempDir { checkpointDir =>
// 1. Create full refresh MV
flint
.materializedView()
.name(testMvName)
.query(testQuery)
.options(FlintSparkIndexOptions.empty, testFlintIndex)
.create()
var indexData = flint.queryIndex(testFlintIndex)
checkAnswer(indexData, Seq())

var index = flint.describeIndex(testFlintIndex)
var checkpointLocation = index.get.options.checkpointLocation()
assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")

// 2. Update the spark conf with a custom checkpoint location
setFlintSparkConf(
FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
checkpointDir.getAbsolutePath)

index = flint.describeIndex(testFlintIndex)
checkpointLocation = index.get.options.checkpointLocation()
assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")

// 3. Update Flint index to auto refresh and wait for complete
val updatedIndex = flint
.materializedView()
.copyWithUpdate(
index.get,
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "watermark_delay" -> "1 Minute")))
val jobId = flint.updateIndex(updatedIndex)
jobId shouldBe defined

val job = spark.streams.get(jobId.get)
failAfter(streamingTimeout) {
job.processAllAvailable()
}

indexData = flint.queryIndex(testFlintIndex)
checkAnswer(
indexData.select("startTime", "count"),
Seq(
Row(timestamp("2023-10-01 00:00:00"), 1),
Row(timestamp("2023-10-01 00:10:00"), 2),
Row(timestamp("2023-10-01 01:00:00"), 1)
/*
* The last row is pending to fire upon watermark
* Row(timestamp("2023-10-01 02:00:00"), 1)
*/
))

index = flint.describeIndex(testFlintIndex)

checkpointLocation = index.get.options.checkpointLocation()
assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
assert(
checkpointLocation.get.contains(testFlintIndex),
s"Checkpoint location dir should contain ${testFlintIndex}")

conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
}
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)

private def withIncrementalMaterializedView(query: String)(
FlintSparkSkippingIndexITSuite.scala
@@ -186,8 +186,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

test("create skipping index with default checkpoint location successfully") {
withTempDir { checkpointDir =>
-conf.setConfString(
-FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
+setFlintSparkConf(
+FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
checkpointDir.getAbsolutePath)
flint
.skippingIndex()
@@ -369,6 +369,58 @@
flint.queryIndex(testIndex).collect().toSet should have size 2
}

test("update skipping index successfully with custom checkpoint location") {
withTempDir { checkpointDir =>
// 1. Create full refresh SI
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions.empty, testIndex)
.create()

flint.queryIndex(testIndex).collect().toSet should have size 0

var index = flint.describeIndex(testIndex)
var checkpointLocation = index.get.options.checkpointLocation()
assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")

// 2. Update the spark conf with a custom checkpoint location
setFlintSparkConf(
FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
checkpointDir.getAbsolutePath)

index = flint.describeIndex(testIndex)
checkpointLocation = index.get.options.checkpointLocation()
assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")

// 3. Update Flint index to auto refresh and wait for complete
val updatedIndex =
flint
.skippingIndex()
.copyWithUpdate(index.get, FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
val jobId = flint.updateIndex(updatedIndex)
jobId shouldBe defined

val job = spark.streams.get(jobId.get)
failAfter(streamingTimeout) {
job.processAllAvailable()
}

flint.queryIndex(testIndex).collect().toSet should have size 2

index = flint.describeIndex(testIndex)

checkpointLocation = index.get.options.checkpointLocation()
assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
assert(
checkpointLocation.get.contains(testIndex),
s"Checkpoint location dir should contain ${testIndex}")

conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
}
}

test("can have only 1 skipping index on a table") {
flint
.skippingIndex()
