[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-10-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r224969136
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -64,7 +64,8 @@ case class InMemoryRelation(
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
     val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics)
+    statsOfPlanToCache: Statistics,
+    override val outputOrdering: Seq[SortOrder])
--- End diff --

This should be added to `otherCopyArgs`; otherwise, we will lose it when 
doing the tree transformation. https://github.com/apache/spark/pull/22715 fixed 
it.
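
A toy model may help show why a curried constructor argument has to be listed among the "other copy args". This is a sketch only, not Spark's `TreeNode` machinery: `ToyRelation`, `rebuild`, `otherCopyArgsBuggy` and `otherCopyArgsFixed` are hypothetical names, and in Spark itself the symptom may differ (for example, the copy might fail instead of silently dropping the value).

```scala
object OtherCopyArgsDemo {
  // Toy stand-in for a plan node with a curried second parameter list.
  // `ordering` plays the role of outputOrdering; an empty Seq means "unknown".
  final class ToyRelation(val name: String)(val stats: Long, val ordering: Seq[String]) {
    // Buggy variant: forgets `ordering`, so a rebuilt node loses it.
    def otherCopyArgsBuggy: Seq[Any] = Seq(stats)
    // Fixed variant: every curried argument is listed.
    def otherCopyArgsFixed: Seq[Any] = Seq(stats, ordering)
  }

  // Rebuilds a node the way a generic tree transformation might (assumption):
  // a new first-list argument plus the "other" arguments supplied by the node.
  def rebuild(newName: String, otherArgs: Seq[Any]): ToyRelation = otherArgs match {
    case Seq(stats: Long, ordering: Seq[_]) =>
      new ToyRelation(newName)(stats, ordering.map(_.toString))
    case Seq(stats: Long) =>
      new ToyRelation(newName)(stats, Nil) // ordering silently dropped
    case other =>
      throw new IllegalArgumentException(s"unexpected args: $other")
  }

  def main(args: Array[String]): Unit = {
    val rel = new ToyRelation("t")(42L, Seq("key DESC"))
    println(rebuild("t2", rel.otherCopyArgsBuggy).ordering)
    println(rebuild("t2", rel.otherCopyArgsFixed).ordering)
  }
}
```

With the buggy list the rebuilt node no longer knows it is sorted, so a rule that relies on the ordering would stop firing after any transformation that copies the node.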


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20560





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r181272920
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
--- End diff --

`child.outputOrdering.nonEmpty` looks unnecessary.
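
A small sketch of why the guard is likely redundant, assuming the `Sort` has at least one sort key and that the satisfaction check is a prefix comparison. `ToyOrder` and this `orderingSatisfies` are simplified, hypothetical stand-ins, not Spark's `SortOrder` API.

```scala
object NonEmptyGuardDemo {
  // Toy sort key: column name plus direction. Not Spark's SortOrder.
  final case class ToyOrder(column: String, ascending: Boolean)

  // Simplified prefix check: `required` is satisfied when it is empty, or when
  // it is no longer than `actual` and matches it position by position.
  def orderingSatisfies(actual: Seq[ToyOrder], required: Seq[ToyOrder]): Boolean =
    required.isEmpty ||
      (required.length <= actual.length &&
        required.zip(actual).forall { case (r, a) => r == a })

  def main(args: Array[String]): Unit = {
    val required = Seq(ToyOrder("a", ascending = true))
    // An unsorted child (empty ordering) never satisfies a non-empty requirement,
    // so an extra `actual.nonEmpty` guard adds nothing in that case.
    println(orderingSatisfies(Nil, required))
    println(orderingSatisfies(required, required))
  }
}
```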





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-12 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r181253369
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+      child
+  }
--- End diff --

Filed SPARK-23973 for this





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180716400
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -169,4 +169,6 @@ case class InMemoryRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] =
     Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
--- End diff --

We should carry the logical ordering from the cached logical plan when 
building the `InMemoryRelation`





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180716173
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -22,10 +22,11 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, 
LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchangeExec, ReuseExchange,
+  ShuffleExchangeExec}
--- End diff --

This is an unnecessary change. We don't have a length limit for imports.





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180716049
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -169,4 +169,6 @@ case class InMemoryRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] =
     Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
--- End diff --

In `SparkPlan`:
```
/** Specifies how data is ordered in each partition. */
def outputOrdering: Seq[SortOrder] = Nil
```

So we can't do this: `SparkPlan.outputOrdering` only describes the ordering within each partition.
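
The distinction between per-partition and global ordering can be sketched with plain collections. This is an illustration only; `isSorted` and the sample partitions are made up for the example.

```scala
object PartitionOrderingDemo {
  // True when the values are in non-decreasing order.
  def isSorted(xs: Seq[Int]): Boolean =
    xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

  def main(args: Array[String]): Unit = {
    // Each inner Seq models one partition of a physical plan's output.
    val partitions = Seq(Seq(1, 5, 9), Seq(2, 3, 8))
    // Every partition is sorted on its own...
    println(partitions.forall(isSorted))
    // ...but reading the partitions one after another is not globally sorted.
    println(isSorted(partitions.flatten))
  }
}
```

A per-partition ordering therefore cannot, by itself, justify removing a global `Sort`.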





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180715118
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+      child
+  }
--- End diff --

This is a good follow-up





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180714835
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 ---
@@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan {
 
   override final def children: Seq[LogicalPlan] = Seq(left, right)
 }
+
+abstract class KeepOrderUnaryNode extends UnaryNode {
--- End diff --

OrderPreservingUnaryNode sounds better.

It only makes sense for unary nodes, so I don't think a mixin trait is a good 
idea.





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-11 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180664857
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+      child
+  }
--- End diff --

Yes, you're right. We can probably do this in another PR. Could you open a JIRA 
for this? Thanks!





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-11 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180664561
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 ---
@@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan {
 
   override final def children: Seq[LogicalPlan] = Seq(left, right)
 }
+
+abstract class KeepOrderUnaryNode extends UnaryNode {
--- End diff --

Thanks for the suggestion. I'd also love to hear @cloud-fan's and @wzhfy's 
opinions on this, so that we can choose the best name together. What do 
you think?





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-11 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180664594
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -22,10 +22,11 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, 
LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchangeExec, ReuseExchange,
+  ShuffleExchangeExec}
--- End diff --

why?





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r179003707
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 ---
@@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan {
 
   override final def children: Seq[LogicalPlan] = Seq(left, right)
 }
+
+abstract class KeepOrderUnaryNode extends UnaryNode {
--- End diff --

`OrderPreservingUnaryNode`? Or perhaps do you think this would be better 
modeled as a mixin trait?





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180601594
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+      child
+  }
--- End diff --

You might not want to do it in this PR, but you could easily remove another 
simple kind of redundant sort, e.g.:

`rel.orderBy('a.desc).orderBy('a.asc)`

(and I think that `orderBy` is not stable, so the first of any two consecutive 
`orderBy` operators is redundant).
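
A minimal sketch of such a follow-up rule on a toy plan tree, assuming (as the comment does) that the sort is not stable, so only the outermost of two adjacent sorts matters. The `Plan`, `Relation`, `Sort`, `Filter` and `collapseSorts` names here are hypothetical and unrelated to Catalyst's classes.

```scala
object ConsecutiveSortsDemo {
  // Toy logical plan, far simpler than Catalyst's LogicalPlan.
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Sort(keys: Seq[String], child: Plan) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan

  // Bottom-up rewrite: a Sort directly on top of another Sort keeps only the outer one.
  def collapseSorts(plan: Plan): Plan = plan match {
    case Sort(keys, child) =>
      collapseSorts(child) match {
        case Sort(_, grandChild) => Sort(keys, grandChild)
        case other => Sort(keys, other)
      }
    case Filter(condition, child) => Filter(condition, collapseSorts(child))
    case r: Relation => r
  }

  def main(args: Array[String]): Unit = {
    // Models rel.orderBy('a.desc).orderBy('a.asc)
    val plan = Sort(Seq("a ASC"), Sort(Seq("a DESC"), Relation("rel")))
    println(collapseSorts(plan))
  }
}
```

The sketch only handles directly adjacent sorts; a sort separated from the outer one by another operator, such as the toy `Filter`, is left in place.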





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180592716
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -22,10 +22,11 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, 
LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchangeExec, ReuseExchange,
+  ShuffleExchangeExec}
--- End diff --

revert this?





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180397806
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -522,6 +524,8 @@ case class Range(
   override def computeStats(): Statistics = {
     Statistics(sizeInBytes = LongType.defaultSize * numElements)
   }
+
+  override def outputOrdering: Seq[SortOrder] = output.map(a => SortOrder(a, Descending))
--- End diff --

Nice catch, thanks! I missed it! 





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180389105
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, 
ORDER_BY_ORDINAL}
+
+class RemoveRedundantSortsSuite extends PlanTest {
+  override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, 
ORDER_BY_ORDINAL -> false)
+  val catalog = new SessionCatalog(new InMemoryCatalog, 
EmptyFunctionRegistry, conf)
+  val analyzer = new Analyzer(catalog, conf)
--- End diff --

If we don't use ordinal numbers, we can remove these.





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180390907
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -197,6 +198,19 @@ class PlannerSuite extends SharedSQLContext {
     assert(planned.child.isInstanceOf[CollectLimitExec])
   }
 
+  test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") {
+    val query = testData.select('key, 'value).sort('key.desc).cache()
+    assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation])
+    val resorted = query.sort('key.desc)
+    assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s}.isEmpty)
+    assert(resorted.select('key).collect().map(_.getInt(0)).toSeq ==
+      (1 to 100).sorted(Ordering[Int].reverse))
+    // with a different order, the sort is needed
+    val sortedAsc = query.sort('key)
+    assert(sortedAsc.queryExecution.optimizedPlan.collect { case s: Sort => s}.nonEmpty)
--- End diff --

`.nonEmpty` -> `.size == 1`





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180389667
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operations on already sorted data
--- End diff --

How about `Removes Sort operation if the child is already sorted`?





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180390730
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -197,6 +198,19 @@ class PlannerSuite extends SharedSQLContext {
     assert(planned.child.isInstanceOf[CollectLimitExec])
   }
 
+  test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") {
+    val query = testData.select('key, 'value).sort('key.desc).cache()
+    assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation])
+    val resorted = query.sort('key.desc)
+    assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s}.isEmpty)
+    assert(resorted.select('key).collect().map(_.getInt(0)).toSeq ==
+      (1 to 100).sorted(Ordering[Int].reverse))
--- End diff --

`(1 to 100).reverse`?
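
The two expressions compare equal as sequences, which a few lines of plain Scala can confirm. The object name is made up for the example.

```scala
object ReverseRangeDemo {
  def main(args: Array[String]): Unit = {
    // Sorting an ascending range with a reversed ordering...
    val viaSort = (1 to 100).sorted(Ordering[Int].reverse)
    // ...yields the same elements, in the same order, as reversing the range.
    val viaReverse = (1 to 100).reverse
    println(viaSort == viaReverse)
  }
}
```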





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180390139
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -522,6 +524,8 @@ case class Range(
   override def computeStats(): Statistics = {
     Statistics(sizeInBytes = LongType.defaultSize * numElements)
   }
+
+  override def outputOrdering: Seq[SortOrder] = output.map(a => SortOrder(a, Descending))
--- End diff --

Is the ordering the same whether `step` in `Range` is positive or negative?
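
A sketch of the point behind the question, using plain Scala ranges in place of the logical `Range` operator: the direction of the generated values follows the sign of the step, so a fixed direction cannot be right for both cases. `Direction` and `rangeDirection` are hypothetical names for illustration.

```scala
object RangeOrderingDemo {
  sealed trait Direction
  case object Ascending extends Direction
  case object Descending extends Direction

  // Direction of the values produced by a range with the given non-zero step.
  def rangeDirection(step: Long): Direction = {
    require(step != 0, "step must be non-zero")
    if (step > 0) Ascending else Descending
  }

  def main(args: Array[String]): Unit = {
    // A positive step counts up, a negative step counts down.
    println((0 until 10 by 3).toList -> rangeDirection(3))
    println((10 until 0 by -3).toList -> rangeDirection(-3))
  }
}
```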





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r180389285
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, 
ORDER_BY_ORDINAL}
+
+class RemoveRedundantSortsSuite extends PlanTest {
+  override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, 
ORDER_BY_ORDINAL -> false)
+  val catalog = new SessionCatalog(new InMemoryCatalog, 
EmptyFunctionRegistry, conf)
+  val analyzer = new Analyzer(catalog, conf)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Remove Redundant Sorts", Once,
+RemoveRedundantSorts) ::
+  Batch("Collapse Project", Once,
+CollapseProject) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("remove redundant order by") {
+val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 
'b.desc_nullsFirst)
+val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 
'b.desc_nullsFirst)
+val optimized = 
Optimize.execute(analyzer.execute(unnecessaryReordered))
--- End diff --

just use `unnecessaryReordered.analyze`?





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r179120037
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -43,11 +43,12 @@ case class ReturnAnswer(child: LogicalPlan) extends 
UnaryNode {
  * This node is inserted at the top of a subquery when it is optimized. 
This makes sure we can
  * recognize a subquery as such, and it allows us to write subquery aware 
transformations.
  */
-case class Subquery(child: LogicalPlan) extends UnaryNode {
+case class Subquery(child: LogicalPlan) extends KeepOrderUnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
-case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) 
extends UnaryNode {
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
--- End diff --

Sorry, I don't fully understand what you mean. 
`ProjectExec.outputOrdering` returns `child.outputOrdering`, exactly 
as is done here.





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r179044224
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -867,6 +871,11 @@ case class RepartitionByExpression(
 
   override def maxRows: Option[Long] = child.maxRows
   override def shuffle: Boolean = true
+
+  override def outputOrdering: Seq[SortOrder] = partitioning match {
+    case RangePartitioning(ordering, _) => ordering
--- End diff --

`RangePartitioning` doesn't guarantee ordering inside each partition, so we 
can't do this.
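
The objection can be illustrated with a toy range partitioner over plain integers. This is a sketch, not Spark's partitioner: `rangePartition` and its `upperBounds` parameter are hypothetical.

```scala
object RangePartitionDemo {
  // Assigns each value to the first partition whose upper bound it does not exceed;
  // values above every bound go to a final extra partition.
  // Arrival order is kept inside each partition; nothing is sorted.
  def rangePartition(values: Seq[Int], upperBounds: Seq[Int]): Seq[Seq[Int]] = {
    def partitionOf(v: Int): Int = {
      val idx = upperBounds.indexWhere(v <= _)
      if (idx >= 0) idx else upperBounds.length
    }
    (0 to upperBounds.length).map(p => values.filter(partitionOf(_) == p))
  }

  def main(args: Array[String]): Unit = {
    val parts = rangePartition(Seq(5, 1, 9, 3, 7, 2), upperBounds = Seq(4))
    // Partitions are ordered relative to each other, but rows inside a
    // partition keep their arrival order.
    parts.foreach(println)
  }
}
```

Every value in the first partition is smaller than every value in the second, yet neither partition is internally sorted, which is why the partitioning alone cannot be reported as an output ordering.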





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r179043920
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -43,11 +43,12 @@ case class ReturnAnswer(child: LogicalPlan) extends 
UnaryNode {
  * This node is inserted at the top of a subquery when it is optimized. 
This makes sure we can
  * recognize a subquery as such, and it allows us to write subquery aware 
transformations.
  */
-case class Subquery(child: LogicalPlan) extends UnaryNode {
+case class Subquery(child: LogicalPlan) extends KeepOrderUnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
-case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) 
extends UnaryNode {
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
--- End diff --

Like `ProjectExec.outputOrdering`, we can propagate ordering for aliased 
attributes.
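
One possible reading of this suggestion, sketched on toy types: when a projection renames a sorted column, the ordering could be rewritten to use the new name instead of being discarded. `ToyOrder`, `projectOrdering`, `passThrough` and `aliases` are hypothetical; this does not claim to reproduce what `Project` or `ProjectExec` actually do.

```scala
object AliasOrderingDemo {
  // Toy sort key: column name plus direction. Not Spark's SortOrder.
  final case class ToyOrder(column: String, ascending: Boolean)

  // `passThrough` lists child columns projected unchanged;
  // `aliases` maps a child column to the name it is projected as (e.g. a AS x).
  def projectOrdering(
      childOrdering: Seq[ToyOrder],
      passThrough: Set[String],
      aliases: Map[String, String]): Seq[ToyOrder] =
    childOrdering
      .map { o =>
        if (passThrough(o.column)) Some(o)
        else aliases.get(o.column).map(newName => o.copy(column = newName))
      }
      // Keep only the longest prefix of keys still visible in the output:
      // once a key is dropped, later keys no longer describe a valid ordering.
      .takeWhile(_.isDefined)
      .flatten

  def main(args: Array[String]): Unit = {
    val childOrdering = Seq(ToyOrder("a", ascending = true), ToyOrder("b", ascending = false))
    // SELECT a AS x, b: the ordering survives under the new name.
    println(projectOrdering(childOrdering, passThrough = Set("b"), aliases = Map("a" -> "x")))
    // SELECT b AS y: the leading key is gone, so no ordering can be claimed.
    println(projectOrdering(childOrdering, passThrough = Set.empty, aliases = Map("b" -> "y")))
  }
}
```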





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r179043789
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operations on already sorted data
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
--- End diff --

Ah, they are different. This is global ordering.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-04-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r179043661
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operations on already sorted data
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
--- End diff --

Shall we do it after planning, since we already have 
`SparkPlan.outputOrdering`?





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-03-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r178327617
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 ---
@@ -219,6 +219,11 @@ abstract class LogicalPlan
* Refreshes (or invalidates) any metadata/data cached in the plan 
recursively.
*/
   def refresh(): Unit = children.foreach(_.refresh())
+
+  /**
+   * If the current plan contains sorted data, it contains the sorted 
order.
--- End diff --

Returns the output ordering that this plan generates.





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-03-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r178327166
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 ---
@@ -219,6 +219,11 @@ abstract class LogicalPlan
* Refreshes (or invalidates) any metadata/data cached in the plan 
recursively.
*/
   def refresh(): Unit = children.foreach(_.refresh())
+
+  /**
+   * If the current plan contains sorted data, it contains the sorted 
order.
+   */
+  def sortedOrder: Seq[SortOrder] = Nil
--- End diff --

`def outputOrdering`?





[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-03-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20560#discussion_r178328886
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operations on already sorted data
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.sortedOrder.nonEmpty
+        && child.sortedOrder.zip(orders).forall { case (s1, s2) => s1.satisfies(s2) } =>
--- End diff --

Why not use `SortOrder.orderingSatisfies`?
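
One reason a dedicated check may be preferable: `zip` silently truncates to the shorter sequence, so a child sorted only by `a` would appear to satisfy a required ordering of `a, b`. The sketch below illustrates that with a simplified, hypothetical `Order` case class standing in for Catalyst's `SortOrder`. `zipSatisfies` mirrors the condition in the diff, and `prefixSatisfies` is only an illustration of a prefix-style check, not the actual `SortOrder.orderingSatisfies` implementation.

```scala
object OrderingCheckSketch {
  // Hypothetical stand-in for Catalyst's SortOrder: a column name plus a direction.
  final case class Order(column: String, ascending: Boolean = true) {
    // Simplified assumption: two orders match when column and direction are equal.
    def satisfies(required: Order): Boolean = this == required
  }

  // Mirrors the condition in the diff: zip truncates to the shorter sequence,
  // so extra required orders are never compared.
  def zipSatisfies(child: Seq[Order], required: Seq[Order]): Boolean =
    child.nonEmpty && child.zip(required).forall { case (c, r) => c.satisfies(r) }

  // Prefix-style check: the required ordering must be no longer than the
  // child's ordering, and every required order must be matched in position.
  def prefixSatisfies(child: Seq[Order], required: Seq[Order]): Boolean =
    required.length <= child.length &&
      required.zip(child).forall { case (r, c) => c.satisfies(r) }
}
```

With a child ordering of `Seq(Order("a"))` and a required ordering of `Seq(Order("a"), Order("b"))`, `zipSatisfies` accepts the pair while `prefixSatisfies` rejects it.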


---




[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...

2018-02-09 Thread mgaido91
GitHub user mgaido91 opened a pull request:

https://github.com/apache/spark/pull/20560

[SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer

## What changes were proposed in this pull request?

Added a new rule that removes a Sort operation when its child is already sorted.
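
As a rough illustration of the idea only, here is a minimal sketch on a toy plan model. The `Plan`, `Scan`, `Project`, `Sort` and `Order` types and the `removeRedundantSorts` function are hypothetical stand-ins, not Catalyst classes or the rule added by this PR. The sketch also assumes that a projection preserves its child's row order.

```scala
object RemoveRedundantSortsSketch {
  // Hypothetical stand-in for a sort key: a column name plus a direction.
  final case class Order(column: String, ascending: Boolean = true)

  // Toy logical plan: by default a node reports no known output ordering.
  sealed trait Plan { def outputOrdering: Seq[Order] = Nil }
  final case class Scan(table: String) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan {
    // Assumption for this toy model: a projection keeps its child's row order.
    override def outputOrdering: Seq[Order] = child.outputOrdering
  }
  final case class Sort(order: Seq[Order], child: Plan) extends Plan {
    override def outputOrdering: Seq[Order] = order
  }

  // Bottom-up rewrite: drop a Sort whose (already rewritten) child is
  // ordered by at least the required sort keys, in the same positions.
  def removeRedundantSorts(plan: Plan): Plan = plan match {
    case Sort(order, child) =>
      val newChild = removeRedundantSorts(child)
      if (newChild.outputOrdering.startsWith(order)) newChild
      else Sort(order, newChild)
    case Project(columns, child) => Project(columns, removeRedundantSorts(child))
    case scan: Scan => scan
  }
}
```

In this toy model only the outer of two identical nested sorts is dropped; unlike the real optimizer, it does not collapse the adjacent projections that remain.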
For instance, this simple code:
```
spark.sparkContext.parallelize(Seq(("a", "b"))).toDF("a", "b").registerTempTable("table1")
val df = sql(s"""SELECT b
| FROM (
| SELECT a, b
| FROM table1
| ORDER BY a
| ) t
| ORDER BY a""".stripMargin)
df.explain(true)
```
before the PR produces this plan:
```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- Sort [a#6 ASC NULLS FIRST], true
         +- Project [_1#3 AS a#6, _2#4 AS b#7]
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
               +- ExternalRDD [obj#2]

== Physical Plan ==
*(3) Project [b#7]
+- *(3) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
      +- *(2) Project [b#7, a#6]
         +- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
               +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
                     +- Scan ExternalRDDScan[obj#2]
```

while after the PR produces:

```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [_1#3 AS a#6, _2#4 AS b#7]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
         +- ExternalRDD [obj#2]

== Physical Plan ==
*(2) Project [b#7]
+- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitionin