[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-16 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r524888701



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,27 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
+    // We skip such case that Sort is after Limit since
+    // SparkStrategies will convert them to TakeOrderedAndProjectExec
+    val skipEliminate = child match {
+      case Sort(_, true, _) => true

Review comment:
   I'm OK to skip eliminating this case if we're afraid of a potential regression. But the benchmark shows it's not harmful, and I also believe that, for large data sizes, a range-shuffle sort is faster than a single-partition sort.
   
   To clarify, we can choose one of the following options (a sketch of option 3 follows below):
   1. Skip eliminating the limit if the child is a sort, to avoid a potential regression around `TakeOrderedAndProjectExec`.
   2. Eliminate all matched limits, since the benchmark supports it.
   3. Add a config to control limit elimination, e.g. `spark.sql.optimizer.eliminateLimits.enable`.
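   
   A minimal sketch of option 3 (hypothetical, not code from this PR): the flag `spark.sql.optimizer.eliminateLimits.enable` does not exist in Spark and is only for illustration, and the rule assumes the imports already in scope in Optimizer.scala.
   ```
   object EliminateLimits extends Rule[LogicalPlan] {
     private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
       // Eliminate only when the (hypothetical) flag is on and the child's
       // row-count bound is known and already within the limit.
       SQLConf.get.getConfString("spark.sql.optimizer.eliminateLimits.enable", "true").toBoolean &&
         limitExpr.foldable &&
         child.maxRows.exists(_ <= limitExpr.eval().toString.toLong)
     }
   
     def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
       case Limit(le, child) if canEliminate(le, child) => child
     }
   }
   ```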








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-16 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r524803284



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,27 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
+    // We skip such case that Sort is after Limit since
+    // SparkStrategies will convert them to TakeOrderedAndProjectExec
+    val skipEliminate = child match {
+      case Sort(_, true, _) => true

Review comment:
   Logically you are right, but something seems wrong with this case:
   `spark.range(10).toDF("c").sort("c").explain(true)`
   ```
   == Optimized Logical Plan ==
   Sort [c#18L ASC NULLS FIRST], true
   +- Project [id#16L AS c#18L]
      +- Range (0, 10, step=1, splits=Some(2))
   
   == Physical Plan ==
   *(2) Sort [c#18L ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(c#18L ASC NULLS FIRST, 200), true, [id=#56]
      +- *(1) Project [id#16L AS c#18L]
         +- *(1) Range (0, 10, step=1, splits=2)
   ```
   
   And this case is the one you described:
   `spark.range(10).sort("id").explain(true)`
   ```
   == Optimized Logical Plan ==
   Range (0, 10, step=1, splits=Some(2))
   
   == Physical Plan ==
   *(1) Range (0, 10, step=1, splits=2)
   ```
   
   This issue seems to be [#30300](https://github.com/apache/spark/pull/30300).








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-16 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r524804089



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,27 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
+    // We skip such case that Sort is after Limit since
+    // SparkStrategies will convert them to TakeOrderedAndProjectExec
+    val skipEliminate = child match {
+      case Sort(_, true, _) => true

Review comment:
   BTW, I also added the shuffle benchmark:
   ```
   runBenchmark("Sort and Limit") {
     val N = 10
     val benchmark = new Benchmark("benchmark sort and limit", N)
   
     benchmark.addCase("TakeOrderedAndProject", 3) { _ =>
       spark.range(N).toDF("c").repartition(200).sort("c").take(20)
     }
   
     benchmark.addCase("Sort And Limit", 3) { _ =>
       withSQLConf("spark.sql.execution.topKSortFallbackThreshold" -> "-1") {
         spark.range(N).toDF("c").repartition(200).sort("c").take(20)
       }
     }
   
     benchmark.addCase("Sort", 3) { _ =>
       spark.range(N).toDF("c").repartition(200).sort("c").collect()
     }
     benchmark.run()
   }
   ```
   
   And the result is:
   ```
   Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.15.6
   Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
   benchmark sort and limit:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
   ----------------------------------------------------------------------------------------------------------
   TakeOrderedAndProject                1833           2259         382         0.1       18327.1       1.0X
   Sort And Limit                       1417           1658         285         0.1       14167.5       1.3X
   Sort                                 1324           1484         225         0.1       13238.3       1.4X
   ```








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-16 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r524038654



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,27 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
+    // We skip such case that Sort is after Limit since
+    // SparkStrategies will convert them to TakeOrderedAndProjectExec
+    val skipEliminate = child match {
+      case Sort(_, true, _) => true

Review comment:
   Referring to this result, we can remove the `Limit` even if its child is a `Sort`.








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-16 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r524034982



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,27 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
+    // We skip such case that Sort is after Limit since
+    // SparkStrategies will convert them to TakeOrderedAndProjectExec
+    val skipEliminate = child match {
+      case Sort(_, true, _) => true

Review comment:
   It's a good question. I made a simple benchmark for this case.
   
   ```
   runBenchmark("Sort and Limit") {
     val N = 1
     val benchmark = new Benchmark("benchmark sort and limit", N)
   
     benchmark.addCase("TakeOrderedAndProject", 3) { _ =>
       spark.range(N).toDF("c").sort("c").take(2)
     }
   
     benchmark.addCase("Sort And Limit", 3) { _ =>
       withSQLConf("spark.sql.execution.topKSortFallbackThreshold" -> "-1") {
         spark.range(N).toDF("c").sort("c").take(2)
       }
     }
   
     benchmark.addCase("Sort", 3) { _ =>
       spark.range(N).toDF("c").sort("c").collect()
     }
     benchmark.run()
   }
   ```
   
   And the result is:
   ```
   ================================================================================
   Sort and Limit
   ================================================================================
   
   Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.15.6
   Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
   benchmark sort and limit:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
   ----------------------------------------------------------------------------------------------------------
   TakeOrderedAndProject                 181            210          29         0.1       18148.6       1.0X
   Sort And Limit                         90            109          16         0.1        8996.1       2.0X
   Sort                                   57             63           9         0.2        5678.5       3.2X
   ```
   
   It seems `TakeOrderedAndProject` has a redundant loop in this case.








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-15 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r523851039



##
File path: sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92/explain.txt
##
@@ -1,5 +1,5 @@
 == Physical Plan ==
-TakeOrderedAndProject (34)
+* Sort (34)

Review comment:
   Updated.








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-14 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r523666452



##
File path: sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92/explain.txt
##
@@ -1,5 +1,5 @@
 == Physical Plan ==
-TakeOrderedAndProject (34)
+* Sort (34)

Review comment:
   Another thought: we can add this pattern
   ```
   case sort @ Sort(order, true, child)
       if sort.maxRow < conf.topKSortFallbackThreshold =>
     TakeOrderedAndProjectExec()
   ```
   
   In this way, we can infer `TakeOrderedAndProjectExec` from a `Sort` that has no `Limit` above it. A fuller sketch follows below.
   
   What do you think about this? @maropu @viirya @cloud-fan 
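   
   A fuller sketch of the idea (hypothetical, not code from this PR), assuming it sits next to the existing `Limit`-over-`Sort` pattern in `SparkStrategies` and using the `maxRows: Option[Long]` bound that logical plans expose:
   ```
   case s @ Sort(order, true, child)
       if s.maxRows.exists(_ < conf.topKSortFallbackThreshold) =>
     // A global Sort whose output is provably small can be planned as a
     // top-K directly, using the row-count bound as the implicit limit.
     TakeOrderedAndProjectExec(
       s.maxRows.get.toInt, order, child.output, planLater(child)) :: Nil
   ```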








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-14 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r523656874



##
File path: sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92/explain.txt
##
@@ -1,5 +1,5 @@
 == Physical Plan ==
-TakeOrderedAndProject (34)
+* Sort (34)

Review comment:
   This is the q92 SQL:
   ```
   SELECT sum(ws_ext_discount_amt) AS `Excess Discount Amount `
   FROM web_sales, item, date_dim
   WHERE i_manufact_id = 350
     AND i_item_sk = ws_item_sk
     AND d_date BETWEEN '2000-01-27' AND (cast('2000-01-27' AS DATE) + INTERVAL 90 days)
     AND d_date_sk = ws_sold_date_sk
     AND ws_ext_discount_amt >
     (
       SELECT 1.3 * avg(ws_ext_discount_amt)
       FROM web_sales, date_dim
       WHERE ws_item_sk = i_item_sk
         AND d_date BETWEEN '2000-01-27' AND (cast('2000-01-27' AS DATE) + INTERVAL 90 days)
         AND d_date_sk = ws_sold_date_sk
     )
   ORDER BY sum(ws_ext_discount_amt)
   LIMIT 100
   ```
   
   Yes, `Limit` after `Sort` is a special case that we convert to `TakeOrderedAndProject`, but it seems unnecessary to do both the sort and the limit if the child's max row count is 1. Maybe we can add another check, something like `if sort.child.maxRow <= 1 then remove sort`? A sketch follows below.
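   
   A minimal sketch of that extra check (hypothetical, not code from this PR; `RemoveRedundantSort` is a made-up name), using the same `maxRows: Option[Long]` bound:
   ```
   object RemoveRedundantSort extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Sorting at most one row is a no-op, so drop the Sort entirely.
       case Sort(_, _, child) if child.maxRows.exists(_ <= 1) => child
     }
   }
   ```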








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-14 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r523596769



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,21 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, childMaxRow: Option[Long]): Boolean = {
+    limitExpr.foldable && childMaxRow.exists { _ <= limitExpr.eval().toString.toLong }

Review comment:
   Ah, thanks for the late-night review.








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-14 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r523422082



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,21 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, childMaxRow: Option[Long]): Boolean = {
+    limitExpr.foldable && childMaxRow.exists { _ <= limitExpr.eval().toString.toLong }

Review comment:
   `childMaxRow` is an Option value; is there any issue with it?
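   
   For reference, a tiny illustration (not from the PR) of why `Option.exists` is safe here: it returns `false` for `None`, so an unknown max row count never triggers elimination.
   ```
   val unknown: Option[Long] = None
   unknown.exists(_ <= 10L)   // false: unknown maxRows blocks elimination
   Some(5L).exists(_ <= 10L)  // true: a known, small-enough maxRows allows it
   ```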








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-13 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r522975396



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,23 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, childMaxRow: Option[Long]): Boolean = {
+    limitExpr.foldable &&
+      childMaxRow.isDefined &&
+      childMaxRow.get <= limitExpr.eval().toString.toInt

Review comment:
   You are right; it will be converted to Long implicitly. I'll update it.
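   
   A small illustration (not from the PR) of the implicit widening: Scala promotes `Int` to `Long` in a mixed comparison, so `.toLong` only makes the intent explicit.
   ```
   val childMaxRow: Long = 5L
   val limit: Int = 10
   childMaxRow <= limit         // true: limit is widened to 10L implicitly
   childMaxRow <= limit.toLong  // equivalent, but explicit about the widening
   ```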








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-13 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r522960572



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,23 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, childMaxRow: Option[Long]): Boolean = {
+    limitExpr.foldable &&
+      childMaxRow.isDefined &&
+      childMaxRow.get <= limitExpr.eval().toString.toInt
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {

Review comment:
   We need `transformDown` here, so isn't it better to use `transformDown` explicitly?








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-13 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r522958258



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,23 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, childMaxRow: Option[Long]): Boolean = {
+    limitExpr.foldable &&
+      childMaxRow.isDefined &&
+      childMaxRow.get <= limitExpr.eval().toString.toInt

Review comment:
   Looks better. `Int` is enough, since we have already checked the data type during analysis.








[GitHub] [spark] ulysses-you commented on a change in pull request #30368: [SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row

2020-11-13 Thread GitBox


ulysses-you commented on a change in pull request #30368:
URL: https://github.com/apache/spark/pull/30368#discussion_r522826329



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1452,11 +1452,23 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Combines two adjacent [[Limit]] operators into one, merging the
- * expressions into one single expression.
+ * 1. Eliminate [[Limit]] operators if it's child max row <= limit.
+ * 2. Combines two adjacent [[Limit]] operators into one, merging the
+ *    expressions into one single expression.
  */
-object CombineLimits extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object EliminateLimits extends Rule[LogicalPlan] {
+  private def canEliminate(limitExpr: Expression, childMaxRow: Option[Long]): Boolean = {
+    limitExpr.foldable &&
+      childMaxRow.isDefined &&
+      childMaxRow.get <= limitExpr.eval().toString.toInt
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {

Review comment:
   We still need the old patterns for a case like `plan.limit(2).limit(1)`; see the sketch below.
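   
   A short walkthrough of why (the pattern shape mirrors the old `CombineLimits` rule): for `plan.limit(2).limit(1)`, the outer limit cannot be eliminated, because the inner limit only bounds the child at 2 rows, which is greater than 1; the merge pattern still has to fire.
   ```
   // Merge two adjacent limits into one, keeping the smaller value.
   case Limit(le, Limit(ne, grandChild)) =>
     Limit(Least(Seq(ne, le)), grandChild)
   ```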




