Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ppl patterns command #627

Merged
merged 5 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, GreaterThan, Literal, NullsLast, RegExpExtract, RegExpReplace, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLPatternsITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createPartitionedGrokEmailTable(testTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("test patterns email & host expressions") {
val frame = sql(s"""
| source = $testTable| patterns email | fields email, patterns_field
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("charlie@domain.net", "@."),
Row("david@anotherdomain.com", "@."),
Row("hank@demonstration.com", "@."),
Row("alice@example.com", "@."),
Row("frank@sample.org", "@."),
Row("grace@demo.net", "@."),
Row("jack@sample.net", "@."),
Row("eve@examples.com", "@."),
Row("ivy@examples.com", "@."),
Row("bob@test.org", "@."))

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val emailAttribute = UnresolvedAttribute("email")
val patterns_field = UnresolvedAttribute("patterns_field")
val hostExpression = Alias(
RegExpReplace(emailAttribute, Literal("[a-zA-Z0-9]"), Literal("")),
"patterns_field")()
val expectedPlan = Project(
Seq(emailAttribute, patterns_field),
Project(
Seq(emailAttribute, hostExpression, UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))))
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test("test patterns email expressions parsing filter & sort by age") {
val frame = sql(s"""
| source = $testTable| patterns email | where age > 45 | sort - age | fields age, email, patterns_field;
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row(76, "frank@sample.org", "@."),
Row(65, "charlie@domain.net", "@."),
Row(55, "bob@test.org", "@."))

// Compare the results
assert(results.sameElements(expectedResults))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val emailAttribute = UnresolvedAttribute("email")
val patterns_fieldAttribute = UnresolvedAttribute("patterns_field")
val ageAttribute = UnresolvedAttribute("age")
val patternExpression = Alias(
RegExpReplace(emailAttribute, Literal("[a-zA-Z0-9]"), Literal("")),
"patterns_field")()

// Define the corrected expected plan
val expectedPlan = Project(
Seq(ageAttribute, emailAttribute, patterns_fieldAttribute),
Sort(
Seq(SortOrder(ageAttribute, Descending, NullsLast, Seq.empty)),
global = true,
Filter(
GreaterThan(ageAttribute, Literal(45)),
Project(
Seq(emailAttribute, patternExpression, UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))))))
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test("test patterns email expressions and top count_host ") {
val frame = sql(
"source=spark_catalog.default.flint_ppl_test | patterns new_field='dot_com' pattern='(.com|.net|.org)' email | stats count() by dot_com ")

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row(1L, "charlie@domain"),
Row(1L, "david@anotherdomain"),
Row(1L, "hank@demonstration"),
Row(1L, "alice@example"),
Row(1L, "frank@sample"),
Row(1L, "grace@demo"),
Row(1L, "jack@sample"),
Row(1L, "eve@examples"),
Row(1L, "ivy@examples"),
Row(1L, "bob@test"))

// Sort both the results and the expected results
implicit val rowOrdering: Ordering[Row] = Ordering.by(r => (r.getLong(0), r.getString(1)))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val messageAttribute = UnresolvedAttribute("email")
val noNumbersAttribute = UnresolvedAttribute("dot_com")
val hostExpression = Alias(
RegExpReplace(messageAttribute, Literal("(.com|.net|.org)"), Literal("")),
"dot_com")()

// Define the corrected expected plan
val expectedPlan = Project(
Seq(UnresolvedStar(None)), // Matches the '*' in the Project
Aggregate(
Seq(Alias(noNumbersAttribute, "dot_com")()), // Group by 'no_numbers'
Seq(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"count()")(),
Alias(noNumbersAttribute, "dot_com")()),
Project(
Seq(messageAttribute, hostExpression, UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))))

// Compare the logical plans
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}
}
6 changes: 6 additions & 0 deletions ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ Limitation: Overriding existing field is unsupported, following queries throw ex
- `source=accounts | grok street_address '%{NUMBER} %{GREEDYDATA:address}' | fields address `
- `source=logs | grok message '%{COMMONAPACHELOG}' | fields COMMONAPACHELOG, timestamp, response, bytes`

**Patterns**
- `source=accounts | patterns email | fields email, patterns_field `
- `source=accounts | patterns email | where age > 45 | sort - age | fields email, patterns_field`
- `source=apache | patterns new_field='no_numbers' pattern='[0-9]' message | fields message, no_numbers`
- `source=apache | patterns new_field='no_numbers' pattern='[0-9]' message | stats count() by no_numbers`

_- **Limitation: Overriding existing field is unsupported:**_
- `source=accounts | grok address '%{NUMBER} %{GREEDYDATA:address}' | fields address`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,26 @@ static LogicalPlan visitParseCommand(Parse node, Expression sourceField, ParseMe
if(field instanceof AllFields) {
for (int i = 0; i < namedGroupCandidates.size(); i++) {
namedGroupNumbers.put(namedGroupCandidates.get(i),
ParseUtils.getNamedGroupIndex(parseMethod, pattern, namedGroupCandidates.get(i)));
ParseUtils.getNamedGroupIndex(parseMethod, pattern, namedGroupCandidates.get(i), arguments));
}
// in specific field case - match to the namedGroupCandidates group
} else for (int i = 0; i < namedGroupCandidates.size(); i++) {
if (((Field)field).getField().toString().equals(namedGroupCandidates.get(i))) {
namedGroupNumbers.put(namedGroupCandidates.get(i),
ParseUtils.getNamedGroupIndex(parseMethod, pattern, namedGroupCandidates.get(i)));
ParseUtils.getNamedGroupIndex(parseMethod, pattern, namedGroupCandidates.get(i), arguments));
}
}
});
//list the group numbers of these projected fields
// match the regExpExtract group identifier with its number
namedGroupNumbers.forEach((group, index) -> {
//first create the regExp
RegExpExtract regExpExtract = new RegExpExtract(sourceField,
org.apache.spark.sql.catalyst.expressions.Literal.create(cleanedPattern, StringType),
org.apache.spark.sql.catalyst.expressions.Literal.create(index + 1, IntegerType));
//first create the regExp
org.apache.spark.sql.catalyst.expressions.Literal patternLiteral = org.apache.spark.sql.catalyst.expressions.Literal.create(cleanedPattern, StringType);
org.apache.spark.sql.catalyst.expressions.Literal groupIndexLiteral = org.apache.spark.sql.catalyst.expressions.Literal.create(index + 1, IntegerType);
Expression regExp = ParseUtils.getRegExpCommand(parseMethod, sourceField, patternLiteral, groupIndexLiteral);
//next Alias the extracted fields
context.getNamedParseExpressions().push(
org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(regExpExtract,
org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(regExp,
group,
NamedExpression.newExprId(),
seq(new java.util.ArrayList<String>()),
Expand Down
Loading
Loading