[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218638143

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+    orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

Ok. I will do it too.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218633220

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -68,22 +68,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
+          if (limit < conf.topKSortFallbackThreshold) {
--- End diff --

btw here we really need to document what the strategies are. when there were only two cases it wasn't a big deal, because it'd take a few seconds to understand. but this block is pretty large now, and that makes it difficult to understand. see the join strategy documentation for an example.

---
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218632551

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+    orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

thanks @viirya. can you write a design doc, or put it in the classdoc of limit, on how we handle limits? your sequence of prs is making limits much more complicated (with optimizations) and very difficult to reason about. i think we can make it easier to reason about if we actually document the execution strategy.

---
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218631953

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+    orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

I see. Let me submit a pr later to address that documentation. Really appreciate your comments.

---
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218631461

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+    orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

code needs to be documented. a year from now, when we look at the source code trying to figure out what it means, we won't find this pr discussion. also the doc needs to be readable. the current doc for the config flag is unfortunately unparsable.

---
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218631052

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+    orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

Ok, I see. I will document it in the pr.

---
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218630599

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+    orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

please document it in code.

---
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218630241

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+    orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

When the limit number is more than `SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD`, the planner won't choose `TakeOrderedAndProjectExec` to perform the sort + limit operation.

---
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218629650

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+    orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

what do you mean by "it's not goes for TakeOrderedAndProjectExec"?

---
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218627396

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+    orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

It means this global limit won't change the input data order. This is used in the sort + limit case, which is handled by `TakeOrderedAndProjectExec` most of the time. But if the limit number is more than `SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD`, it's not goes for `TakeOrderedAndProjectExec`.

---
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r218623478

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -98,7 +98,8 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode wi
 /**
  * Take the `limit` elements of the child output.
  */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+case class GlobalLimitExec(limit: Int, child: SparkPlan,
+    orderedLimit: Boolean = false) extends UnaryExecNode {
--- End diff --

what does orderedLimit mean here?

---
Github user dbtsai commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r217128163

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -68,22 +68,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
+          if (limit < conf.topKSortFallbackThreshold) {
--- End diff --

Also, please add a space on each side of `@`, i.e. `s @ Sort(...)`.

---
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r217088628

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -68,22 +68,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
+          if (limit < conf.topKSortFallbackThreshold) {
--- End diff --

@hvanhovell OK. I will create a follow-up PR.

---
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/22344#discussion_r217070658

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -68,22 +68,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
+          if (limit < conf.topKSortFallbackThreshold) {
--- End diff --

@viirya sorry to be a little late to the party. This pattern is repeated 4x; can you just move it into a helper function?

---
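For illustration only, here is a minimal sketch of the kind of helper hvanhovell is asking for, written against a toy plan model rather than Spark's real `LogicalPlan`/`SparkPlan` classes. Every name in it (`Scan`, `Sort`, `Project`, `Limit`, `TakeOrderedAndProject`, `GlobalLimit`, `SpecialLimitsSketch`, `planOrderedLimit`) is a hypothetical stand-in. It assumes the behaviour viirya describes in this thread: below the threshold, sort + limit (+ project) becomes one top-K operator; otherwise the planner falls back to an order-preserving global limit over the original subtree. It does not reproduce the exact physical plan the PR builds.

```scala
// Toy plan model for illustration; these are NOT Spark's LogicalPlan/SparkPlan classes.
sealed trait Logical { def output: Seq[String] }
final case class Scan(output: Seq[String]) extends Logical
final case class Sort(order: Seq[String], global: Boolean, child: Logical) extends Logical {
  def output: Seq[String] = child.output
}
final case class Project(projectList: Seq[String], child: Logical) extends Logical {
  def output: Seq[String] = projectList
}
final case class Limit(n: Int, child: Logical) extends Logical {
  def output: Seq[String] = child.output
}

sealed trait Physical
// Hypothetical top-K operator: sort + limit + project in one step.
final case class TakeOrderedAndProject(
    limit: Int, order: Seq[String], projectList: Seq[String], child: Logical) extends Physical
// Hypothetical fallback: a global limit that must keep its (already sorted) input order.
final case class GlobalLimit(limit: Int, child: Logical, orderedLimit: Boolean) extends Physical

object SpecialLimitsSketch {

  /** Hypothetical helper holding the threshold check that was repeated per pattern.
    * `sort` is the global Sort, `projectList` the columns to return, and `subtree`
    * the original plan under the Limit, reused unchanged when falling back. */
  private def planOrderedLimit(
      limit: Int,
      sort: Sort,
      projectList: Seq[String],
      subtree: Logical,
      threshold: Int): Physical =
    if (limit < threshold) TakeOrderedAndProject(limit, sort.order, projectList, sort.child)
    else GlobalLimit(limit, subtree, orderedLimit = true)

  /** Plans Limit(Sort) and Limit(Project(Sort)); returns None for any other shape. */
  def plan(root: Logical, threshold: Int): Option[Physical] = root match {
    case Limit(n, s @ Sort(_, true, child)) =>
      Some(planOrderedLimit(n, s, child.output, s, threshold))
    case Limit(n, p @ Project(projectList, s @ Sort(_, true, _))) =>
      Some(planOrderedLimit(n, s, projectList, p, threshold))
    case _ => None
  }
}
```

Routing both the `Sort` and `Project(Sort)` shapes through one function keeps the threshold check in a single place. That single place is also a natural spot for the strategy documentation rxin asks for.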
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/22344

---
GitHub user viirya opened a pull request:
https://github.com/apache/spark/pull/22344

[SPARK-25352][SQL] Perform ordered global limit when limit number is bigger than topKSortFallbackThreshold

## What changes were proposed in this pull request?

We have an optimization on global limit that evenly distributes limit rows across all partitions. This optimization doesn't work for ordered results.

For a query ending with sort + limit, in most cases it is performed by `TakeOrderedAndProjectExec`. But if the limit number is bigger than `SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD`, global limit will be used. In that case, we need to do an ordered global limit.

## How was this patch tested?

Unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-25352

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22344.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22344

commit 8d49c1afdbd6c0219d6cc182e53311201f73489f
Author: Liang-Chi Hsieh
Date: 2018-09-06T04:43:15Z

Do ordered global limit when limit number is bigger than topKSortFallbackThreshold.
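To make the ordering problem concrete, here is a small sketch over in-memory "partitions" of globally sorted data. `GlobalLimitSketch`, `evenlyDistributed`, and `ordered` are hypothetical names. `evenlyDistributed` is a simplified model of spreading the limit evenly across partitions (it assumes each partition holds at least its share of rows); it is not Spark's actual implementation.

```scala
// Toy model: each inner Seq is one partition. Partitions are range-partitioned,
// so concatenating them in order yields the globally sorted output.
object GlobalLimitSketch {

  /** Hypothetical "evenly distributed" limit: each partition contributes about
    * limit / numPartitions rows, with the remainder going to the first partitions.
    * Assumes every partition holds at least its share of rows. */
  def evenlyDistributed(partitions: Seq[Seq[Int]], limit: Int): Seq[Int] =
    if (partitions.isEmpty) Seq.empty
    else {
      val n = partitions.size
      partitions.zipWithIndex.flatMap { case (part, i) =>
        val share = limit / n + (if (i < limit % n) 1 else 0)
        part.take(share)
      }
    }

  /** Order-preserving limit: the first `limit` rows, reading partitions in order. */
  def ordered(partitions: Seq[Seq[Int]], limit: Int): Seq[Int] =
    partitions.flatten.take(limit)
}
```

With partitions `[1, 2, 3]`, `[4, 5, 6]`, `[7, 8, 9]` and a limit of 3, the even split returns `1, 4, 7`: the right number of rows, but the wrong rows. The ordered limit returns the true top three, `1, 2, 3`.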