Commit: Add conf for specifying flint checkpoint location (#577)
* Add conf for specifying flint checkpoint location

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Add IT and public doc

Signed-off-by: Louis Chu <clingzhi@amazon.com>

---------

Signed-off-by: Louis Chu <clingzhi@amazon.com>
noCharger committed Aug 21, 2024
1 parent f283df8 commit 5c190b3
Showing 15 changed files with 272 additions and 44 deletions.
3 changes: 2 additions & 1 deletion docs/index.md
@@ -538,7 +538,8 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.datasource.flint.read.support_shard`: default is true. Set to false if the index does not support shards (e.g. an AWS OpenSearch Serverless collection). Do not use in production; this setting will be removed in a later version.
- `spark.flint.optimizer.enabled`: default is true. Enable the Flint optimizer for improving query performance.
- `spark.flint.optimizer.covering.enabled`: default is true. Enable the Flint covering index optimizer for improving query performance.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpointLocation.rootDir`: default is None. Flint will create a default checkpoint location in the format `<rootDir>/<indexName>/<UUID>` to isolate checkpoint data (see the sketch after this list).
- `spark.flint.index.checkpoint.mandatory`: default is true.
- `spark.datasource.flint.socket_timeout_millis`: default value is 60000.
- `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
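
For illustration, a minimal sketch of how a default location under this format could be composed (the root dir value and index name are hypothetical, and this helper itself is not part of the change):

import java.util.UUID

// Compose the documented default checkpoint path: <rootDir>/<indexName>/<UUID>.
// The random UUID isolates checkpoint data across delete/recreate cycles.
def defaultCheckpointLocation(rootDir: String, indexName: String): String =
  s"${rootDir.stripSuffix("/")}/$indexName/${UUID.randomUUID()}"

defaultCheckpointLocation("s3://my-bucket/flint/checkpoints", "flint_mys3_default_http_logs_skipping_index")
// => "s3://my-bucket/flint/checkpoints/flint_mys3_default_http_logs_skipping_index/<random UUID>"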
@@ -167,6 +167,10 @@ object FlintSparkConf {
.doc("Enable hybrid scan to include latest source data not refreshed to index yet")
.createWithDefault("false")

val CHECKPOINT_LOCATION_ROOT_DIR = FlintConfig("spark.flint.index.checkpointLocation.rootDir")
.doc("Root directory of a user specified checkpoint location for index refresh")
.createOptional()

val CHECKPOINT_MANDATORY = FlintConfig("spark.flint.index.checkpoint.mandatory")
.doc("Checkpoint location for incremental refresh index will be mandatory if enabled")
.createWithDefault("true")
@@ -261,6 +265,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean

def checkpointLocationRootDir: Option[String] = CHECKPOINT_LOCATION_ROOT_DIR.readFrom(reader)

def isCheckpointMandatory: Boolean = CHECKPOINT_MANDATORY.readFrom(reader).toBoolean

def monitorInitialDelaySeconds(): Int = MONITOR_INITIAL_DELAY_SECONDS.readFrom(reader).toInt
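
For illustration, a minimal sketch of reading the new option back through FlintSparkConf (the S3 path is a hypothetical value, and this assumes explicitly passed properties are honored by the reader; with an empty map, as in the builder code below, the reader falls back to the active Spark session conf):

import java.util.{Map => JMap}
import scala.collection.JavaConverters.mapAsJavaMapConverter

// Build a FlintSparkConf from explicit properties and read the option back.
val props: JMap[String, String] =
  Map("spark.flint.index.checkpointLocation.rootDir" -> "s3://my-bucket/flint/checkpoints").asJava
val rootDir: Option[String] = FlintSparkConf(props).checkpointLocationRootDir
// Some("s3://my-bucket/flint/checkpoints"); None when the conf is unset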
@@ -5,14 +5,18 @@

package org.opensearch.flint.spark

import java.util.{Collections, UUID}

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh

import org.apache.spark.sql.catalog.Column
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.flint.{findField, loadTable, parseTableName, qualifyTableName}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.types.{StructField, StructType}

/**
@@ -48,8 +52,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
* @return
* builder
*/
- def options(options: FlintSparkIndexOptions): this.type = {
-   this.indexOptions = options
+ def options(options: FlintSparkIndexOptions, indexName: String): this.type = {
+   val updatedOptions = updateOptionWithDefaultCheckpointLocation(indexName, options)
+   this.indexOptions = updatedOptions
this
}

@@ -139,4 +144,37 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
isPartition = false, // not used for now, so just set to false
isBucket = false)
}

/**
* Updates the options with a default checkpoint location if not already set.
*
* @param indexName
* The index name string
* @param options
* The original FlintSparkIndexOptions
* @return
* Updated FlintSparkIndexOptions
*/
private def updateOptionWithDefaultCheckpointLocation(
indexName: String,
options: FlintSparkIndexOptions): FlintSparkIndexOptions = {

val checkpointLocationRootDirOption = new FlintSparkConf(
Collections.emptyMap[String, String]).checkpointLocationRootDir

if (options.checkpointLocation().isEmpty) {
checkpointLocationRootDirOption match {
case Some(checkpointLocationRootDir) =>
// Currently, deleting and recreating the Flint index would otherwise reuse the
// same checkpoint dir, so use a UUID to isolate checkpoint data.
val checkpointLocation =
s"${checkpointLocationRootDir.stripSuffix("/")}/$indexName/${UUID.randomUUID().toString}"
FlintSparkIndexOptions(
options.options + (CHECKPOINT_LOCATION.toString -> checkpointLocation))
case None => options
}
} else {
options
}
}
}
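
For illustration, a hedged sketch of driving an index builder through the new two-argument options API (table and index names are hypothetical; the pattern mirrors the AST builder and integration test call sites below):

// With spark.flint.index.checkpointLocation.rootDir set and no explicit
// checkpoint_location option, the builder fills in the default
// <rootDir>/<indexName>/<UUID> location before the index is created.
flint
  .skippingIndex()
  .onTable("myglue.default.alb_logs")
  .addPartitions("year")
  .options(
    FlintSparkIndexOptions(Map("auto_refresh" -> "true")),
    "flint_myglue_default_alb_logs_skipping_index")
  .create()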
@@ -47,7 +47,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
val ignoreIfExists = ctx.EXISTS() != null
val indexOptions = visitPropertyList(ctx.propertyList())
indexBuilder
- .options(indexOptions)
+ .options(indexOptions, indexName)
.create(ignoreIfExists)

// Trigger auto refresh if enabled and not using external scheduler
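For context, this visitor implements Flint's SQL extension for covering indexes; a hedged sketch of a statement that would exercise the updated call (database, table, column, and index names are hypothetical):

// Parsed by FlintSparkCoveringIndexAstBuilder; the WITH options and the
// derived Flint index name are passed to indexBuilder.options(...) above.
spark.sql(
  """CREATE INDEX example_idx ON myglue.default.alb_logs (client_ip, status)
    |WITH (auto_refresh = true)""".stripMargin)
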
@@ -39,14 +39,15 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito

val ignoreIfExists = ctx.EXISTS() != null
val indexOptions = visitPropertyList(ctx.propertyList())
+ val flintIndexName = getFlintIndexName(flint, ctx.mvName)

mvBuilder
- .options(indexOptions)
+ .options(indexOptions, flintIndexName)
.create(ignoreIfExists)

// Trigger auto refresh if enabled and not using external scheduler
if (indexOptions
.autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) {
- val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint.refreshIndex(flintIndexName)
}
Seq.empty
@@ -72,14 +72,15 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A

val ignoreIfExists = ctx.EXISTS() != null
val indexOptions = visitPropertyList(ctx.propertyList())
+ val indexName = getSkippingIndexName(flint, ctx.tableName)

indexBuilder
- .options(indexOptions)
+ .options(indexOptions, indexName)
.create(ignoreIfExists)

// Trigger auto refresh if enabled and not using external scheduler
if (indexOptions
.autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) {
- val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName)
}
Seq.empty
@@ -5,12 +5,17 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.FlintSuite
import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintSparkIndexBuilderSuite extends FlintSuite {

val indexName: String = "test_index"
val testCheckpointLocation = "/test/checkpoints/"

override def beforeAll(): Unit = {
super.beforeAll()

@@ -31,6 +36,56 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
super.afterAll()
}

test("indexOptions should not have checkpoint location when no conf") {
assert(!conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))

val options = FlintSparkIndexOptions(Map.empty)
val builder = new FakeFlintSparkIndexBuilder

val updatedOptions = builder.options(options, indexName).testOptions
updatedOptions.checkpointLocation() shouldBe None
}

test("indexOptions should not override existing checkpoint location when no conf") {
assert(!conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))

val options =
FlintSparkIndexOptions(Map(CHECKPOINT_LOCATION.toString -> testCheckpointLocation))
val builder = new FakeFlintSparkIndexBuilder

val updatedOptions = builder.options(options, indexName).testOptions
updatedOptions.checkpointLocation() shouldBe Some(testCheckpointLocation)
}

test("indexOptions should not override existing checkpoint location with conf") {
conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)
assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))

val options =
FlintSparkIndexOptions(Map(CHECKPOINT_LOCATION.toString -> testCheckpointLocation))
val builder = new FakeFlintSparkIndexBuilder

val updatedOptions = builder.options(options, indexName).testOptions
updatedOptions.checkpointLocation() shouldBe Some(testCheckpointLocation)
}

test("indexOptions should have default checkpoint location with conf") {
conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)
assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))

val options = FlintSparkIndexOptions(Map.empty)
val builder = new FakeFlintSparkIndexBuilder

val updatedOptions = builder.options(options, indexName).testOptions
assert(updatedOptions.checkpointLocation().isDefined, "Checkpoint location should be defined")
assert(
updatedOptions
.checkpointLocation()
.get
.startsWith(s"${testCheckpointLocation}${indexName}"),
s"Checkpoint location should start with ${testCheckpointLocation}${indexName}")
}

test("find column type") {
builder()
.onTable("test")
@@ -94,6 +149,7 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
* Fake builder that has access to internal methods for assertions
*/
class FakeFlintSparkIndexBuilder extends FlintSparkIndexBuilder(new FlintSpark(spark)) {
def testOptions: FlintSparkIndexOptions = this.indexOptions

def onTable(tableName: String): FakeFlintSparkIndexBuilder = {
this.tableName = tableName
@@ -15,6 +15,7 @@ import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {

@@ -104,7 +105,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex)
.create()

val jobId = flint.refreshIndex(testFlintIndex)
@@ -117,6 +118,47 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {

val indexData = flint.queryIndex(testFlintIndex)
checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))

val indexOptions = flint.describeIndex(testFlintIndex)
indexOptions shouldBe defined
indexOptions.get.options.checkpointLocation() shouldBe None
}

test("create covering index with default checkpoint location successfully") {
withTempDir { checkpointDir =>
conf.setConfString(
FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
checkpointDir.getAbsolutePath)
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex)
.create()

val jobId = flint.refreshIndex(testFlintIndex)
jobId shouldBe defined

val job = spark.streams.get(jobId.get)
failAfter(streamingTimeout) {
job.processAllAvailable()
}

val indexData = flint.queryIndex(testFlintIndex)
checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))

val index = flint.describeIndex(testFlintIndex)
index shouldBe defined

val checkpointLocation = index.get.options.checkpointLocation()
assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
assert(
checkpointLocation.get.contains(testFlintIndex),
s"Checkpoint location dir should contain ${testFlintIndex}")

conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
}
}

test("auto refresh covering index successfully with external scheduler") {
@@ -131,7 +173,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
Map(
"auto_refresh" -> "true",
"scheduler_mode" -> "external",
"checkpoint_location" -> checkpointDir.getAbsolutePath)))
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()

val jobId = flint.refreshIndex(testFlintIndex)
@@ -51,7 +51,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
.skippingIndex()
.onTable(testTable)
.addPartitions("year")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex)
.create()

flint.recoverIndex(testIndex) shouldBe true
@@ -65,7 +65,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
.skippingIndex()
.onTable(testTable)
.addPartitions("year")
- .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+ .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex)
.create()

updateLatestLogEntry(
@@ -88,7 +88,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
.skippingIndex()
.onTable(testTable)
.addPartitions("year")
- .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+ .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex)
.create()

updateLatestLogEntry(
@@ -53,7 +53,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
.skippingIndex()
.onTable(testTable)
.addValueSet("name")
- .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+ .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex)
.create()
flint.refreshIndex(testFlintIndex)

@@ -109,7 +109,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
.skippingIndex()
.onTable(testTableQualifiedName)
.addValueSet("name")
- .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
+ .options(
+   FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")),
+   testSkippingFlintIndex)
.create()
flint.refreshIndex(testSkippingFlintIndex)
val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex)
@@ -153,7 +155,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
.name(testCoveringIndex)
.onTable(testTableQualifiedName)
.addIndexColumns("name", "age")
- .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
+ .options(
+   FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")),
+   testCoveringFlintIndex)
.create()
flint.refreshIndex(testCoveringFlintIndex)

@@ -228,7 +232,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
.name(testCoveringIndexSpecial)
.onTable(testTableQualifiedName)
.addIndexColumns("name", "age")
- .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
+ .options(
+   FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")),
+   testCoveringFlintIndexSpecial)
.create()
flint.refreshIndex(testCoveringFlintIndexSpecial)
