Update UT and indent script code
Signed-off-by: Chen Dai <daichen@amazon.com>
dai-chen committed Mar 7, 2024
1 parent 72a4f32 commit 91a0ece
Showing 3 changed files with 84 additions and 88 deletions.
bloom_filter_query.txt → bloom_filter_query.script
@@ -1,38 +1,35 @@
int hashLong(long input, int seed) {
  int low = (int) input;
  int high = (int) (input >>> 32);
  int k1 = mixK1(low);
  int h1 = mixH1(seed, k1);
  k1 = mixK1(high);
  h1 = mixH1(h1, k1);
  return fmix(h1, 8);
}

int mixK1(int k1) {
  k1 *= 0xcc9e2d51L;
  k1 = Integer.rotateLeft(k1, 15);
  k1 *= 0x1b873593L;
  return k1;
}

int mixH1(int h1, int k1) {
  h1 ^= k1;
  h1 = Integer.rotateLeft(h1, 13);
  h1 = h1 * 5 + (int) 0xe6546b64L;
  return h1;
}

int fmix(int h1, int length) {
  h1 ^= length;
  h1 ^= h1 >>> 16;
  h1 *= 0x85ebca6bL;
  h1 ^= h1 >>> 13;
  h1 *= 0xc2b2ae35L;
  h1 ^= h1 >>> 16;
  return h1;
}
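Taken together, these four functions transcribe the 32-bit Murmur3 hash of a long into Painless: the low 32-bit word is mixed first, then the high word, and fmix(h1, 8) finalizes with the 8-byte input length. This appears to mirror the hashing scheme used by Spark's bloom filter implementation, which is what lets the script probe filter bytes that Spark wrote. Because Painless follows Java semantics, the logic runs unchanged as plain Java; here is a minimal, self-contained harness for experimenting with it (the class name and main method are illustrative, not part of the commit):

public class BloomHashDemo {

  // Hash a long as two 32-bit words, low then high, exactly as the script does.
  static int hashLong(long input, int seed) {
    int low = (int) input;
    int high = (int) (input >>> 32);
    int k1 = mixK1(low);
    int h1 = mixH1(seed, k1);
    k1 = mixK1(high);
    h1 = mixH1(h1, k1);
    return fmix(h1, 8); // finalize with length = 8 bytes
  }

  static int mixK1(int k1) {
    k1 *= 0xcc9e2d51; // Murmur3 c1 constant
    k1 = Integer.rotateLeft(k1, 15);
    k1 *= 0x1b873593; // Murmur3 c2 constant
    return k1;
  }

  static int mixH1(int h1, int k1) {
    h1 ^= k1;
    h1 = Integer.rotateLeft(h1, 13);
    return h1 * 5 + 0xe6546b64;
  }

  // Final avalanche: spread entropy across all 32 bits.
  static int fmix(int h1, int length) {
    h1 ^= length;
    h1 ^= h1 >>> 16;
    h1 *= 0x85ebca6b;
    h1 ^= h1 >>> 13;
    h1 *= 0xc2b2ae35;
    h1 ^= h1 >>> 16;
    return h1;
  }

  public static void main(String[] args) {
    long item = 42L;
    int h1 = hashLong(item, 0);  // first base hash, seed 0
    int h2 = hashLong(item, h1); // second base hash, seeded with h1
    System.out.printf("h1=%d h2=%d%n", h1, h2);
  }
}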

BytesRef bfBytes = doc[params.fieldName].value;
@@ -58,55 +55,54 @@
int numWords = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
long[] data = new long[numWords];
byte[] readBuffer = new byte[8];
for (int i = 0; i < numWords; i++) {
  int n = 0;
  while (n < 8) {
    int count2;
    int off = n;
    int len = 8 - n;
    if (pos >= count) {
      count2 = -1;
    } else {
      int avail = count - pos;
      if (len > avail) {
        len = avail;
      }
      if (len <= 0) {
        count2 = 0;
      } else {
        System.arraycopy(buf, pos, readBuffer, off, len);
        pos += len;
        count2 = len;
      }
    }
    n += count2;
  }
  data[i] = (((long) readBuffer[0] << 56) +
    ((long) (readBuffer[1] & 255) << 48) +
    ((long) (readBuffer[2] & 255) << 40) +
    ((long) (readBuffer[3] & 255) << 32) +
    ((long) (readBuffer[4] & 255) << 24) +
    ((readBuffer[5] & 255) << 16) +
    ((readBuffer[6] & 255) << 8) +
    ((readBuffer[7] & 255) << 0));
}
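The while loop above is a hand-rolled readFully, presumably because Painless does not expose java.io classes such as DataInputStream: it fills the 8-byte readBuffer from the backing array and then assembles each 64-bit word big-endian, the layout DataOutputStream.writeLong produces. (If the buffer were truncated, count2 would be -1 and n would decrease; well-formed input never takes that branch.) In plain Java the same parse collapses to two DataInputStream calls. A sketch, assuming the bytes follow the big-endian word layout the script expects; the class name and sample bytes are illustrative:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class BitArrayReader {

  // Equivalent of the script's loop. Unlike the script, readInt/readLong
  // throw EOFException on truncated input instead of looping.
  static long[] readWords(DataInputStream in) throws IOException {
    int numWords = in.readInt(); // 4-byte big-endian word count
    long[] data = new long[numWords];
    for (int i = 0; i < numWords; i++) {
      data[i] = in.readLong();   // 8-byte big-endian word
    }
    return data;
  }

  public static void main(String[] args) throws IOException {
    byte[] serialized = {
      0, 0, 0, 2,                 // numWords = 2
      0, 0, 0, 0, 0, 0, 0, 1,     // word 0 = 1
      0, 0, 0, 0, 0, 0, 0, -1     // word 1 = 255 (0xFF)
    };
    long[] words = readWords(
      new DataInputStream(new ByteArrayInputStream(serialized)));
    System.out.println(words[0] + ", " + words[1]); // prints: 1, 255
  }
}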

long bitCount = 0;
for (long word : data) {
  bitCount += Long.bitCount(word);
}

long item = params.value;
int h1 = hashLong(item, 0);
int h2 = hashLong(item, h1);

long bitSize = (long) data.length * Long.SIZE;
for (int i = 1; i <= numHashFunctions; i++) {
  int combinedHash = h1 + (i * h2);
  if (combinedHash < 0) {
    combinedHash = ~combinedHash;
  }
  if ((data[(int) (combinedHash % bitSize >>> 6)] & (1L << combinedHash % bitSize)) == 0) {
    return false;
  }
}
return true;
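This probe loop is the Kirsch-Mitzenmacher double-hashing scheme: instead of evaluating k independent hash functions, the i-th probe position is derived as h1 + i * h2 from the two base hashes computed above. A value that overflows to negative is flipped back with ~; combinedHash % bitSize >>> 6 selects the 64-bit word (% binds tighter than >>>, so this is the bit index divided by 64); and 1L << combinedHash % bitSize selects the bit within that word, since Java long shifts use only the low six bits of the shift count, making an explicit % 64 unnecessary. The same probe in plain Java, with illustrative names (h1 and h2 are the two base hashes):

public class BloomProbeDemo {

  static boolean mightContain(long[] data, int numHashFunctions, int h1, int h2) {
    long bitSize = (long) data.length * Long.SIZE; // total bits in the filter
    for (int i = 1; i <= numHashFunctions; i++) {
      int combinedHash = h1 + (i * h2);            // i-th derived hash
      if (combinedHash < 0) {
        combinedHash = ~combinedHash;              // flip negative overflow
      }
      long bitIndex = combinedHash % bitSize;
      long word = data[(int) (bitIndex >>> 6)];    // word index = bitIndex / 64
      if ((word & (1L << bitIndex)) == 0) {        // shift count taken mod 64
        return false;                              // definitely not present
      }
    }
    return true;                                   // possibly present
  }

  public static void main(String[] args) {
    long[] filter = { (1L << 1) | (1L << 2) };         // bits 1 and 2 set
    System.out.println(mightContain(filter, 2, 0, 1)); // true: probes bits 1, 2
    System.out.println(mightContain(filter, 2, 0, 3)); // false: probes bits 3, 6
  }
}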
FlintQueryCompiler.scala
@@ -5,6 +5,8 @@

package org.apache.spark.sql.flint.storage

import scala.io.Source

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, LiteralValue}
@@ -13,8 +15,6 @@ import org.apache.spark.sql.flint.datatype.FlintDataType.STRICT_DATE_OPTIONAL_TI
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
* Todo. find the right package.
*/
@@ -115,23 +115,23 @@ case class FlintQueryCompiler(schema: StructType) {
p.children()(1),
false)}"}}}"""
case "BLOOM_FILTER_MIGHT_CONTAIN" =>
val code = Source.fromResource("bloom_filter_query.txt").getLines().mkString(" ")
val code = Source.fromResource("bloom_filter_query.script").getLines().mkString(" ")
s"""
|{
| "bool": {
| "filter": {
| "bool": {
| "filter": {
| "script": {
| "script": {
| "script": {
| "lang": "painless",
| "source": "$code",
| "params": {
| "fieldName": "${compile(p.children()(0))}",
| "value": ${compile(p.children()(1))}
| }
| "lang": "painless",
| "source": "$code",
| "params": {
| "fieldName": "${compile(p.children()(0))}",
| "value": ${compile(p.children()(1))}
| }
| }
| }
| }
| }
|}
|""".stripMargin
case _ => ""
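Two things change in this file: the script resource is renamed from bloom_filter_query.txt to bloom_filter_query.script, and the generated query is re-indented. The doubled "script" key is the OpenSearch script-query DSL (an outer script query wrapping an inner script definition), and getLines().mkString(" ") flattens the multi-line Painless source into a single line, since a JSON string value cannot contain raw newlines. A Java counterpart of the loading step, purely for illustration (assumes the resource is on the classpath):

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ScriptLoader {
  public static void main(String[] args) throws IOException {
    try (InputStream in = ScriptLoader.class.getClassLoader()
        .getResourceAsStream("bloom_filter_query.script")) {
      // Join the multi-line Painless script into one line, mirroring
      // Source.fromResource(...).getLines().mkString(" ") in Scala.
      String code = new String(in.readAllBytes(), StandardCharsets.UTF_8)
          .replaceAll("\\R", " ");
      System.out.println(code);
    }
  }
}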
FlintQueryCompilerSuite.scala
@@ -139,30 +139,30 @@ class FlintQueryCompilerSuite extends FlintSuite {
assertResult("""{"exists":{"field":"aString"}}""")(query)
}

test("compile bloom_filter_might_contain(aInt, 1) successfully") {
test("compile BLOOM_FILTER_MIGHT_CONTAIN(aInt, 1) successfully") {
val query =
FlintQueryCompiler(schema()).compile(
new Predicate(
"BLOOM_FILTER_MIGHT_CONTAIN",
Array(FieldReference("aInt"), LiteralValue(1, IntegerType))))

val code = Source.fromResource("bloom_filter_query.txt").getLines().mkString(" ")
val code = Source.fromResource("bloom_filter_query.script").getLines().mkString(" ")
assertResult(s"""
  |{
  |  "bool": {
  |    "filter": {
  |      "script": {
  |        "script": {
  |          "lang": "painless",
  |          "source": "$code",
  |          "params": {
  |            "fieldName": "aInt",
  |            "value": 1
  |          }
  |        }
  |      }
  |    }
  |  }
  |}
  |""".stripMargin)(query)
}
