From 72a4f3214b9f097941998b508d340e41c3757698 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 29 Feb 2024 15:36:59 -0800
Subject: [PATCH 1/4] Add pushdown optimization by painless script

Signed-off-by: Chen Dai
---
 .../src/main/resources/bloom_filter_query.txt | 112 ++++++++++++++++++
 .../sql/flint/datatype/FlintDataType.scala    |   7 +-
 .../flint/storage/FlintQueryCompiler.scala    |  22 ++++
 .../flint/datatype/FlintDataTypeSuite.scala   |   3 +-
 .../storage/FlintQueryCompilerSuite.scala     |  32 ++++-
 5 files changed, 173 insertions(+), 3 deletions(-)
 create mode 100644 flint-spark-integration/src/main/resources/bloom_filter_query.txt

diff --git a/flint-spark-integration/src/main/resources/bloom_filter_query.txt b/flint-spark-integration/src/main/resources/bloom_filter_query.txt
new file mode 100644
index 000000000..18c5a0269
--- /dev/null
+++ b/flint-spark-integration/src/main/resources/bloom_filter_query.txt
@@ -0,0 +1,112 @@
+int hashLong(long input, int seed) {
+int low = (int) input;
+int high = (int) (input >>> 32);
+
+int k1 = mixK1(low);
+int h1 = mixH1(seed, k1);
+
+k1 = mixK1(high);
+h1 = mixH1(h1, k1);
+
+return fmix(h1, 8);
+}
+
+int mixK1(int k1) {
+k1 *= 0xcc9e2d51L;
+k1 = Integer.rotateLeft(k1, 15);
+k1 *= 0x1b873593L;
+return k1;
+}
+
+int mixH1(int h1, int k1) {
+h1 ^= k1;
+h1 = Integer.rotateLeft(h1, 13);
+h1 = h1 * 5 + (int) 0xe6546b64L;
+return h1;
+}
+
+int fmix(int h1, int length) {
+h1 ^= length;
+h1 ^= h1 >>> 16;
+h1 *= 0x85ebca6bL;
+h1 ^= h1 >>> 13;
+h1 *= 0xc2b2ae35L;
+h1 ^= h1 >>> 16;
+return h1;
+}
+
+BytesRef bfBytes = doc[params.fieldName].value;
+byte[] buf = bfBytes.bytes;
+int pos = 0;
+int count = buf.length;
+int ch1 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+int ch2 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+int ch3 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+int ch4 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+int version = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ch1 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+ch2 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+ch3 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+ch4 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+int numHashFunctions = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ch1 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+ch2 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+ch3 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+ch4 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
+int numWords = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+
+long[] data = new long[numWords];
+byte[] readBuffer = new byte[8];
+for (int i = 0; i < numWords; i++) {
+
+int n = 0;
+while (n < 8) {
+int count2;
+int off = n;
+int len = 8 - n;
+if (pos >= count) {
+count2 = -1;
+} else {
+int avail = count - pos;
+if (len > avail) {
+len = avail;
+}
+if (len <= 0) {
+count2 = 0;
+} else {
+System.arraycopy(buf, pos, readBuffer, off, len);
+pos += len;
+count2 = len;
+}
+}
+n += count2;
+}
+data[i] = (((long) readBuffer[0] << 56) +
+((long) (readBuffer[1] & 255) << 48) +
+((long) (readBuffer[2] & 255) << 40) +
+((long) (readBuffer[3] & 255) << 32) +
+((long) (readBuffer[4] & 255) << 24) +
+((readBuffer[5] & 255) << 16) +
+((readBuffer[6] & 255) << 8) +
+((readBuffer[7] & 255) << 0));
+}
+long bitCount = 0;
+for (long word : data) {
+bitCount += Long.bitCount(word);
+}
+
+long item = params.value;
+int h1 = hashLong(item, 0);
+int h2 = hashLong(item, h1);
+
+long bitSize = (long) data.length * Long.SIZE;
+for (int i = 1; i <= numHashFunctions; i++) {
+int combinedHash = h1 + (i * h2);
+if (combinedHash < 0) {
+combinedHash = ~combinedHash;
+}
+if ((data[(int) (combinedHash % bitSize >>> 6)] & (1L << combinedHash % bitSize)) == 0) {
+return false;
+}
+}
+return true;
\ No newline at end of file
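
[Editor's note] The script above is a painless port of Spark's 32-bit Murmur3 hash (Murmur3_x86_32), so that membership checks run inside OpenSearch agree bit-for-bit with the hashes Spark used when it built the bloom filter. A minimal Scala sketch of that parity assumption (hypothetical, not part of the patch; object name invented, depends on the spark-unsafe module):

    import org.apache.spark.unsafe.hash.Murmur3_x86_32

    object HashParityCheck {
      def main(args: Array[String]): Unit = {
        val item = 42L
        // Same seeding scheme as the script: h1 seeded with 0, h2 seeded with h1.
        val h1 = Murmur3_x86_32.hashLong(item, 0)
        val h2 = Murmur3_x86_32.hashLong(item, h1)
        // Spark's BloomFilterImpl probes bit (h1 + i * h2) mod bitSize
        // for i = 1..numHashFunctions, exactly as the painless loop does.
        println(s"h1=$h1 h2=$h2")
      }
    }
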
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala
index 95a2666bd..0db0f6da6 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala
@@ -7,6 +7,7 @@ package org.apache.spark.sql.flint.datatype
 
 import org.json4s.{Formats, JField, JValue, NoTypeHints}
 import org.json4s.JsonAST.{JNothing, JObject, JString}
+import org.json4s.JsonAST.JBool.True
 import org.json4s.jackson.JsonMethods
 import org.json4s.native.Serialization
 
@@ -156,7 +157,11 @@ object FlintDataType {
     case ArrayType(elementType, _) => serializeField(elementType, Metadata.empty)
 
     // binary
-    case BinaryType => JObject("type" -> JString("binary"))
+    case BinaryType =>
+      JObject(
+        "type" -> JString("binary"),
+        "doc_values" -> True // enable doc values, required by painless script filtering
+      )
 
     case unknown => throw new IllegalStateException(s"unsupported data type: ${unknown.sql}")
   }
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala
index b3677f38f..5ddf748a5 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala
@@ -13,6 +13,8 @@ import org.apache.spark.sql.flint.datatype.FlintDataType.STRICT_DATE_OPTIONAL_TI
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
+import scala.io.Source
+
 /**
  * Todo. find the right package.
 */
@@ -112,6 +114,26 @@ case class FlintQueryCompiler(schema: StructType) {
         s"""{"wildcard":{"${compile(p.children()(0))}":{"value":"*${compile(
             p.children()(1),
             false)}"}}}"""
+      case "BLOOM_FILTER_MIGHT_CONTAIN" =>
+        val code = Source.fromResource("bloom_filter_query.txt").getLines().mkString(" ")
+        s"""
+           |{
+           | "bool": {
+           | "filter": {
+           |        "script": {
+           | "script": {
+           | "lang": "painless",
+           | "source": "$code",
+           | "params": {
+           | "fieldName": "${compile(p.children()(0))}",
+           | "value": ${compile(p.children()(1))}
+           | }
+           |          }
+           |        }
+           |      }
+           |    }
+           |}
+           |""".stripMargin
       case _ => ""
     }
   }
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala
index e2bde6b98..9cf9c553f 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala
@@ -44,7 +44,8 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
         |      "type": "text"
         |    },
         |    "binaryField": {
-        |      "type": "binary"
+        |      "type": "binary",
+        |      "doc_values": true
         |    }
         |  }
         |}""".stripMargin
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala
index 8411f9cc1..cab98a49d 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala
@@ -5,8 +5,10 @@
 
 package org.apache.spark.sql.flint.storage
 
+import scala.io.Source
+
 import org.apache.spark.FlintSuite
-import org.apache.spark.sql.connector.expressions.{FieldReference, GeneralScalarExpression}
+import org.apache.spark.sql.connector.expressions.{FieldReference, GeneralScalarExpression, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -137,6 +139,34 @@ class FlintQueryCompilerSuite extends FlintSuite {
     assertResult("""{"exists":{"field":"aString"}}""")(query)
   }
 
+  test("compile bloom_filter_might_contain(aInt, 1) successfully") {
+    val query =
+      FlintQueryCompiler(schema()).compile(
+        new Predicate(
+          "BLOOM_FILTER_MIGHT_CONTAIN",
+          Array(FieldReference("aInt"), LiteralValue(1, IntegerType))))
+
+    val code = Source.fromResource("bloom_filter_query.txt").getLines().mkString(" ")
+    assertResult(s"""
+                    |{
+                    | "bool": {
+                    | "filter": {
+                    |        "script": {
+                    | "script": {
+                    | "lang": "painless",
+                    | "source": "$code",
+                    | "params": {
+                    | "fieldName": "aInt",
+                    | "value": 1
+                    | }
+                    |          }
+                    |        }
+                    |      }
+                    |    }
+                    |}
+                    |""".stripMargin)(query)
+  }
+
   protected def schema(): StructType = {
     StructType(
       Seq(
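
[Editor's note] The byte stream the script deserializes is the standard serialized form of org.apache.spark.util.sketch.BloomFilter: a big-endian int version (1), the number of hash functions, the word count, then the bit array as 8-byte words. A hypothetical round-trip sketch of that layout (not part of the patch; object name invented, depends on the spark-sketch module):

    import java.io.ByteArrayOutputStream
    import org.apache.spark.util.sketch.BloomFilter

    object BloomFilterLayoutCheck {
      def main(args: Array[String]): Unit = {
        val bf = BloomFilter.create(1000) // expected number of items
        bf.putLong(42L)
        val out = new ByteArrayOutputStream()
        bf.writeTo(out) // same bytes the skipping index stores in the binary field
        val bytes = out.toByteArray
        // bytes 0..3 = version, 4..7 = numHashFunctions, 8..11 = numWords,
        // followed by numWords big-endian longs, exactly what the script reads.
        println(s"serialized ${bytes.length} bytes")
        assert(bf.mightContainLong(42L))
      }
    }
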
From 91a0ecee856eb0bc6ffe8e06f1fbe47f74cad945 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 5 Mar 2024 10:22:01 -0800
Subject: [PATCH 2/4] Update UT and indent script code

Signed-off-by: Chen Dai
---
 ...er_query.txt => bloom_filter_query.script} | 126 +++++++++---------
 .../flint/storage/FlintQueryCompiler.scala    |  24 ++--
 .../storage/FlintQueryCompilerSuite.scala     |  22 +--
 3 files changed, 84 insertions(+), 88 deletions(-)
 rename flint-spark-integration/src/main/resources/{bloom_filter_query.txt => bloom_filter_query.script} (50%)

diff --git a/flint-spark-integration/src/main/resources/bloom_filter_query.txt b/flint-spark-integration/src/main/resources/bloom_filter_query.script
similarity index 50%
rename from flint-spark-integration/src/main/resources/bloom_filter_query.txt
rename to flint-spark-integration/src/main/resources/bloom_filter_query.script
index 18c5a0269..949ac6d34 100644
--- a/flint-spark-integration/src/main/resources/bloom_filter_query.txt
+++ b/flint-spark-integration/src/main/resources/bloom_filter_query.script
@@ -1,38 +1,35 @@
 int hashLong(long input, int seed) {
-int low = (int) input;
-int high = (int) (input >>> 32);
-
-int k1 = mixK1(low);
-int h1 = mixH1(seed, k1);
-
-k1 = mixK1(high);
-h1 = mixH1(h1, k1);
-
-return fmix(h1, 8);
+  int low = (int) input;
+  int high = (int) (input >>> 32);
+  int k1 = mixK1(low);
+  int h1 = mixH1(seed, k1);
+  k1 = mixK1(high);
+  h1 = mixH1(h1, k1);
+  return fmix(h1, 8);
 }
 
 int mixK1(int k1) {
-k1 *= 0xcc9e2d51L;
-k1 = Integer.rotateLeft(k1, 15);
-k1 *= 0x1b873593L;
-return k1;
+  k1 *= 0xcc9e2d51L;
+  k1 = Integer.rotateLeft(k1, 15);
+  k1 *= 0x1b873593L;
+  return k1;
 }
 
 int mixH1(int h1, int k1) {
-h1 ^= k1;
-h1 = Integer.rotateLeft(h1, 13);
-h1 = h1 * 5 + (int) 0xe6546b64L;
-return h1;
+  h1 ^= k1;
+  h1 = Integer.rotateLeft(h1, 13);
+  h1 = h1 * 5 + (int) 0xe6546b64L;
+  return h1;
 }
 
 int fmix(int h1, int length) {
-h1 ^= length;
-h1 ^= h1 >>> 16;
-h1 *= 0x85ebca6bL;
-h1 ^= h1 >>> 13;
-h1 *= 0xc2b2ae35L;
-h1 ^= h1 >>> 16;
-return h1;
+  h1 ^= length;
+  h1 ^= h1 >>> 16;
+  h1 *= 0x85ebca6bL;
+  h1 ^= h1 >>> 13;
+  h1 *= 0xc2b2ae35L;
+  h1 ^= h1 >>> 16;
+  return h1;
 }
 
 BytesRef bfBytes = doc[params.fieldName].value;
@@ -58,55 +55,54 @@ int numWords = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
 long[] data = new long[numWords];
 byte[] readBuffer = new byte[8];
 for (int i = 0; i < numWords; i++) {
-
-int n = 0;
-while (n < 8) {
-int count2;
-int off = n;
-int len = 8 - n;
-if (pos >= count) {
-count2 = -1;
-} else {
-int avail = count - pos;
-if (len > avail) {
-len = avail;
-}
-if (len <= 0) {
-count2 = 0;
-} else {
-System.arraycopy(buf, pos, readBuffer, off, len);
-pos += len;
-count2 = len;
-}
-}
-n += count2;
-}
-data[i] = (((long) readBuffer[0] << 56) +
-((long) (readBuffer[1] & 255) << 48) +
-((long) (readBuffer[2] & 255) << 40) +
-((long) (readBuffer[3] & 255) << 32) +
-((long) (readBuffer[4] & 255) << 24) +
-((readBuffer[5] & 255) << 16) +
-((readBuffer[6] & 255) << 8) +
-((readBuffer[7] & 255) << 0));
+  int n = 0;
+  while (n < 8) {
+    int count2;
+    int off = n;
+    int len = 8 - n;
+    if (pos >= count) {
+      count2 = -1;
+    } else {
+      int avail = count - pos;
+      if (len > avail) {
+        len = avail;
+      }
+      if (len <= 0) {
+        count2 = 0;
+      } else {
+        System.arraycopy(buf, pos, readBuffer, off, len);
+        pos += len;
+        count2 = len;
+      }
+    }
+    n += count2;
+  }
+  data[i] = (((long) readBuffer[0] << 56) +
+    ((long) (readBuffer[1] & 255) << 48) +
+    ((long) (readBuffer[2] & 255) << 40) +
+    ((long) (readBuffer[3] & 255) << 32) +
+    ((long) (readBuffer[4] & 255) << 24) +
+    ((readBuffer[5] & 255) << 16) +
+    ((readBuffer[6] & 255) << 8) +
+    ((readBuffer[7] & 255) << 0));
 }
+
 long bitCount = 0;
 for (long word : data) {
-bitCount += Long.bitCount(word);
+  bitCount += Long.bitCount(word);
 }
 
 long item = params.value;
 int h1 = hashLong(item, 0);
 int h2 = hashLong(item, h1);
-
 long bitSize = (long) data.length * Long.SIZE;
 for (int i = 1; i <= numHashFunctions; i++) {
-int combinedHash = h1 + (i * h2);
-if (combinedHash < 0) {
-combinedHash = ~combinedHash;
-}
-if ((data[(int) (combinedHash % bitSize >>> 6)] & (1L << combinedHash % bitSize)) == 0) {
-return false;
-}
+  int combinedHash = h1 + (i * h2);
+  if (combinedHash < 0) {
+    combinedHash = ~combinedHash;
+  }
+  if ((data[(int) (combinedHash % bitSize >>> 6)] & (1L << combinedHash % bitSize)) == 0) {
+    return false;
+  }
 }
 return true;
\ No newline at end of file
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala
index 5ddf748a5..7e07148f2 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala
@@ -5,6 +5,8 @@
 
 package org.apache.spark.sql.flint.storage
 
+import scala.io.Source
+
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, LiteralValue}
@@ -13,8 +15,6 @@ import org.apache.spark.sql.flint.datatype.FlintDataType.STRICT_DATE_OPTIONAL_TI
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
-import scala.io.Source
-
 /**
  * Todo. find the right package.
 */
@@ -115,23 +115,23 @@ case class FlintQueryCompiler(schema: StructType) {
             p.children()(1),
             false)}"}}}"""
       case "BLOOM_FILTER_MIGHT_CONTAIN" =>
-        val code = Source.fromResource("bloom_filter_query.txt").getLines().mkString(" ")
+        val code = Source.fromResource("bloom_filter_query.script").getLines().mkString(" ")
         s"""
            |{
-           | "bool": {
-           | "filter": {
+           |  "bool": {
+           |    "filter": {
+           |      "script": {
            |        "script": {
-           | "script": {
-           | "lang": "painless",
-           | "source": "$code",
-           | "params": {
-           | "fieldName": "${compile(p.children()(0))}",
-           | "value": ${compile(p.children()(1))}
-           | }
+           |          "lang": "painless",
+           |          "source": "$code",
+           |          "params": {
+           |            "fieldName": "${compile(p.children()(0))}",
+           |            "value": ${compile(p.children()(1))}
            |          }
            |        }
            |      }
           |    }
+           |  }
            |}
            |""".stripMargin
       case _ => ""
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala
index cab98a49d..e8c6dac17 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala
@@ -139,30 +139,30 @@ class FlintQueryCompilerSuite extends FlintSuite {
     assertResult("""{"exists":{"field":"aString"}}""")(query)
   }
 
-  test("compile bloom_filter_might_contain(aInt, 1) successfully") {
+  test("compile BLOOM_FILTER_MIGHT_CONTAIN(aInt, 1) successfully") {
     val query =
       FlintQueryCompiler(schema()).compile(
         new Predicate(
          "BLOOM_FILTER_MIGHT_CONTAIN",
           Array(FieldReference("aInt"), LiteralValue(1, IntegerType))))
 
-    val code = Source.fromResource("bloom_filter_query.txt").getLines().mkString(" ")
+    val code = Source.fromResource("bloom_filter_query.script").getLines().mkString(" ")
     assertResult(s"""
                     |{
-                    | "bool": {
-                    | "filter": {
+                    |  "bool": {
+                    |    "filter": {
+                    |      "script": {
                     |        "script": {
-                    | "script": {
-                    | "lang": "painless",
-                    | "source": "$code",
-                    | "params": {
-                    | "fieldName": "aInt",
-                    | "value": 1
-                    | }
+                    |          "lang": "painless",
+                    |          "source": "$code",
+                    |          "params": {
+                    |            "fieldName": "aInt",
+                    |            "value": 1
                     |          }
                     |        }
                     |      }
                     |    }
+                    |  }
                     |}
                     |""".stripMargin)(query)
   }
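
[Editor's note] The probe loop at the end of the script mirrors Spark's BloomFilterImpl.mightContainLong, and the next patch tidies it by extracting the bit index into a local variable. A standalone Scala rendering of the same check, for reference only (not part of the patch; names invented):

    // words: the bit array decoded from the serialized filter;
    // h1, h2: the two Murmur3 hashes of the probed value.
    def mightContain(words: Array[Long], numHashFunctions: Int, h1: Int, h2: Int): Boolean = {
      val bitSize = words.length.toLong * java.lang.Long.SIZE
      (1 to numHashFunctions).forall { i =>
        var combined = h1 + i * h2
        if (combined < 0) combined = ~combined // flip negative hashes, as in the script
        val index = combined % bitSize         // bit position within the array
        (words((index >>> 6).toInt) & (1L << index)) != 0
      }
    }
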
From 9888af8e9cfc11f1bf279b1661cc8d6dc20dfe89 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 5 Mar 2024 11:36:09 -0800
Subject: [PATCH 3/4] Minor refactor painless script

Signed-off-by: Chen Dai
---
 .../src/main/resources/bloom_filter_query.script | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/flint-spark-integration/src/main/resources/bloom_filter_query.script b/flint-spark-integration/src/main/resources/bloom_filter_query.script
index 949ac6d34..4db2090f9 100644
--- a/flint-spark-integration/src/main/resources/bloom_filter_query.script
+++ b/flint-spark-integration/src/main/resources/bloom_filter_query.script
@@ -57,25 +57,25 @@ byte[] readBuffer = new byte[8];
 for (int i = 0; i < numWords; i++) {
   int n = 0;
   while (n < 8) {
-    int count2;
+    int readCount;
     int off = n;
     int len = 8 - n;
     if (pos >= count) {
-      count2 = -1;
+      readCount = -1;
     } else {
       int avail = count - pos;
       if (len > avail) {
         len = avail;
       }
       if (len <= 0) {
-        count2 = 0;
+        readCount = 0;
       } else {
         System.arraycopy(buf, pos, readBuffer, off, len);
         pos += len;
-        count2 = len;
+        readCount = len;
       }
     }
-    n += count2;
+    n += readCount;
   }
   data[i] = (((long) readBuffer[0] << 56) +
     ((long) (readBuffer[1] & 255) << 48) +
@@ -101,7 +101,8 @@ for (int i = 1; i <= numHashFunctions; i++) {
   if (combinedHash < 0) {
     combinedHash = ~combinedHash;
   }
-  if ((data[(int) (combinedHash % bitSize >>> 6)] & (1L << combinedHash % bitSize)) == 0) {
+  long index = combinedHash % bitSize;
+  if ((data[(int) (index >>> 6)] & (1L << index)) == 0) {
     return false;
   }
 }

From 0d60837962fd717f606bb8544c03996696efc042 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 7 Mar 2024 13:22:17 -0800
Subject: [PATCH 4/4] Fix broken IT

Signed-off-by: Chen Dai
---
 .../flint/spark/FlintSparkSkippingIndexITSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 43afc3a55..b3cbe4cae 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -121,7 +121,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
        |      "type": "integer"
        |    },
        |    "name" : {
-       |      "type": "binary"
+       |      "type": "binary",
+       |      "doc_values": true
        |    },
        |    "file_path": {
        |      "type": "keyword"
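
[Editor's note] End to end, these changes let an equality filter on a bloom-filter-indexed column be answered inside OpenSearch instead of shipping every source file back to Spark. A hypothetical usage sketch (table, catalog, and column names are invented, and it assumes the BLOOM_FILTER skip type is wired into Flint's skipping index SQL):

    import org.apache.spark.sql.SparkSession

    object BloomFilterPushdownExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("bloom-filter-pushdown").getOrCreate()
        // Build a skipping index whose bloom filter is stored in a binary field
        // (mapped with doc_values: true by the FlintDataType change above).
        spark.sql("CREATE SKIPPING INDEX ON glue.default.alb_logs (request_id BLOOM_FILTER)")
        // The equality predicate becomes BLOOM_FILTER_MIGHT_CONTAIN(request_id, 12345),
        // which FlintQueryCompiler compiles into the painless script query.
        spark.sql("SELECT * FROM glue.default.alb_logs WHERE request_id = 12345").show()
        spark.stop()
      }
    }
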