[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218638143
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+   orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

Ok. I will do it too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218633220
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -68,22 +68,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
+          if (limit < conf.topKSortFallbackThreshold) {
--- End diff --

btw here we really need to document what the strategies are. when there 
were only two cases it wasn't a big deal because it'd take a few seconds to 
understand. but this block is pretty large now and difficult to understand. 
see the join strategy documentation for an example.






[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218632551
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+   orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

thanks @viirya 

can you write a design doc or put it in the classdoc of limit on how we 
handle limits? your sequence of prs is making limits much more complicated 
(with optimizations) and very difficult to reason about. i think we can make it 
easier to reason about if we actually document the execution strategy.






[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218631953
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+   orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

I see. Let me submit a PR later to address the documentation. I really 
appreciate your comment.





[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218631461
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+   orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

code needs to be documented. a year from now, someone looking at the source 
code and trying to figure out what it means won't find this pr discussion. also 
the doc needs to be readable. the current doc for the config flag is 
unfortunately unparsable.






[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218631052
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+   orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

Ok, I see. I will document it in the pr.





[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218630599
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+   orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

please document it in code.





[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218630241
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+   orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

When the limit is more than `SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD`, the 
planner won't choose `TakeOrderedAndProjectExec` to perform the sort + limit 
operation.





[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218629650
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+   orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

what do you mean by "it's not goes for TakeOrderedAndProjectExec"?





[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218627396
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+   orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

It means this global limit won't change the order of the input data. It is 
used for the sort + limit case, which is handled by `TakeOrderedAndProjectExec` 
most of the time.

But if limit number is more than `SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD`, 
it's not goes for `TakeOrderedAndProjectExec`.
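
A rough sketch of the planning decision described here, in plain Scala rather than Spark's planner code. `PhysicalChoice`, `choose`, and the two case objects are hypothetical stand-ins, and the fallback branch is assumed from the PR description, since the quoted diff stops before it. The quoted diff tests `limit < conf.topKSortFallbackThreshold`, so under that check a limit equal to the threshold also falls back.

```scala
object LimitPlanningSketch {
  // Hypothetical stand-ins for the physical operators named in this thread.
  sealed trait PhysicalChoice
  case object TakeOrderedAndProject extends PhysicalChoice
  // A global limit that must preserve the order of its already-sorted input
  // (what `orderedLimit = true` is described as meaning above).
  case object OrderedGlobalLimit extends PhysicalChoice

  // Mirrors the `limit < conf.topKSortFallbackThreshold` test from the diff.
  // The threshold is passed in explicitly here instead of being read from SQLConf.
  def choose(limit: Int, topKSortFallbackThreshold: Int): PhysicalChoice =
    if (limit < topKSortFallbackThreshold) TakeOrderedAndProject
    else OrderedGlobalLimit // assumed fallback, not shown in the quoted diff
}
```

With a threshold of 100, `LimitPlanningSketch.choose(10, 100)` picks the top-K operator, while `choose(100, 100)` and `choose(1000, 100)` both take the ordered global limit path.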







[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r218623478
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+   orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

what does orderedLimit mean here?





[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-12 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r217128163
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -68,22 +68,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
+          if (limit < conf.topKSortFallbackThreshold) {
--- End diff --

Also, please add a space between `s` and `@`.





[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r217088628
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -68,22 +68,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
+          if (limit < conf.topKSortFallbackThreshold) {
--- End diff --

@hvanhovell OK. I will create a follow-up PR.





[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-12 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r217070658
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -68,22 +68,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
+          if (limit < conf.topKSortFallbackThreshold) {
--- End diff --

@viirya sorry to be a little late to the party. This pattern is repeated 4x; 
can you just move it into a helper function?





[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22344





[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-05 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/22344

[SPARK-25352][SQL] Perform ordered global limit when limit number is bigger 
than topKSortFallbackThreshold

## What changes were proposed in this pull request?

We have an optimization on global limit that evenly distributes the limit 
rows across all partitions. This optimization doesn't work for ordered results.

For a query ending with sort + limit, in most cases it is performed by 
`TakeOrderedAndProjectExec`.

But if the limit is bigger than `SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD`, 
global limit is used instead. In that case, we need to perform an ordered 
global limit.
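
To illustrate why an even distribution breaks ordered results, here is a minimal sketch that uses plain Scala collections in place of partitions. `takeEvenly` and `takeInOrder` are hypothetical helpers, not Spark APIs; the real operators work on RDD partitions of rows, and the exact distribution rule Spark uses is not shown in this thread.

```scala
object OrderedLimitSketch {
  // Each inner Seq stands in for one partition of a globally sorted dataset
  // (an assumption for illustration only).
  val partitions: Seq[Seq[Int]] =
    Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7, 8), Seq(9, 10, 11, 12))

  // Hypothetical "even" limit: take about limit / numPartitions rows from
  // every partition, then cap the total at `limit`.
  def takeEvenly(parts: Seq[Seq[Int]], limit: Int): Seq[Int] = {
    val perPartition = math.ceil(limit.toDouble / parts.size).toInt
    parts.flatMap(_.take(perPartition)).take(limit)
  }

  // Hypothetical "ordered" limit: consume partitions front to back and stop
  // once `limit` rows have been collected, so the sorted prefix is preserved.
  def takeInOrder(parts: Seq[Seq[Int]], limit: Int): Seq[Int] =
    parts.flatten.take(limit)
}
```

For a limit of 6, `takeEvenly` returns `Seq(1, 2, 5, 6, 9, 10)`, which is not the first six rows of the sorted data, while `takeInOrder` returns `Seq(1, 2, 3, 4, 5, 6)`.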

## How was this patch tested?

Unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-25352

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22344


commit 8d49c1afdbd6c0219d6cc182e53311201f73489f
Author: Liang-Chi Hsieh 
Date:   2018-09-06T04:43:15Z

Do ordered global limit when limit number is bigger than 
topKSortFallbackThreshold.



