WangGuangxin commented on a change in pull request #26011: [SPARK-29343][SQL]
Eliminate sorts without limit in the subquery of Join/Aggregation
URL: https://github.com/apache/spark/pull/26011#discussion_r344989239
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -953,40 +952,62 @@ object CombineFilters extends Rule[LogicalPlan] with
PredicateHelper {
}
/**
- * Removes no-op SortOrder from Sort
+ * Removes Sort operation. This can happen:
+ * 1) if the sort order is empty or the sort order does not have any reference
+ * 2) if the child is already sorted
+ * 3) if there is another Sort operator separated by 0...n Project/Filter
operators
+ * 4) if the Sort operator is within Join separated by 0...n Project/Filter
operators only,
+ * and the Join condition is deterministic
+ * 5) if the Sort operator is within GroupBy separated by 0...n Project/Filter
operators only,
+ * and the aggregate function is order irrelevant
*/
object EliminateSorts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case s @ Sort(orders, _, child) if orders.isEmpty ||
orders.exists(_.child.foldable) =>
val newOrders = orders.filterNot(_.child.foldable)
if (newOrders.isEmpty) child else s.copy(order = newOrders)
- }
-}
-
-/**
- * Removes redundant Sort operation. This can happen:
- * 1) if the child is already sorted
- * 2) if there is another Sort operator separated by 0...n Project/Filter
operators
- */
-object RemoveRedundantSorts extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case Sort(orders, true, child) if
SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
child
case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
+ case j @ Join(originLeft, originRight, _, cond, _) if
cond.forall(_.deterministic) =>
+ j.copy(left = recursiveRemoveSort(originLeft), right =
recursiveRemoveSort(originRight))
+ case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
+ g.copy(child = recursiveRemoveSort(originChild))
}
- def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
+ private def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match
{
case Sort(_, _, child) => recursiveRemoveSort(child)
case other if canEliminateSort(other) =>
other.withNewChildren(other.children.map(recursiveRemoveSort))
case _ => plan
}
- def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
+ private def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
case p: Project => p.projectList.forall(_.deterministic)
case f: Filter => f.condition.deterministic
case _ => false
}
+
+ private def isOrderIrrelevantAggs(aggs: Seq[NamedExpression]): Boolean = {
+ def isOrderIrrelevantAggFunction(func: AggregateFunction): Boolean = func
match {
+ case _: Sum => true
+ case _: Min => true
+ case _: Max => true
+ case _: Count => true
+ case _: Average => true
Review comment:
Sure, I'll fix it in a followup
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]