PPL patterns command (#627)
* add patterns support & tests

Signed-off-by: YANGDB <yang.db.dev@gmail.com>

* update tests

Signed-off-by: YANGDB <yang.db.dev@gmail.com>

* remove unrelated Dockerfile

Signed-off-by: YANGDB <yang.db.dev@gmail.com>

* sbt format

Signed-off-by: YANGDB <yang.db.dev@gmail.com>

* fix ParseUtils and simplify the different parse expressions according to PR comment feedback

Signed-off-by: YANGDB <yang.db.dev@gmail.com>

---------

Signed-off-by: YANGDB <yang.db.dev@gmail.com>
YANG-DB committed Sep 10, 2024
1 parent 7d8f2b0 commit fd3f82f
Showing 5 changed files with 416 additions and 132 deletions.
@@ -0,0 +1,166 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, GreaterThan, Literal, NullsLast, RegExpExtract, RegExpReplace, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLPatternsITSuite
    extends QueryTest
    with LogicalPlanTestUtils
    with FlintPPLSuite
    with StreamTest {

  /** Test table and index name */
  private val testTable = "spark_catalog.default.flint_ppl_test"

  override def beforeAll(): Unit = {
    super.beforeAll()

    // Create test table
    createPartitionedGrokEmailTable(testTable)
  }

  protected override def afterEach(): Unit = {
    super.afterEach()
    // Stop all streaming jobs if any
    spark.streams.active.foreach { job =>
      job.stop()
      job.awaitTermination()
    }
  }

  test("test patterns email & host expressions") {
    val frame = sql(s"""
         | source = $testTable| patterns email | fields email, patterns_field
         | """.stripMargin)

    // Retrieve the results
    val results: Array[Row] = frame.collect()
    // Define the expected results
    val expectedResults: Array[Row] = Array(
      Row("charlie@domain.net", "@."),
      Row("david@anotherdomain.com", "@."),
      Row("hank@demonstration.com", "@."),
      Row("alice@example.com", "@."),
      Row("frank@sample.org", "@."),
      Row("grace@demo.net", "@."),
      Row("jack@sample.net", "@."),
      Row("eve@examples.com", "@."),
      Row("ivy@examples.com", "@."),
      Row("bob@test.org", "@."))

    // Compare the results
    implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
    assert(results.sorted.sameElements(expectedResults.sorted))

    // Retrieve the logical plan
    val logicalPlan: LogicalPlan = frame.queryExecution.logical
    val emailAttribute = UnresolvedAttribute("email")
    val patterns_field = UnresolvedAttribute("patterns_field")
    val hostExpression = Alias(
      RegExpReplace(emailAttribute, Literal("[a-zA-Z0-9]"), Literal("")),
      "patterns_field")()
    val expectedPlan = Project(
      Seq(emailAttribute, patterns_field),
      Project(
        Seq(emailAttribute, hostExpression, UnresolvedStar(None)),
        UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))))
    assert(compareByString(expectedPlan) === compareByString(logicalPlan))
  }

test("test patterns email expressions parsing filter & sort by age") {
val frame = sql(s"""
| source = $testTable| patterns email | where age > 45 | sort - age | fields age, email, patterns_field;
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row(76, "frank@sample.org", "@."),
Row(65, "charlie@domain.net", "@."),
Row(55, "bob@test.org", "@."))

// Compare the results
assert(results.sameElements(expectedResults))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val emailAttribute = UnresolvedAttribute("email")
val patterns_fieldAttribute = UnresolvedAttribute("patterns_field")
val ageAttribute = UnresolvedAttribute("age")
val patternExpression = Alias(
RegExpReplace(emailAttribute, Literal("[a-zA-Z0-9]"), Literal("")),
"patterns_field")()

// Define the corrected expected plan
val expectedPlan = Project(
Seq(ageAttribute, emailAttribute, patterns_fieldAttribute),
Sort(
Seq(SortOrder(ageAttribute, Descending, NullsLast, Seq.empty)),
global = true,
Filter(
GreaterThan(ageAttribute, Literal(45)),
Project(
Seq(emailAttribute, patternExpression, UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))))))
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test("test patterns email expressions and top count_host ") {
val frame = sql(
"source=spark_catalog.default.flint_ppl_test | patterns new_field='dot_com' pattern='(.com|.net|.org)' email | stats count() by dot_com ")

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row(1L, "charlie@domain"),
Row(1L, "david@anotherdomain"),
Row(1L, "hank@demonstration"),
Row(1L, "alice@example"),
Row(1L, "frank@sample"),
Row(1L, "grace@demo"),
Row(1L, "jack@sample"),
Row(1L, "eve@examples"),
Row(1L, "ivy@examples"),
Row(1L, "bob@test"))

// Sort both the results and the expected results
implicit val rowOrdering: Ordering[Row] = Ordering.by(r => (r.getLong(0), r.getString(1)))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val messageAttribute = UnresolvedAttribute("email")
val noNumbersAttribute = UnresolvedAttribute("dot_com")
val hostExpression = Alias(
RegExpReplace(messageAttribute, Literal("(.com|.net|.org)"), Literal("")),
"dot_com")()

// Define the corrected expected plan
val expectedPlan = Project(
Seq(UnresolvedStar(None)), // Matches the '*' in the Project
Aggregate(
Seq(Alias(noNumbersAttribute, "dot_com")()), // Group by 'no_numbers'
Seq(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"count()")(),
Alias(noNumbersAttribute, "dot_com")()),
Project(
Seq(messageAttribute, hostExpression, UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))))

// Compare the logical plans
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}
}
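A note on the expected "@." values in the first two tests: the default `patterns` regex `[a-zA-Z0-9]` deletes every letter and digit, so only the separator punctuation of an email survives. A self-contained check of that behavior (object name is illustrative, not part of the commit):

    object PatternsDefaultRegexCheck extends App {
      // Default pattern applied by the `patterns` command when none is given.
      val defaultPattern = "[a-zA-Z0-9]"
      // Stripping all alphanumerics from an email leaves only its punctuation.
      val residue = "alice@example.com".replaceAll(defaultPattern, "")
      assert(residue == "@.")
      println(residue) // prints: @.
    }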
6 changes: 6 additions & 0 deletions ppl-spark-integration/README.md
@@ -329,6 +329,12 @@ Limitation: Overriding existing field is unsupported, following queries throw exceptions
- `source=accounts | grok street_address '%{NUMBER} %{GREEDYDATA:address}' | fields address `
- `source=logs | grok message '%{COMMONAPACHELOG}' | fields COMMONAPACHELOG, timestamp, response, bytes`

**Patterns**
- `source=accounts | patterns email | fields email, patterns_field `
- `source=accounts | patterns email | where age > 45 | sort - age | fields email, patterns_field`
- `source=apache | patterns new_field='no_numbers' pattern='[0-9]' message | fields message, no_numbers`
- `source=apache | patterns new_field='no_numbers' pattern='[0-9]' message | stats count() by no_numbers`
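These examples hinge on `patterns` compiling to Spark's `regexp_replace`: matched characters are removed from the field, so the default pattern `[a-zA-Z0-9]` leaves only punctuation, and `pattern=` supplies a custom regex. A rough Spark SQL sketch of the first and third queries, assuming an active `spark` session and illustrative `accounts`/`apache` tables (neither is defined in this PR):

    // Sketch: what `source=accounts | patterns email` evaluates to in Spark SQL.
    spark.sql(
      """SELECT email, regexp_replace(email, '[a-zA-Z0-9]', '') AS patterns_field
        |FROM accounts""".stripMargin).show()

    // Sketch: the custom-pattern form with new_field='no_numbers' pattern='[0-9]'.
    spark.sql(
      """SELECT message, regexp_replace(message, '[0-9]', '') AS no_numbers
        |FROM apache""".stripMargin).show()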

_- **Limitation: Overriding existing field is unsupported:**_
- `source=accounts | grok address '%{NUMBER} %{GREEDYDATA:address}' | fields address`

@@ -45,26 +45,26 @@ static LogicalPlan visitParseCommand(Parse node, Expression sourceField, ParseMethod parseMethod, ...)
             if (field instanceof AllFields) {
                 for (int i = 0; i < namedGroupCandidates.size(); i++) {
                     namedGroupNumbers.put(namedGroupCandidates.get(i),
-                            ParseUtils.getNamedGroupIndex(parseMethod, pattern, namedGroupCandidates.get(i)));
+                            ParseUtils.getNamedGroupIndex(parseMethod, pattern, namedGroupCandidates.get(i), arguments));
                 }
                 // in specific field case - match to the namedGroupCandidates group
             } else for (int i = 0; i < namedGroupCandidates.size(); i++) {
                 if (((Field)field).getField().toString().equals(namedGroupCandidates.get(i))) {
                     namedGroupNumbers.put(namedGroupCandidates.get(i),
-                            ParseUtils.getNamedGroupIndex(parseMethod, pattern, namedGroupCandidates.get(i)));
+                            ParseUtils.getNamedGroupIndex(parseMethod, pattern, namedGroupCandidates.get(i), arguments));
                 }
             }
         });
         //list the group numbers of these projected fields
         // match the regExpExtract group identifier with its number
         namedGroupNumbers.forEach((group, index) -> {
-            //first create the regExp
-            RegExpExtract regExpExtract = new RegExpExtract(sourceField,
-                    org.apache.spark.sql.catalyst.expressions.Literal.create(cleanedPattern, StringType),
-                    org.apache.spark.sql.catalyst.expressions.Literal.create(index + 1, IntegerType));
+            //first create the regExp
+            org.apache.spark.sql.catalyst.expressions.Literal patternLiteral = org.apache.spark.sql.catalyst.expressions.Literal.create(cleanedPattern, StringType);
+            org.apache.spark.sql.catalyst.expressions.Literal groupIndexLiteral = org.apache.spark.sql.catalyst.expressions.Literal.create(index + 1, IntegerType);
+            Expression regExp = ParseUtils.getRegExpCommand(parseMethod, sourceField, patternLiteral, groupIndexLiteral);
             //next Alias the extracted fields
             context.getNamedParseExpressions().push(
-                org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(regExpExtract,
+                org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(regExp,
                     group,
                     NamedExpression.newExprId(),
                     seq(new java.util.ArrayList<String>()),
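The substantive change in this hunk is that the Catalyst expression is no longer hard-wired to `RegExpExtract`: `ParseUtils.getRegExpCommand` now chooses the expression per parse method, which is what lets `patterns` produce the `RegExpReplace` the tests above assert while `parse`/`grok` keep extracting capture groups. A Scala sketch of that dispatch, assuming a three-way `ParseMethod`; the committed helper is Java and its internals are not shown in this diff:

    import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, RegExpExtract, RegExpReplace}

    object ParseUtilsSketch {
      // Illustrative stand-in for the PPL ParseMethod enum.
      sealed trait ParseMethod
      case object Regex extends ParseMethod
      case object Grok extends ParseMethod
      case object Patterns extends ParseMethod

      // REGEX/GROK extract the requested capture group; PATTERNS replaces every
      // match with the empty string (hence "@." for the default email pattern).
      def getRegExpCommand(
          method: ParseMethod,
          sourceField: Expression,
          patternLiteral: Literal,
          groupIndexLiteral: Literal): Expression = method match {
        case Regex | Grok => RegExpExtract(sourceField, patternLiteral, groupIndexLiteral)
        case Patterns => RegExpReplace(sourceField, patternLiteral, Literal(""))
      }
    }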