Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push down sort through eval #2937

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

qianheng-aws
Copy link
Contributor

@qianheng-aws qianheng-aws commented Aug 15, 2024

Description

Add a rule EvalPushDown.PUSH_DOWN_SORT, this rule will push down limit under eval. Thus, sort has chance to be pushed down into TableScanBuilder later.

For now, it only supports push specific sort and eval, which has restriction:

  1. The expression in sort and replaced expression are both ReferenceExpression.
  2. No internal reference in eval.

As described in this RFC #2864 , we may have chance to extend it.

e.g.

POST _plugins/_ppl/_explain
{
  "query": """
    source=opensearch_dashboards_sample_data_flights | eval newFlightMin = FlightTimeMin | sort newFlightMin |  fields newFlightMin 
  """
}

# Before optimization
{
  "root": {
    "name": "ProjectOperator",
    "description": {
      "fields": "[newFlightMin]"
    },
    "children": [
      {
        "name": "SortOperator",
        "description": {
          "sortList": {
            "newFlightMin": {
              "sortOrder": "ASC",
              "nullOrder": "NULL_FIRST"
            }
          }
        },
        "children": [
          {
            "name": "EvalOperator",
            "description": {
              "expressions": {
                "newFlightMin": "FlightTimeMin"
              }
            },
            "children": [
              {
                "name": "OpenSearchIndexScan",
                "description": {
                  "request": """OpenSearchQueryRequest(indexName=opensearch_dashboards_sample_data_flights, sourceBuilder={"from":0,"size":200,"timeout":"1m"}, searchDone=false)"""
                },
                "children": []
              }
            ]
          }
        ]
      }
    ]
  }
}

# After optimization
{
  "root": {
    "name": "ProjectOperator",
    "description": {
      "fields": "[newFlightMin]"
    },
    "children": [
      {
        "name": "EvalOperator",
        "description": {
          "expressions": {
            "newFlightMin": "FlightTimeMin"
          }
        },
        "children": [
          {
            "name": "OpenSearchIndexScan",
            "description": {
              "request": """OpenSearchQueryRequest(indexName=opensearch_dashboards_sample_data_flights, sourceBuilder={"from":0,"size":10000,"timeout":"1m","sort":[{"FlightTimeMin":{"order":"asc","missing":"_first"}}]}, searchDone=false)"""
            },
            "children": []
          }
        ]
      }
    ]
  }
}

Related Issues

Resolves #2904

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Heng Qian <qianheng@amazon.com>
Signed-off-by: Heng Qian <qianheng@amazon.com>
Signed-off-by: Heng Qian <qianheng@amazon.com>
Comment on lines 77 to 79
} else return sort;
} else return sort;
} else return sort;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can return sort at the end of for block once.

Copy link
Contributor Author

@qianheng-aws qianheng-aws Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we need a flag to indicate returning sort or not for each iteration.

It could be simplified by pattern matching with java version>=14. But since we also need to support java11 for 2.x, I don't use that feature in order to keep code align.

*/
if (pair.getRight() instanceof ReferenceExpression) {
ReferenceExpression ref = (ReferenceExpression) pair.getRight();
Expression replacedExpr = evalExpressionMap.getOrDefault(ref, ref);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it right to return ref as default value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logically, it's right I think, though the name of replacedExpr may be a little bit misleading. It includes 2 cases:

  1. the ref is produced by eval operator, then it needs to be replaced
  2. the ref isn't produced by eval operator, then we don't need to replace it. That's why I use the default value ref.

How about changing the name of replacedExpr to newExpr?

public static final Rule<LogicalSort> PUSH_DOWN_SORT =
match(sort(evalCapture()))
.apply(
(sort, logicalEval) -> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks this Lambda is too large. Please extract to a method or a class. And extract some portion to reduce the size of method for readability.

Comment on lines 383 to 398
// don't push sort if sort field is not ReferenceExpression
Expression nonRefExpr = DSL.add(DSL.ref("intV", INTEGER), DSL.literal(1));
Pair<SortOption, Expression> sortExprWithNonRef =
Pair.of(Sort.SortOption.DEFAULT_ASC, nonRefExpr);
LogicalPlan originPlan = sort(eval(relation("schema", table), evalExpr), sortExprWithNonRef);
assertEquals(originPlan, optimize(originPlan));

// don't push sort if replaced expr in eval is not ReferenceExpression
Pair<ReferenceExpression, Expression> evalExprWithNonRef = Pair.of(sortRef, nonRefExpr);
originPlan = sort(eval(relation("schema", table), evalExprWithNonRef), sortExpr);
assertEquals(originPlan, optimize(originPlan));

// don't push sort if there are internal reference in eval
Pair<ReferenceExpression, Expression> evalExpr2 = Pair.of(sortRef, evalRef);
originPlan = sort(eval(relation("schema", table), evalExpr, evalExpr2), sortExpr);
assertEquals(originPlan, optimize(originPlan));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please split the method for each test case.

Signed-off-by: Heng Qian <qianheng@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

push down sort
2 participants