dtenedor commented on code in PR #35975:
URL: https://github.com/apache/spark/pull/35975#discussion_r848958854


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala:
##########
@@ -182,6 +190,48 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) 
extends BaseLimitExec {
     copy(child = newChild)
 }
 
+/**
+ * Skip the first `offset` elements, then take the first `limit` of the 
following elements in
+ * the child's single output partition.
+ */
+case class GlobalLimitAndOffsetExec(
+    limit: Int,
+    offset: Int,
+    child: SparkPlan) extends BaseLimitExec {
+
+  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def doExecute(): RDD[InternalRow] = {
+    val rdd = child.execute().mapPartitions { iter => iter.take(limit + 
offset)}
+    rdd.zipWithIndex().filter(_._2 >= offset).map(_._1)

Review Comment:
   could we accomplish this in a simpler way by just doing a `drop` by `offset` 
on the rows instead?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -85,22 +85,43 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case ReturnAnswer(rootPlan) => rootPlan match {
         case Limit(IntegerLiteral(limit), Sort(order, true, child))
             if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
+          TakeOrderedAndProjectExec(limit, 0, order, child.output, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child)))
             if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, 
planLater(child)) :: Nil
+          TakeOrderedAndProjectExec(limit, 0, order, projectList, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), child) =>
-          CollectLimitExec(limit, planLater(child)) :: Nil
+          CollectLimitExec(limit, 0, planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
+          Sort(order, true, child)) if limit < conf.topKSortFallbackThreshold 
=>
+          TakeOrderedAndProjectExec(limit, offset, order, child.output, 
planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
+          Project(projectList, Sort(order, true, child)))
+            if limit < conf.topKSortFallbackThreshold =>
+          TakeOrderedAndProjectExec(limit, offset, order, projectList, 
planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset), child) =>
+          CollectLimitExec(limit, offset, planLater(child)) :: Nil
         case Tail(IntegerLiteral(limit), child) =>
           CollectTailExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
       case Limit(IntegerLiteral(limit), Sort(order, true, child))
           if limit < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
+        TakeOrderedAndProjectExec(limit, 0, order, child.output, 
planLater(child)) :: Nil
       case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, 
child)))
           if limit < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) 
:: Nil
+        TakeOrderedAndProjectExec(limit, 0, order, projectList, 
planLater(child)) :: Nil
+      case GlobalLimitAndOffset(
+          IntegerLiteral(limit),
+          IntegerLiteral(offset),
+          Sort(order, true, child))
+          if limit < conf.topKSortFallbackThreshold =>

Review Comment:
   should this be `limit + offset < conf.topKSortFallbackThreshold`? Same below?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -407,10 +407,31 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
 
           case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
 
-          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
+//          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
+          case LocalLimit(limitExpr, child) =>
+            checkLimitLikeClause("limit", limitExpr)
+            child match {
+              case Offset(offsetExpr, _) =>
+                val limit = limitExpr.eval().asInstanceOf[Int]

Review Comment:
   SG. Will the `checkLimitLikeClause` execute before this `asInstanceOf[Int]` 
call here? If so, we are OK. Otherwise, we would receive an exception here, 
which might result in a confusing error message?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -1145,6 +1145,23 @@ case class Expand(
     copy(child = newChild)
 }
 
+/**
+ * A logical offset, which may remove a specified number of rows from the 
beginning of the
+ * output of the child logical plan.

Review Comment:
   SG, the logical Offset just removes the first N rows. When we combine it 
with a Limit in the physical plan, then we can think about these semantics.



##########
sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out:
##########
@@ -38,6 +38,62 @@ struct<two:string,unique1:int,unique2:int,stringu1:string>
        62      633     KCAAAA
 
 
+-- !query
+SELECT '' AS three, unique1, unique2, stringu1
+               FROM onek WHERE unique1 > 100
+               ORDER BY unique1 LIMIT 3 OFFSET 20
+-- !query schema
+struct<three:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+       121     700     REAAAA
+       122     519     SEAAAA
+       123     777     TEAAAA
+
+
+-- !query
+SELECT '' AS zero, unique1, unique2, stringu1
+               FROM onek WHERE unique1 < 50
+               ORDER BY unique1 DESC LIMIT 8 OFFSET 99
+-- !query schema
+struct<zero:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+
+
+
+-- !query
+SELECT '' AS eleven, unique1, unique2, stringu1
+               FROM onek WHERE unique1 < 50
+               ORDER BY unique1 DESC LIMIT 20 OFFSET 39

Review Comment:
   Could we have a test case with a LIMIT + OFFSET following each of the major 
operators, e.g. aggregation, join, union all? 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1846,6 +1847,19 @@ object EliminateLimits extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Rewrite [[Offset]] as [[Limit]] or combine two adjacent [[Offset]] 
operators into one,
+ * merging the expressions into a single expression.
+ */
+object RewriteOffsets extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case GlobalLimit(le, Offset(oe, grandChild)) =>

Review Comment:
   I checked, it looks like the `object Limit` contains the background info. 
Maybe just write a comment like: `// See [[Limit]] for more information about 
the difference between LocalLimit and GlobalLimit.`
   
   <img width="801" alt="image" 
src="https://user-images.githubusercontent.com/99207096/163072607-e9057a74-3e47-48ab-a0f3-bf2fc61919d4.png">
   



##########
sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out:
##########
@@ -38,6 +38,62 @@ struct<two:string,unique1:int,unique2:int,stringu1:string>
        62      633     KCAAAA
 
 
+-- !query
+SELECT '' AS three, unique1, unique2, stringu1
+               FROM onek WHERE unique1 > 100
+               ORDER BY unique1 LIMIT 3 OFFSET 20
+-- !query schema
+struct<three:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+       121     700     REAAAA
+       122     519     SEAAAA
+       123     777     TEAAAA
+
+
+-- !query
+SELECT '' AS zero, unique1, unique2, stringu1
+               FROM onek WHERE unique1 < 50
+               ORDER BY unique1 DESC LIMIT 8 OFFSET 99

Review Comment:
   could we add a few test cases with the LIMIT and OFFSET inside subqueries? 
Do the rows get filtered out at the table subquery boundary and then the rows 
from the OFFSET are not consumed by the remaining logic?



##########
sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql:
##########
@@ -12,25 +12,24 @@ SELECT '' AS five, unique1, unique2, stringu1
 SELECT '' AS two, unique1, unique2, stringu1
                FROM onek WHERE unique1 > 60 AND unique1 < 63
                ORDER BY unique1 LIMIT 5;
--- [SPARK-28330] ANSI SQL: Top-level <result offset clause> in <query 
expression>
--- SELECT '' AS three, unique1, unique2, stringu1
---             FROM onek WHERE unique1 > 100
---             ORDER BY unique1 LIMIT 3 OFFSET 20;
--- SELECT '' AS zero, unique1, unique2, stringu1
---             FROM onek WHERE unique1 < 50
---             ORDER BY unique1 DESC LIMIT 8 OFFSET 99;
--- SELECT '' AS eleven, unique1, unique2, stringu1
---             FROM onek WHERE unique1 < 50
---             ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
+SELECT '' AS three, unique1, unique2, stringu1
+               FROM onek WHERE unique1 > 100
+               ORDER BY unique1 LIMIT 3 OFFSET 20;
+ SELECT '' AS zero, unique1, unique2, stringu1
+               FROM onek WHERE unique1 < 50
+               ORDER BY unique1 DESC LIMIT 8 OFFSET 99;
+ SELECT '' AS eleven, unique1, unique2, stringu1
+               FROM onek WHERE unique1 < 50
+               ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
 -- SELECT '' AS ten, unique1, unique2, stringu1

Review Comment:
   SG.



##########
sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out:
##########
@@ -38,6 +38,62 @@ struct<two:string,unique1:int,unique2:int,stringu1:string>
        62      633     KCAAAA
 
 
+-- !query
+SELECT '' AS three, unique1, unique2, stringu1
+               FROM onek WHERE unique1 > 100
+               ORDER BY unique1 LIMIT 3 OFFSET 20
+-- !query schema
+struct<three:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+       121     700     REAAAA
+       122     519     SEAAAA
+       123     777     TEAAAA
+
+
+-- !query
+SELECT '' AS zero, unique1, unique2, stringu1
+               FROM onek WHERE unique1 < 50
+               ORDER BY unique1 DESC LIMIT 8 OFFSET 99
+-- !query schema
+struct<zero:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+
+
+
+-- !query
+SELECT '' AS eleven, unique1, unique2, stringu1
+               FROM onek WHERE unique1 < 50
+               ORDER BY unique1 DESC LIMIT 20 OFFSET 39
+-- !query schema
+struct<eleven:string,unique1:int,unique2:int,stringu1:string>
+-- !query output
+       10      520     KAAAAA
+       9       49      JAAAAA
+       8       653     IAAAAA
+       7       647     HAAAAA
+       6       978     GAAAAA
+       5       541     FAAAAA
+       4       833     EAAAAA
+       3       431     DAAAAA
+       2       326     CAAAAA
+       1       214     BAAAAA
+       0       998     AAAAAA
+
+
+-- !query
+SELECT '' AS five, unique1, unique2, stringu1
+               FROM onek
+               ORDER BY unique1 LIMIT 5 OFFSET 900

Review Comment:
   do we have a test case with OFFSET 0?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -85,22 +85,43 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case ReturnAnswer(rootPlan) => rootPlan match {
         case Limit(IntegerLiteral(limit), Sort(order, true, child))
             if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
+          TakeOrderedAndProjectExec(limit, 0, order, child.output, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child)))
             if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, 
planLater(child)) :: Nil
+          TakeOrderedAndProjectExec(limit, 0, order, projectList, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), child) =>
-          CollectLimitExec(limit, planLater(child)) :: Nil
+          CollectLimitExec(limit, 0, planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
+          Sort(order, true, child)) if limit < conf.topKSortFallbackThreshold 
=>
+          TakeOrderedAndProjectExec(limit, offset, order, child.output, 
planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
+          Project(projectList, Sort(order, true, child)))
+            if limit < conf.topKSortFallbackThreshold =>
+          TakeOrderedAndProjectExec(limit, offset, order, projectList, 
planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset), child) =>
+          CollectLimitExec(limit, offset, planLater(child)) :: Nil
         case Tail(IntegerLiteral(limit), child) =>
           CollectTailExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
       case Limit(IntegerLiteral(limit), Sort(order, true, child))
           if limit < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
+        TakeOrderedAndProjectExec(limit, 0, order, child.output, 
planLater(child)) :: Nil
       case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, 
child)))
           if limit < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) 
:: Nil
+        TakeOrderedAndProjectExec(limit, 0, order, projectList, 
planLater(child)) :: Nil
+      case GlobalLimitAndOffset(

Review Comment:
   Sure, something like the comment on L203 starting with `// If it is an 
equi-join, we first look at the join hints w.r.t. the following order...`
   
   We could consider something like the following, for each `case` in this list:
   
   ```
           // This is a global LIMIT and OFFSET over a logical sorting operator,
           // where the specified limit is less than a heuristic threshold.
           // In this case we generate a physical top-K sorting operator, 
passing down
           // the limit and offset values to be evaluated inline during the 
physical
           // sorting operation for greater efficiency.
           case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
             Sort(order, true, child)) if limit < 
conf.topKSortFallbackThreshold =>
             TakeOrderedAndProjectExec(limit, offset, order, child.output, 
             planLater(child)) :: Nil
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to