dtenedor commented on code in PR #35975:
URL: https://github.com/apache/spark/pull/35975#discussion_r848958854
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala:
##########
@@ -182,6 +190,48 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan)
extends BaseLimitExec {
copy(child = newChild)
}
+/**
+ * Skip the first `offset` elements then take the first `limit` of the
following elements in
+ * the child's single output partition.
+ */
+case class GlobalLimitAndOffsetExec(
+ limit: Int,
+ offset: Int,
+ child: SparkPlan) extends BaseLimitExec {
+
+ override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
+ override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+ override def doExecute(): RDD[InternalRow] = {
+ val rdd = child.execute().mapPartitions { iter => iter.take(limit +
offset)}
+ rdd.zipWithIndex().filter(_._2 >= offset).map(_._1)
Review Comment:
could we accomplish this in a simpler way by just doing a `drop` by `offset`
on the rows instead?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -85,22 +85,43 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
case ReturnAnswer(rootPlan) => rootPlan match {
case Limit(IntegerLiteral(limit), Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
- TakeOrderedAndProjectExec(limit, order, child.output,
planLater(child)) :: Nil
+ TakeOrderedAndProjectExec(limit, 0, order, child.output,
planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order,
true, child)))
if limit < conf.topKSortFallbackThreshold =>
- TakeOrderedAndProjectExec(limit, order, projectList,
planLater(child)) :: Nil
+ TakeOrderedAndProjectExec(limit, 0, order, projectList,
planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), child) =>
- CollectLimitExec(limit, planLater(child)) :: Nil
+ CollectLimitExec(limit, 0, planLater(child)) :: Nil
+ case GlobalLimitAndOffset(IntegerLiteral(limit),
IntegerLiteral(offset),
+ Sort(order, true, child)) if limit < conf.topKSortFallbackThreshold
=>
+ TakeOrderedAndProjectExec(limit, offset, order, child.output,
planLater(child)) :: Nil
+ case GlobalLimitAndOffset(IntegerLiteral(limit),
IntegerLiteral(offset),
+ Project(projectList, Sort(order, true, child)))
+ if limit < conf.topKSortFallbackThreshold =>
+ TakeOrderedAndProjectExec(limit, offset, order, projectList,
planLater(child)) :: Nil
+ case GlobalLimitAndOffset(IntegerLiteral(limit),
IntegerLiteral(offset), child) =>
+ CollectLimitExec(limit, offset, planLater(child)) :: Nil
case Tail(IntegerLiteral(limit), child) =>
CollectTailExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
}
case Limit(IntegerLiteral(limit), Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
- TakeOrderedAndProjectExec(limit, order, child.output,
planLater(child)) :: Nil
+ TakeOrderedAndProjectExec(limit, 0, order, child.output,
planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true,
child)))
if limit < conf.topKSortFallbackThreshold =>
- TakeOrderedAndProjectExec(limit, order, projectList, planLater(child))
:: Nil
+ TakeOrderedAndProjectExec(limit, 0, order, projectList,
planLater(child)) :: Nil
+ case GlobalLimitAndOffset(
+ IntegerLiteral(limit),
+ IntegerLiteral(offset),
+ Sort(order, true, child))
+ if limit < conf.topKSortFallbackThreshold =>
Review Comment:
should this be `limit + offset < conf.topKSortFallbackThreshold`? Same below?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -407,10 +407,31 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit",
limitExpr)
- case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit",
limitExpr)
+// case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit",
limitExpr)
+ case LocalLimit(limitExpr, child) =>
+ checkLimitLikeClause("limit", limitExpr)
+ child match {
+ case Offset(offsetExpr, _) =>
+ val limit = limitExpr.eval().asInstanceOf[Int]
Review Comment:
SG. Will the `checkLimitLikeClause` execute before this `asInstanceOf[Int]`
call here? If so, we are OK. Otherwise, we would receive an exception here,
which might result in a confusing error message?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -1145,6 +1145,23 @@ case class Expand(
copy(child = newChild)
}
+/**
+ * A logical offset, which may remove a specified number of rows from the
beginning of the
+ * output of the child logical plan.
Review Comment:
SG, the logical Offset just removes the first N rows. When we combine it
with a Limit in the physical plan, then we can think about these semantics.
##########
sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out:
##########
@@ -38,6 +38,62 @@ struct<two:string,unique1:int,unique2:int,stringu1:string>
62 633 KCAAAA
+-- !query
+SELECT '' AS three, unique1, unique2, stringu1
+ FROM onek WHERE unique1 > 100
+ ORDER BY unique1 LIMIT 3 OFFSET 20
+-- !query schema
+struct<three:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+ 121 700 REAAAA
+ 122 519 SEAAAA
+ 123 777 TEAAAA
+
+
+-- !query
+SELECT '' AS zero, unique1, unique2, stringu1
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 8 OFFSET 99
+-- !query schema
+struct<zero:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+
+
+
+-- !query
+SELECT '' AS eleven, unique1, unique2, stringu1
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 20 OFFSET 39
Review Comment:
Could we have a test case with a LIMIT + OFFSET following each of the major
operators, e.g. aggregation, join, union all?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1846,6 +1847,19 @@ object EliminateLimits extends Rule[LogicalPlan] {
}
}
+/**
+ * Rewrite [[Offset]] as [[Limit]] or combines two adjacent [[Offset]]
operators into one,
+ * merging the expressions into one single expression.
+ */
+object RewriteOffsets extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case GlobalLimit(le, Offset(oe, grandChild)) =>
Review Comment:
I checked, it looks like the `object Limit` contains the background info.
Maybe just write a comment like: `// See [[Limit]] for more information about
the difference between LocalLimit and GlobalLimit.`
<img width="801" alt="image"
src="https://user-images.githubusercontent.com/99207096/163072607-e9057a74-3e47-48ab-a0f3-bf2fc61919d4.png">
##########
sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out:
##########
@@ -38,6 +38,62 @@ struct<two:string,unique1:int,unique2:int,stringu1:string>
62 633 KCAAAA
+-- !query
+SELECT '' AS three, unique1, unique2, stringu1
+ FROM onek WHERE unique1 > 100
+ ORDER BY unique1 LIMIT 3 OFFSET 20
+-- !query schema
+struct<three:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+ 121 700 REAAAA
+ 122 519 SEAAAA
+ 123 777 TEAAAA
+
+
+-- !query
+SELECT '' AS zero, unique1, unique2, stringu1
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 8 OFFSET 99
Review Comment:
could we add a few test cases with the LIMIT and OFFSET inside subqueries?
Do the rows get filtered out at the table subquery boundary and then the rows
from the OFFSET are not consumed by the remaining logic?
##########
sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql:
##########
@@ -12,25 +12,24 @@ SELECT '' AS five, unique1, unique2, stringu1
SELECT '' AS two, unique1, unique2, stringu1
FROM onek WHERE unique1 > 60 AND unique1 < 63
ORDER BY unique1 LIMIT 5;
--- [SPARK-28330] ANSI SQL: Top-level <result offset clause> in <query
expression>
--- SELECT '' AS three, unique1, unique2, stringu1
--- FROM onek WHERE unique1 > 100
--- ORDER BY unique1 LIMIT 3 OFFSET 20;
--- SELECT '' AS zero, unique1, unique2, stringu1
--- FROM onek WHERE unique1 < 50
--- ORDER BY unique1 DESC LIMIT 8 OFFSET 99;
--- SELECT '' AS eleven, unique1, unique2, stringu1
--- FROM onek WHERE unique1 < 50
--- ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
+SELECT '' AS three, unique1, unique2, stringu1
+ FROM onek WHERE unique1 > 100
+ ORDER BY unique1 LIMIT 3 OFFSET 20;
+ SELECT '' AS zero, unique1, unique2, stringu1
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 8 OFFSET 99;
+ SELECT '' AS eleven, unique1, unique2, stringu1
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
-- SELECT '' AS ten, unique1, unique2, stringu1
Review Comment:
SG.
##########
sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out:
##########
@@ -38,6 +38,62 @@ struct<two:string,unique1:int,unique2:int,stringu1:string>
62 633 KCAAAA
+-- !query
+SELECT '' AS three, unique1, unique2, stringu1
+ FROM onek WHERE unique1 > 100
+ ORDER BY unique1 LIMIT 3 OFFSET 20
+-- !query schema
+struct<three:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+ 121 700 REAAAA
+ 122 519 SEAAAA
+ 123 777 TEAAAA
+
+
+-- !query
+SELECT '' AS zero, unique1, unique2, stringu1
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 8 OFFSET 99
+-- !query schema
+struct<zero:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+
+
+
+-- !query
+SELECT '' AS eleven, unique1, unique2, stringu1
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 20 OFFSET 39
+-- !query schema
+struct<eleven:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+ 10 520 KAAAAA
+ 9 49 JAAAAA
+ 8 653 IAAAAA
+ 7 647 HAAAAA
+ 6 978 GAAAAA
+ 5 541 FAAAAA
+ 4 833 EAAAAA
+ 3 431 DAAAAA
+ 2 326 CAAAAA
+ 1 214 BAAAAA
+ 0 998 AAAAAA
+
+
+-- !query
+SELECT '' AS five, unique1, unique2, stringu1
+ FROM onek
+ ORDER BY unique1 LIMIT 5 OFFSET 900
Review Comment:
do we have a test case with OFFSET 0?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -85,22 +85,43 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
case ReturnAnswer(rootPlan) => rootPlan match {
case Limit(IntegerLiteral(limit), Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
- TakeOrderedAndProjectExec(limit, order, child.output,
planLater(child)) :: Nil
+ TakeOrderedAndProjectExec(limit, 0, order, child.output,
planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order,
true, child)))
if limit < conf.topKSortFallbackThreshold =>
- TakeOrderedAndProjectExec(limit, order, projectList,
planLater(child)) :: Nil
+ TakeOrderedAndProjectExec(limit, 0, order, projectList,
planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), child) =>
- CollectLimitExec(limit, planLater(child)) :: Nil
+ CollectLimitExec(limit, 0, planLater(child)) :: Nil
+ case GlobalLimitAndOffset(IntegerLiteral(limit),
IntegerLiteral(offset),
+ Sort(order, true, child)) if limit < conf.topKSortFallbackThreshold
=>
+ TakeOrderedAndProjectExec(limit, offset, order, child.output,
planLater(child)) :: Nil
+ case GlobalLimitAndOffset(IntegerLiteral(limit),
IntegerLiteral(offset),
+ Project(projectList, Sort(order, true, child)))
+ if limit < conf.topKSortFallbackThreshold =>
+ TakeOrderedAndProjectExec(limit, offset, order, projectList,
planLater(child)) :: Nil
+ case GlobalLimitAndOffset(IntegerLiteral(limit),
IntegerLiteral(offset), child) =>
+ CollectLimitExec(limit, offset, planLater(child)) :: Nil
case Tail(IntegerLiteral(limit), child) =>
CollectTailExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
}
case Limit(IntegerLiteral(limit), Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
- TakeOrderedAndProjectExec(limit, order, child.output,
planLater(child)) :: Nil
+ TakeOrderedAndProjectExec(limit, 0, order, child.output,
planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true,
child)))
if limit < conf.topKSortFallbackThreshold =>
- TakeOrderedAndProjectExec(limit, order, projectList, planLater(child))
:: Nil
+ TakeOrderedAndProjectExec(limit, 0, order, projectList,
planLater(child)) :: Nil
+ case GlobalLimitAndOffset(
Review Comment:
Sure, something like the comment on L203 starting with `// If it is an
equi-join, we first look at the join hints w.r.t. the following order...`
We could consider something like the following, for each `case` in this list:
```
// This is a global LIMIT and OFFSET over a logical sorting operator,
// where the specified limit is less than a heuristic threshold.
// In this case we generate a physical top-K sorting operator,
passing down
// the limit and offset values to be evaluated inline during the
physical
// sorting operation for greater efficiency.
case GlobalLimitAndOffset(IntegerLiteral(limit),
IntegerLiteral(offset),
Sort(order, true, child)) if limit <
conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, offset, order, child.output,
planLater(child)) :: Nil
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]