[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r224969136

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -64,7 +64,8 @@ case class InMemoryRelation(
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
     val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics)
+    statsOfPlanToCache: Statistics,
+    override val outputOrdering: Seq[SortOrder])
--- End diff --

This should be added to `otherCopyArgs`; otherwise, we will lose it when doing the tree transformation. https://github.com/apache/spark/pull/22715 fixed it.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
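Why the extra constructor argument has to go into `otherCopyArgs` can be seen in a toy Scala model. This is a minimal sketch with hypothetical names (`CachedNode`, `rebuildFromProduct`, `rebuildWithOtherCopyArgs`), not Spark's `TreeNode.makeCopy`; it assumes, as the comment implies, that a generic tree transformation rebuilds a node from its first parameter list (the case-class product) plus whatever `otherCopyArgs` returns.

```scala
// Toy model, not Spark code: a value in a case class's second parameter
// list is invisible to product-based copying unless it is forwarded.
object OtherCopyArgsSketch {
  final case class CachedNode(name: String)(val outputOrdering: Seq[String]) {
    // Analogue of otherCopyArgs: curried arguments that productIterator
    // (and therefore a generic copier) cannot discover on its own.
    def otherCopyArgs: Seq[AnyRef] = Seq(outputOrdering)
  }

  // A generic rebuild that only knows the product elements (first list),
  // so it has to fall back to an empty ordering.
  def rebuildFromProduct(n: CachedNode): CachedNode =
    CachedNode(n.productElement(0).asInstanceOf[String])(Nil)

  // The same rebuild, but forwarding the curried arguments as well.
  def rebuildWithOtherCopyArgs(n: CachedNode): CachedNode =
    CachedNode(n.productElement(0).asInstanceOf[String])(
      n.otherCopyArgs.head.asInstanceOf[Seq[String]])
}
```

Because case-class equality also looks only at the first parameter list, the node rebuilt without `otherCopyArgs` still compares equal to the original, which is one reason such a loss can go unnoticed.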
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/20560
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r181272920

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
--- End diff --

`child.outputOrdering.nonEmpty` looks unnecessary.
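A toy model of the rule makes the point about the `nonEmpty` guard concrete. This sketch uses hypothetical `Plan`, `Relation`, `Project` and `Sort` types rather than Catalyst, and its `orderingSatisfies` is an assumption modeled on the prefix semantics of `SortOrder.orderingSatisfies`: an empty required ordering is always satisfied, otherwise it must be a prefix of the provided one. Under those semantics an empty child ordering can never satisfy a `Sort`'s non-empty `orders`, so the extra check adds nothing.

```scala
// Simplified, hypothetical model of RemoveRedundantSorts; not Catalyst.
object RemoveRedundantSortsSketch {
  final case class Order(column: String, ascending: Boolean)

  sealed trait Plan { def outputOrdering: Seq[Order] }
  final case class Relation(name: String) extends Plan {
    def outputOrdering: Seq[Order] = Nil
  }
  final case class Project(columns: Seq[String], child: Plan) extends Plan {
    // Simplification: passes the child's ordering through unchanged.
    def outputOrdering: Seq[Order] = child.outputOrdering
  }
  final case class Sort(orders: Seq[Order], global: Boolean, child: Plan) extends Plan {
    def outputOrdering: Seq[Order] = orders
  }

  // Prefix semantics assumed for SortOrder.orderingSatisfies.
  def orderingSatisfies(provided: Seq[Order], required: Seq[Order]): Boolean =
    required.isEmpty ||
      (required.length <= provided.length &&
        required.zip(provided).forall { case (r, p) => r == p })

  // Drops a global Sort whose child already satisfies its ordering. No
  // separate `child.outputOrdering.nonEmpty` guard is needed.
  def removeRedundantSorts(plan: Plan): Plan = plan match {
    case Sort(orders, true, child) if orderingSatisfies(child.outputOrdering, orders) =>
      removeRedundantSorts(child)
    case Sort(orders, global, child) => Sort(orders, global, removeRedundantSorts(child))
    case Project(cols, child)        => Project(cols, removeRedundantSorts(child))
    case r: Relation                 => r
  }
}
```

Applied to a plan shaped like the PR description's example (a `Sort` on `a` over a `Project` over another `Sort` on `a`), the sketch drops only the outer `Sort`.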
Github user henryr commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r181253369

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+      child
+  }
--- End diff --

Filed SPARK-23973 for this.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180716400

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -169,4 +169,6 @@ case class InMemoryRelation(
   override protected def otherCopyArgs: Seq[AnyRef] =
     Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
--- End diff --

We should carry the logical ordering from the cached logical plan when building the `InMemoryRelation`.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180716173

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -22,10 +22,11 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange,
+  ShuffleExchangeExec}
--- End diff --

It's an unnecessary change. We don't have a length limit for imports.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180716049

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -169,4 +169,6 @@ case class InMemoryRelation(
   override protected def otherCopyArgs: Seq[AnyRef] =
     Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
--- End diff --

In `SparkPlan`:
```
/** Specifies how data is ordered in each partition. */
def outputOrdering: Seq[SortOrder] = Nil
```
So we can't do this: the physical `outputOrdering` describes the ordering within each partition, not a global ordering.
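The gap between per-partition and global ordering can be shown with partitions modeled as plain Scala sequences. This is a hypothetical illustration, not Spark code: data can meet a per-partition guarantee like the one `SparkPlan.outputOrdering` documents without being globally sorted.

```scala
// Hypothetical illustration: partitions modeled as plain Seqs.
object PartitionOrderingSketch {
  def isSorted(xs: Seq[Int]): Boolean =
    xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

  // "Ordered within each partition".
  def sortedWithinPartitions(parts: Seq[Seq[Int]]): Boolean = parts.forall(isSorted)

  // "Globally ordered": reading the partitions in order yields a sorted sequence.
  def globallySorted(parts: Seq[Seq[Int]]): Boolean = isSorted(parts.flatten)
}
```

For example, `Seq(Seq(3, 5), Seq(1, 4))` passes the first check but not the second.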
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180715118

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+      child
+  }
--- End diff --

This is a good follow-up.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180714835

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala ---
@@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan {
   override final def children: Seq[LogicalPlan] = Seq(left, right)
 }
+
+abstract class KeepOrderUnaryNode extends UnaryNode {
--- End diff --

`OrderPreservingUnaryNode` sounds better. It only makes sense for unary nodes, so I don't think a mixin trait is a good idea.
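A rough sketch of what the suggested `OrderPreservingUnaryNode` could look like, on a trimmed-down hypothetical hierarchy rather than Catalyst's real `LogicalPlan` and `UnaryNode` (orderings are plain column names, and `Leaf`/`Filter` are illustrative stand-ins):

```scala
// Hypothetical, trimmed-down hierarchy; not Catalyst.
object OrderPreservingSketch {
  abstract class LogicalPlan {
    def children: Seq[LogicalPlan]
    // Default: no known output ordering.
    def outputOrdering: Seq[String] = Nil
  }

  abstract class UnaryNode extends LogicalPlan {
    def child: LogicalPlan
    override final def children: Seq[LogicalPlan] = Seq(child)
  }

  // Only meaningful with exactly one child, hence a subclass of UnaryNode
  // rather than a free-standing mixin trait.
  abstract class OrderPreservingUnaryNode extends UnaryNode {
    override final def outputOrdering: Seq[String] = child.outputOrdering
  }

  final case class Leaf(ordering: Seq[String]) extends LogicalPlan {
    def children: Seq[LogicalPlan] = Nil
    override def outputOrdering: Seq[String] = ordering
  }

  final case class Filter(condition: String, child: LogicalPlan)
    extends OrderPreservingUnaryNode
}
```

Marking `outputOrdering` as `final` is a choice of this sketch: subclasses cannot accidentally override it, and extending `UnaryNode` encodes the point that order preservation only makes sense for a single child.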
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180664857

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+      child
+  }
--- End diff --

Yes, you're right. We can probably do this in another PR. Could you open a JIRA for this? Thanks!
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180664561

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala ---
@@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan {
   override final def children: Seq[LogicalPlan] = Seq(left, right)
 }
+
+abstract class KeepOrderUnaryNode extends UnaryNode {
--- End diff --

Thanks for the suggestion. I'd also love to hear @cloud-fan's and @wzhfy's opinions on this, so that we can choose the best name together. What do you think?
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180664594

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -22,10 +22,11 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange,
+  ShuffleExchangeExec}
--- End diff --

Why?
Github user henryr commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r179003707

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala ---
@@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan {
   override final def children: Seq[LogicalPlan] = Seq(left, right)
 }
+
+abstract class KeepOrderUnaryNode extends UnaryNode {
--- End diff --

`OrderPreservingUnaryNode`? Or do you think this would be better modeled as a mixin trait?
Github user henryr commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180601594

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+      child
+  }
--- End diff --

You might not want to do it in this PR, but you could easily remove another simple kind of redundant sort, e.g. `rel.orderBy('a.desc).orderBy('a.asc)`. (I think `orderBy` is not stable, so in any two consecutive `orderBy` calls the first one is redundant.)
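The follow-up suggested here could look roughly like this on a toy plan ADT (hypothetical types, not Catalyst). It assumes, as the comment does, that `orderBy` is not stable, and that the outer `Sort` is global, so it fully re-orders its input and a `Sort` directly beneath it has no observable effect.

```scala
// Hypothetical sketch on a toy plan ADT; not Catalyst.
object ConsecutiveSortsSketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Sort(orders: Seq[String], global: Boolean, child: Plan) extends Plan

  // Collapses a stack of Sorts under a global Sort into the outermost one.
  def removeInnerSorts(plan: Plan): Plan = plan match {
    case Sort(orders, true, Sort(_, _, grandChild)) =>
      removeInnerSorts(Sort(orders, global = true, grandChild))
    case Sort(orders, global, child) => Sort(orders, global, removeInnerSorts(child))
    case r: Relation                 => r
  }
}
```

If sorting were stable, the inner sort would decide the order of ties, and this rewrite would not be safe.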
Github user henryr commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180592716

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -22,10 +22,11 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange,
+  ShuffleExchangeExec}
--- End diff --

Revert this?
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180397806

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -522,6 +524,8 @@ case class Range(
   override def computeStats(): Statistics = {
     Statistics(sizeInBytes = LongType.defaultSize * numElements)
   }
+
+  override def outputOrdering: Seq[SortOrder] = output.map(a => SortOrder(a, Descending))
--- End diff --

Nice catch, thanks! I missed it!
Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180389105

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, ORDER_BY_ORDINAL}
+
+class RemoveRedundantSortsSuite extends PlanTest {
+  override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, ORDER_BY_ORDINAL -> false)
+  val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
+  val analyzer = new Analyzer(catalog, conf)
--- End diff --

If we don't use ordinal numbers, we can remove these.
Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180390907

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -197,6 +198,19 @@ class PlannerSuite extends SharedSQLContext {
     assert(planned.child.isInstanceOf[CollectLimitExec])
   }

+  test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") {
+    val query = testData.select('key, 'value).sort('key.desc).cache()
+    assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation])
+    val resorted = query.sort('key.desc)
+    assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s}.isEmpty)
+    assert(resorted.select('key).collect().map(_.getInt(0)).toSeq ==
+      (1 to 100).sorted(Ordering[Int].reverse))
+    // with a different order, the sort is needed
+    val sortedAsc = query.sort('key)
+    assert(sortedAsc.queryExecution.optimizedPlan.collect { case s: Sort => s}.nonEmpty)
--- End diff --

`.nonEmpty` -> `.size == 1`
Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180389667

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Removes Sort operations on already sorted data
--- End diff --

How about `Removes Sort operation if the child is already sorted`?
Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180390730

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -197,6 +198,19 @@ class PlannerSuite extends SharedSQLContext {
     assert(planned.child.isInstanceOf[CollectLimitExec])
   }

+  test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") {
+    val query = testData.select('key, 'value).sort('key.desc).cache()
+    assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation])
+    val resorted = query.sort('key.desc)
+    assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s}.isEmpty)
+    assert(resorted.select('key).collect().map(_.getInt(0)).toSeq ==
+      (1 to 100).sorted(Ordering[Int].reverse))
--- End diff --

`(1 to 100).reverse`?
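A quick check in plain Scala that the two expressions build the same sequence; `.reverse` on a `Range` simply avoids sorting. The object and value names are illustrative only.

```scala
// Plain-Scala check; names are illustrative.
object ReverseRangeSketch {
  // Form used in the test under review: sort 1..100 in descending order.
  val viaSorted: Seq[Int] = (1 to 100).sorted(Ordering[Int].reverse)
  // Suggested form: reverse the range directly, no sort needed.
  val viaReverse: Seq[Int] = (1 to 100).reverse
}
```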
Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180390139

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -522,6 +524,8 @@ case class Range(
   override def computeStats(): Statistics = {
     Statistics(sizeInBytes = LongType.defaultSize * numElements)
   }
+
+  override def outputOrdering: Seq[SortOrder] = output.map(a => SortOrder(a, Descending))
--- End diff --

Is the ordering the same when `step` in `Range` is positive or negative?
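A sketch of the sign check the question points at, using Scala's own numeric ranges as a stand-in for Spark's `Range` operator. The `Direction` type and `rangeDirection` helper are hypothetical, and the sketch assumes `step` is never zero: a fixed `Descending` is only right for a negative `step`.

```scala
// Hypothetical helper; Scala ranges stand in for Spark's Range operator.
object RangeOrderingSketch {
  sealed trait Direction
  case object Ascending extends Direction
  case object Descending extends Direction

  // Output direction follows the sign of step (step assumed non-zero).
  def rangeDirection(step: Long): Direction =
    if (step > 0) Ascending else Descending
}
```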
Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r180389285

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, ORDER_BY_ORDINAL}
+
+class RemoveRedundantSortsSuite extends PlanTest {
+  override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, ORDER_BY_ORDINAL -> false)
+  val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
+  val analyzer = new Analyzer(catalog, conf)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Remove Redundant Sorts", Once,
+        RemoveRedundantSorts) ::
+      Batch("Collapse Project", Once,
+        CollapseProject) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("remove redundant order by") {
+    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
+    val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
+    val optimized = Optimize.execute(analyzer.execute(unnecessaryReordered))
--- End diff --

Just use `unnecessaryReordered.analyze`?
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r179120037

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -43,11 +43,12 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
  * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
  * recognize a subquery as such, and it allows us to write subquery aware transformations.
  */
-case class Subquery(child: LogicalPlan) extends UnaryNode {
+case class Subquery(child: LogicalPlan) extends KeepOrderUnaryNode {
   override def output: Seq[Attribute] = child.output
 }

-case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
--- End diff --

Sorry, I don't fully understand what you mean. In `ProjectExec.outputOrdering`, we take `child.outputOrdering` exactly as is done here.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r179044224

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -867,6 +871,11 @@ case class RepartitionByExpression(
   override def maxRows: Option[Long] = child.maxRows

   override def shuffle: Boolean = true
+
+  override def outputOrdering: Seq[SortOrder] = partitioning match {
+    case RangePartitioning(ordering, _) => ordering
--- End diff --

`RangePartitioning` doesn't guarantee ordering within a partition, so we can't do this.
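Why range partitioning alone does not give an ordering inside each partition can be seen in a simplified model (hypothetical, not Spark's `RangePartitioner`): values are routed by upper bounds, so partitions are ordered relative to each other, but rows keep their arrival order within a partition.

```scala
// Hypothetical model of range partitioning; not Spark's RangePartitioner.
object RangePartitioningSketch {
  // Each value goes to the first partition whose upper bound it does not
  // exceed; values above every bound go to the last partition.
  def rangePartition(values: Seq[Int], upperBounds: Seq[Int]): Seq[Seq[Int]] = {
    val numPartitions = upperBounds.length + 1
    def partitionOf(v: Int): Int = {
      val i = upperBounds.indexWhere(v <= _)
      if (i < 0) numPartitions - 1 else i
    }
    // Arrival order is kept inside each partition; nothing sorts it.
    (0 until numPartitions).map(p => values.filter(v => partitionOf(v) == p))
  }
}
```

With input `Seq(5, 1, 9, 3, 7)` and a single bound `4`, the second partition comes out as `Seq(5, 9, 7)`: every value in it is larger than those in the first partition, yet it is not sorted.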
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r179043920

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -43,11 +43,12 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
  * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
  * recognize a subquery as such, and it allows us to write subquery aware transformations.
  */
-case class Subquery(child: LogicalPlan) extends UnaryNode {
+case class Subquery(child: LogicalPlan) extends KeepOrderUnaryNode {
   override def output: Seq[Attribute] = child.output
 }

-case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
--- End diff --

Like `ProjectExec.outputOrdering`, we can propagate ordering for aliased attributes.
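One way to read the suggestion about aliased attributes, as a hypothetical sketch on toy types (not Catalyst's `Alias` or `AttributeReference`): rewrite the child's ordering in terms of the projection's output names and keep the longest prefix that is still expressible. Representing a projection as `(outputName, sourceColumn)` pairs is an assumption of the sketch.

```scala
// Hypothetical sketch on toy types; not Catalyst.
object AliasOrderingSketch {
  final case class Order(column: String, ascending: Boolean)

  // projectList holds (outputName, sourceColumn) pairs: a plain column
  // reference is ("b", "b"), an alias `a AS x` is ("x", "a").
  def projectOrdering(childOrdering: Seq[Order], projectList: Seq[(String, String)]): Seq[Order] = {
    val outputFor: Map[String, String] =
      projectList.map { case (out, src) => src -> out }.toMap
    // Stop at the first ordering column the projection does not expose:
    // data sorted by (a, b) is not sorted by b alone.
    childOrdering
      .takeWhile(o => outputFor.contains(o.column))
      .map(o => o.copy(column = outputFor(o.column)))
  }
}
```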
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r179043789

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Removes Sort operations on already sorted data
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
--- End diff --

Ah, they are different. This is a global ordering.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r179043661

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Removes Sort operations on already sorted data
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
--- End diff --

Shall we do it after planning, since we already have `SparkPlan.outputOrdering`?
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r178327617

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala ---
@@ -219,6 +219,11 @@ abstract class LogicalPlan
    * Refreshes (or invalidates) any metadata/data cached in the plan recursively.
    */
   def refresh(): Unit = children.foreach(_.refresh())
+
+  /**
+   * If the current plan contains sorted data, it contains the sorted order.
--- End diff --

Returns the output ordering that this plan generates.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r178327166

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala ---
@@ -219,6 +219,11 @@ abstract class LogicalPlan
    * Refreshes (or invalidates) any metadata/data cached in the plan recursively.
    */
   def refresh(): Unit = children.foreach(_.refresh())
+
+  /**
+   * If the current plan contains sorted data, it contains the sorted order.
+   */
+  def sortedOrder: Seq[SortOrder] = Nil
--- End diff --

`def outputOrdering`?
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/20560#discussion_r178328886

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Removes Sort operations on already sorted data
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.sortedOrder.nonEmpty
+        && child.sortedOrder.zip(orders).forall { case (s1, s2) => s1.satisfies(s2) } =>
--- End diff --

Why not use `SortOrder.orderingSatisfies`?
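One practical difference behind this question: `zip` truncates to the shorter sequence, so the hand-rolled check can report that a child sorted by `(a)` already satisfies a required `(a, b)`. A minimal comparison on toy orderings (column names only; both helpers are hypothetical, and the second mirrors the prefix semantics assumed for `SortOrder.orderingSatisfies`):

```scala
// Hypothetical helpers on toy orderings (column names only).
object ZipVsSatisfiesSketch {
  // As in the diff: zip silently truncates to the shorter sequence.
  def zipCheck(childOrdering: Seq[String], required: Seq[String]): Boolean =
    childOrdering.nonEmpty &&
      childOrdering.zip(required).forall { case (c, r) => c == r }

  // orderingSatisfies-style: `required` must be a prefix of `childOrdering`.
  def satisfiesCheck(childOrdering: Seq[String], required: Seq[String]): Boolean =
    required.isEmpty ||
      (required.length <= childOrdering.length &&
        required.zip(childOrdering).forall { case (r, c) => c == r })
}
```

With a child sorted by `Seq("a")` and a required `Seq("a", "b")`, `zipCheck` returns `true` (so the `Sort` would be wrongly removed), while `satisfiesCheck` returns `false`.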
GitHub user mgaido91 opened a pull request:
https://github.com/apache/spark/pull/20560

[SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer

## What changes were proposed in this pull request?

Added a new rule to remove Sort operation when its child is already sorted.
For instance, this simple code:
```
spark.sparkContext.parallelize(Seq(("a", "b"))).toDF("a", "b").registerTempTable("table1")
val df = sql(s"""SELECT b
                |FROM (
                |  SELECT a, b
                |  FROM table1
                |  ORDER BY a
                |) t
                |ORDER BY a""".stripMargin)
df.explain(true)
```
before the PR produces this plan:
```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- Sort [a#6 ASC NULLS FIRST], true
         +- Project [_1#3 AS a#6, _2#4 AS b#7]
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
               +- ExternalRDD [obj#2]

== Physical Plan ==
*(3) Project [b#7]
+- *(3) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
      +- *(2) Project [b#7, a#6]
         +- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
               +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
                     +- Scan ExternalRDDScan[obj#2]
```
while after the PR produces:
```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [_1#3 AS a#6, _2#4 AS b#7]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
         +- ExternalRDD [obj#2]

== Physical Plan ==
*(2) Project [b#7]
+- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitionin