diff --git a/flint-spark-integration/src/main/resources/bloom_filter_query.txt b/flint-spark-integration/src/main/resources/bloom_filter_query.script similarity index 50% rename from flint-spark-integration/src/main/resources/bloom_filter_query.txt rename to flint-spark-integration/src/main/resources/bloom_filter_query.script index 18c5a0269..949ac6d34 100644 --- a/flint-spark-integration/src/main/resources/bloom_filter_query.txt +++ b/flint-spark-integration/src/main/resources/bloom_filter_query.script @@ -1,38 +1,35 @@ int hashLong(long input, int seed) { -int low = (int) input; -int high = (int) (input >>> 32); - -int k1 = mixK1(low); -int h1 = mixH1(seed, k1); - -k1 = mixK1(high); -h1 = mixH1(h1, k1); - -return fmix(h1, 8); + int low = (int) input; + int high = (int) (input >>> 32); + int k1 = mixK1(low); + int h1 = mixH1(seed, k1); + k1 = mixK1(high); + h1 = mixH1(h1, k1); + return fmix(h1, 8); } int mixK1(int k1) { -k1 *= 0xcc9e2d51L; -k1 = Integer.rotateLeft(k1, 15); -k1 *= 0x1b873593L; -return k1; + k1 *= 0xcc9e2d51L; + k1 = Integer.rotateLeft(k1, 15); + k1 *= 0x1b873593L; + return k1; } int mixH1(int h1, int k1) { -h1 ^= k1; -h1 = Integer.rotateLeft(h1, 13); -h1 = h1 * 5 + (int) 0xe6546b64L; -return h1; + h1 ^= k1; + h1 = Integer.rotateLeft(h1, 13); + h1 = h1 * 5 + (int) 0xe6546b64L; + return h1; } int fmix(int h1, int length) { -h1 ^= length; -h1 ^= h1 >>> 16; -h1 *= 0x85ebca6bL; -h1 ^= h1 >>> 13; -h1 *= 0xc2b2ae35L; -h1 ^= h1 >>> 16; -return h1; + h1 ^= length; + h1 ^= h1 >>> 16; + h1 *= 0x85ebca6bL; + h1 ^= h1 >>> 13; + h1 *= 0xc2b2ae35L; + h1 ^= h1 >>> 16; + return h1; } BytesRef bfBytes = doc[params.fieldName].value; @@ -58,55 +55,54 @@ int numWords = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); long[] data = new long[numWords]; byte[] readBuffer = new byte[8]; for (int i = 0; i < numWords; i++) { - -int n = 0; -while (n < 8) { -int count2; -int off = n; -int len = 8 - n; -if (pos >= count) { -count2 = -1; -} else { -int avail = count - pos; -if (len > avail) { -len = avail; -} -if (len <= 0) { -count2 = 0; -} else { -System.arraycopy(buf, pos, readBuffer, off, len); -pos += len; -count2 = len; -} -} -n += count2; -} -data[i] = (((long) readBuffer[0] << 56) + -((long) (readBuffer[1] & 255) << 48) + -((long) (readBuffer[2] & 255) << 40) + -((long) (readBuffer[3] & 255) << 32) + -((long) (readBuffer[4] & 255) << 24) + -((readBuffer[5] & 255) << 16) + -((readBuffer[6] & 255) << 8) + -((readBuffer[7] & 255) << 0)); + int n = 0; + while (n < 8) { + int count2; + int off = n; + int len = 8 - n; + if (pos >= count) { + count2 = -1; + } else { + int avail = count - pos; + if (len > avail) { + len = avail; + } + if (len <= 0) { + count2 = 0; + } else { + System.arraycopy(buf, pos, readBuffer, off, len); + pos += len; + count2 = len; + } + } + n += count2; + } + data[i] = (((long) readBuffer[0] << 56) + + ((long) (readBuffer[1] & 255) << 48) + + ((long) (readBuffer[2] & 255) << 40) + + ((long) (readBuffer[3] & 255) << 32) + + ((long) (readBuffer[4] & 255) << 24) + + ((readBuffer[5] & 255) << 16) + + ((readBuffer[6] & 255) << 8) + + ((readBuffer[7] & 255) << 0)); } + long bitCount = 0; for (long word : data) { -bitCount += Long.bitCount(word); + bitCount += Long.bitCount(word); } long item = params.value; int h1 = hashLong(item, 0); int h2 = hashLong(item, h1); - long bitSize = (long) data.length * Long.SIZE; for (int i = 1; i <= numHashFunctions; i++) { -int combinedHash = h1 + (i * h2); -if (combinedHash < 0) { -combinedHash = ~combinedHash; -} -if ((data[(int) (combinedHash % bitSize >>> 6)] & (1L << combinedHash % bitSize)) == 0) { -return false; -} + int combinedHash = h1 + (i * h2); + if (combinedHash < 0) { + combinedHash = ~combinedHash; + } + if ((data[(int) (combinedHash % bitSize >>> 6)] & (1L << combinedHash % bitSize)) == 0) { + return false; + } } return true; \ No newline at end of file diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala index 5ddf748a5..7e07148f2 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/storage/FlintQueryCompiler.scala @@ -5,6 +5,8 @@ package org.apache.spark.sql.flint.storage +import scala.io.Source + import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, LiteralValue} @@ -13,8 +15,6 @@ import org.apache.spark.sql.flint.datatype.FlintDataType.STRICT_DATE_OPTIONAL_TI import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import scala.io.Source - /** * Todo. find the right package. */ @@ -115,23 +115,23 @@ case class FlintQueryCompiler(schema: StructType) { p.children()(1), false)}"}}}""" case "BLOOM_FILTER_MIGHT_CONTAIN" => - val code = Source.fromResource("bloom_filter_query.txt").getLines().mkString(" ") + val code = Source.fromResource("bloom_filter_query.script").getLines().mkString(" ") s""" |{ - | "bool": { - | "filter": { + | "bool": { + | "filter": { + | "script": { | "script": { - | "script": { - | "lang": "painless", - | "source": "$code", - | "params": { - | "fieldName": "${compile(p.children()(0))}", - | "value": ${compile(p.children()(1))} - | } + | "lang": "painless", + | "source": "$code", + | "params": { + | "fieldName": "${compile(p.children()(0))}", + | "value": ${compile(p.children()(1))} | } | } | } | } + | } |} |""".stripMargin case _ => "" diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala index cab98a49d..e8c6dac17 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/storage/FlintQueryCompilerSuite.scala @@ -139,30 +139,30 @@ class FlintQueryCompilerSuite extends FlintSuite { assertResult("""{"exists":{"field":"aString"}}""")(query) } - test("compile bloom_filter_might_contain(aInt, 1) successfully") { + test("compile BLOOM_FILTER_MIGHT_CONTAIN(aInt, 1) successfully") { val query = FlintQueryCompiler(schema()).compile( new Predicate( "BLOOM_FILTER_MIGHT_CONTAIN", Array(FieldReference("aInt"), LiteralValue(1, IntegerType)))) - val code = Source.fromResource("bloom_filter_query.txt").getLines().mkString(" ") + val code = Source.fromResource("bloom_filter_query.script").getLines().mkString(" ") assertResult(s""" |{ - | "bool": { - | "filter": { + | "bool": { + | "filter": { + | "script": { | "script": { - | "script": { - | "lang": "painless", - | "source": "$code", - | "params": { - | "fieldName": "aInt", - | "value": 1 - | } + | "lang": "painless", + | "source": "$code", + | "params": { + | "fieldName": "aInt", + | "value": 1 | } | } | } | } + | } |} |""".stripMargin)(query) }