Add SQL IT
Signed-off-by: Chen Dai <daichen@amazon.com>
dai-chen committed Jan 5, 2024
1 parent 5104bb5 commit 17b3803
Showing 3 changed files with 26 additions and 8 deletions.
@@ -154,7 +154,7 @@ indexColTypeList

indexColType
    : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
-       (WITH LEFT_PAREN propertyValues RIGHT_PAREN)?
+       (LEFT_PAREN propertyValues RIGHT_PAREN)?
    ;

propertyValues
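For context, the rule change above removes the WITH keyword in front of the optional skip-type parameter list, so parameters are now written directly after the skip type. A minimal sketch of the resulting DDL, in the same style as the IT suite below; the old form is inferred from the removed alternative, and testTable is assumed to be a table variable as in that suite:

// Old form accepted by the removed alternative:  address VALUE_SET WITH (2)
// New form accepted by the added alternative:    address VALUE_SET(2)
sql(s"""
       | CREATE SKIPPING INDEX ON $testTable
       | (
       |   address VALUE_SET(2)
       | )
       | """.stripMargin)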
@@ -13,6 +13,7 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
+import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
@@ -49,7 +50,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
    skipType match {
      case PARTITION => indexBuilder.addPartitions(colName)
      case VALUE_SET =>
-       indexBuilder.addValueSet(colName, (Seq("limit") zip paramValues).toMap)
+       indexBuilder.addValueSet(colName, (Seq(VALUE_SET_MAX_SIZE_KEY) zip paramValues).toMap)
      case MIN_MAX => indexBuilder.addMinMax(colName)
    }
  }
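For reference, the zip in the changed line pairs the single expected option key with whatever parameter values the parser collected, so VALUE_SET(2) produces a one-entry options map and a bare VALUE_SET produces an empty one. A standalone sketch of that pairing; the "max_size" string below is a stand-in, not necessarily the literal value of VALUE_SET_MAX_SIZE_KEY:

// Stand-in for VALUE_SET_MAX_SIZE_KEY; the real constant lives in ValueSetSkippingStrategy.
val valueSetMaxSizeKey = "max_size"

val paramValues: Seq[String] = Seq("2")                      // parsed from VALUE_SET(2)
val options = (Seq(valueSetMaxSizeKey) zip paramValues).toMap
// options == Map("max_size" -> "2"); for a bare VALUE_SET, paramValues is empty,
// zip yields Nil, and the builder receives an empty map.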
@@ -14,7 +14,7 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.scalatest.matchers.must.Matchers.{defined, have}
+import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
@@ -30,7 +30,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
  override def beforeAll(): Unit = {
    super.beforeAll()

-   createPartitionedTable(testTable)
+   createPartitionedMultiRowTable(testTable)
  }

  protected override def afterEach(): Unit = {
@@ -52,16 +52,33 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {

    // Wait for streaming job to complete the current micro batch
    val job = spark.streams.active.find(_.name == testIndex)
-   job shouldBe defined
-   failAfter(streamingTimeout) {
-     job.get.processAllAvailable()
-   }
+   awaitStreamingComplete(job.get.id.toString)

    val indexData = spark.read.format(FLINT_DATASOURCE).load(testIndex)
    flint.describeIndex(testIndex) shouldBe defined
    indexData.count() shouldBe 2
  }

test("create skipping index with max size value set") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| (
| address VALUE_SET(2)
| )
| WITH (auto_refresh = true)
| """.stripMargin)

val job = spark.streams.active.find(_.name == testIndex)
awaitStreamingComplete(job.get.id.toString)

checkAnswer(
flint.queryIndex(testIndex).select("address"),
Seq(
Row("""["Seattle","Portland"]"""),
Row(null) // Value set exceeded limit size is expected to be null
))
}

test("create skipping index with streaming job options") {
withTempDir { checkpointDir =>
sql(s"""
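The new test exercises the feature through SQL; the same option can also be supplied through the Scala builder that the AST builder change above delegates to. A hedged sketch, assuming the FlintSpark builder entry points (skippingIndex, onTable, create) behave as in the other Flint IT suites; only the two-argument addValueSet shown in this diff is taken as given:

// Hypothetical programmatic equivalent of:  address VALUE_SET(2)
flint
  .skippingIndex()
  .onTable(testTable)
  .addValueSet("address", Map(VALUE_SET_MAX_SIZE_KEY -> "2"))
  .create()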
