Skip to content

Commit

Permalink
Configure value set max size in SQL statement (#210)
Browse files Browse the repository at this point in the history
* Configure value set max size in SQL DDL

Signed-off-by: Chen Dai <daichen@amazon.com>

* Fix broken UT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add SQL IT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Refactor javadoc and UT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Update user manual

Signed-off-by: Chen Dai <daichen@amazon.com>

---------

Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen committed Jan 19, 2024
1 parent 6e163e7 commit dae36ec
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 54 deletions.
13 changes: 9 additions & 4 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ The default maximum size for the value set is 100. In cases where a file contain
```sql
CREATE SKIPPING INDEX [IF NOT EXISTS]
ON <object>
( column <index_type> [, ...] )
( column <skip_type> <skip_params> [, ...] )
WHERE <filter_predicate>
WITH ( options )

Expand All @@ -142,18 +142,23 @@ DROP SKIPPING INDEX ON <object>
<object> ::= [db_name].[schema_name].table_name
```

Skipping index type:
Skipping index type consists of skip type name and optional parameters

```sql
<index_type> ::= { PARTITION, VALUE_SET, MIN_MAX }
<skip_type> ::= { PARTITION, VALUE_SET, MIN_MAX }

<skip_params> := ( param1, param2, ... )
```

Example:

```sql
CREATE SKIPPING INDEX ON alb_logs
(
elb_status_code VALUE_SET
time PARTITION,
elb_status_code VALUE_SET,
client_ip VALUE_SET(20),
request_processing_time MIN_MAX
)
WHERE time > '2023-04-01 00:00:00'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ indexColTypeList

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
(LEFT_PAREN skipParams RIGHT_PAREN)?
;

skipParams
: propertyValue (COMMA propertyValue)*
;

indexName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint.spark

import java.util.Collections

import scala.collection.JavaConverters.mapAsScalaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
Expand Down Expand Up @@ -45,12 +47,16 @@ object FlintSparkIndexFactory {
val skippingKind = SkippingKind.withName(getString(colInfo, "kind"))
val columnName = getString(colInfo, "columnName")
val columnType = getString(colInfo, "columnType")
val parameters = getSkipParams(colInfo)

skippingKind match {
case PARTITION =>
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
case VALUE_SET =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
ValueSetSkippingStrategy(
columnName = columnName,
columnType = columnType,
params = parameters)
case MIN_MAX =>
MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
Expand Down Expand Up @@ -78,6 +84,14 @@ object FlintSparkIndexFactory {
}
}

private def getSkipParams(colInfo: java.util.Map[String, AnyRef]): Map[String, String] = {
colInfo
.getOrDefault("parameters", Collections.emptyMap())
.asInstanceOf[java.util.Map[String, String]]
.asScala
.toMap
}

private def getString(map: java.util.Map[String, AnyRef], key: String): String = {
map.get(key).asInstanceOf[String]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ case class FlintSparkSkippingIndex(
.map(col =>
Map[String, AnyRef](
"kind" -> col.kind.toString,
"parameters" -> col.parameters.asJava,
"columnName" -> col.columnName,
"columnType" -> col.columnType).asJava)
.toArray
Expand Down Expand Up @@ -155,14 +156,20 @@ object FlintSparkSkippingIndex {
*
* @param colName
* indexed column name
* @param params
* value set parameters
* @return
* index builder
*/
def addValueSet(colName: String): Builder = {
def addValueSet(colName: String, params: Map[String, String] = Map.empty): Builder = {
require(tableName.nonEmpty, "table name cannot be empty")

val col = findColumn(colName)
addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
addIndexedColumn(
ValueSetSkippingStrategy(
columnName = col.name,
columnType = col.dataType,
params = params))
this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ trait FlintSparkSkippingStrategy {
*/
val columnType: String

/**
* Skipping algorithm named parameters.
*/
val parameters: Map[String, String] = Map.empty

/**
* @return
* output schema mapping from Flint field name to Flint field type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.functions._
Expand All @@ -18,14 +18,25 @@ import org.apache.spark.sql.functions._
case class ValueSetSkippingStrategy(
override val kind: SkippingKind = VALUE_SET,
override val columnName: String,
override val columnType: String)
override val columnType: String,
params: Map[String, String] = Map.empty)
extends FlintSparkSkippingStrategy {

override val parameters: Map[String, String] = {
val map = Map.newBuilder[String, String]
map ++= params

if (!params.contains(VALUE_SET_MAX_SIZE_KEY)) {
map += (VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString)
}
map.result()
}

override def outputSchema(): Map[String, String] =
Map(columnName -> columnType)

override def getAggregators: Seq[Expression] = {
val limit = DEFAULT_VALUE_SET_SIZE_LIMIT
val limit = parameters(VALUE_SET_MAX_SIZE_KEY).toInt
val collectSet = collect_set(columnName)
val aggregator =
when(size(collectSet) > limit, lit(null))
Expand All @@ -48,8 +59,7 @@ case class ValueSetSkippingStrategy(

object ValueSetSkippingStrategy {

/**
* Default limit for value set size collected. TODO: make this val once it's configurable
*/
var DEFAULT_VALUE_SET_SIZE_LIMIT = 100
/** Value set max size param key and default value */
var VALUE_SET_MAX_SIZE_KEY = "max_size"
var DEFAULT_VALUE_SET_MAX_SIZE = 100
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@

package org.opensearch.flint.spark.sql.skipping

import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
Expand Down Expand Up @@ -43,9 +46,12 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
ctx.indexColTypeList().indexColType().forEach { colTypeCtx =>
val colName = colTypeCtx.identifier().getText
val skipType = SkippingKind.withName(colTypeCtx.skipType.getText)
val skipParams = visitSkipParams(colTypeCtx.skipParams())
skipType match {
case PARTITION => indexBuilder.addPartitions(colName)
case VALUE_SET => indexBuilder.addValueSet(colName)
case VALUE_SET =>
val valueSetParams = (Seq(VALUE_SET_MAX_SIZE_KEY) zip skipParams).toMap
indexBuilder.addValueSet(colName, valueSetParams)
case MIN_MAX => indexBuilder.addMinMax(colName)
}
}
Expand Down Expand Up @@ -107,6 +113,16 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
}
}

override def visitSkipParams(ctx: SkipParamsContext): Seq[String] = {
if (ctx == null) {
Seq.empty
} else {
ctx.propertyValue.asScala
.map(p => visitPropertyValue(p))
.toSeq
}
}

private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String =
FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint.spark.skipping

import java.util.Collections

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.json4s.native.JsonMethods.parse
Expand Down Expand Up @@ -39,6 +41,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
test("get index metadata") {
val indexCol = mock[FlintSparkSkippingStrategy]
when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
when(indexCol.parameters).thenReturn(Map.empty[String, String])
when(indexCol.columnName).thenReturn("test_field")
when(indexCol.columnType).thenReturn("integer")
when(indexCol.outputSchema()).thenReturn(Map("test_field" -> "integer"))
Expand All @@ -51,6 +54,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
metadata.indexedColumns shouldBe Array(
Map(
"kind" -> SkippingKind.PARTITION.toString,
"parameters" -> Collections.emptyMap(),
"columnName" -> "test_field",
"columnType" -> "integer").asJava)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,47 @@
package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.{FlintSparkSkippingStrategy, FlintSparkSkippingStrategySuite}
import org.scalatest.matchers.should.Matchers
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY}
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.{Abs, AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.functions.{col, isnull}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

class ValueSetSkippingStrategySuite
extends SparkFunSuite
with FlintSparkSkippingStrategySuite
with Matchers {
class ValueSetSkippingStrategySuite extends SparkFunSuite with FlintSparkSkippingStrategySuite {

override val strategy: FlintSparkSkippingStrategy =
ValueSetSkippingStrategy(columnName = "name", columnType = "string")

private val name = AttributeReference("name", StringType, nullable = false)()

test("should return parameters with default value") {
strategy.parameters shouldBe Map(
VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString)
}

test("should build aggregator with default parameter") {
strategy.getAggregators.head.semanticEquals(
when(size(collect_set("name")) > DEFAULT_VALUE_SET_MAX_SIZE, lit(null))
.otherwise(collect_set("name"))
.expr) shouldBe true
}

test("should use given parameter value") {
val strategy =
ValueSetSkippingStrategy(
columnName = "name",
columnType = "string",
params = Map(VALUE_SET_MAX_SIZE_KEY -> "5"))

strategy.parameters shouldBe Map(VALUE_SET_MAX_SIZE_KEY -> "5")
strategy.getAggregators.head.semanticEquals(
when(size(collect_set("name")) > 5, lit(null))
.otherwise(collect_set("name"))
.expr) shouldBe true
}

test("should rewrite EqualTo(<indexCol>, <value>)") {
EqualTo(name, Literal("hello")) shouldRewriteTo
(isnull(col("name")) || col("name") === "hello")
Expand Down
Loading

0 comments on commit dae36ec

Please sign in to comment.