dtenedor commented on code in PR #35975:
URL: https://github.com/apache/spark/pull/35975#discussion_r847634501


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -407,10 +407,31 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
 
           case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
 
-          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
+//          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)

Review Comment:
   please remove commented-out code?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -407,10 +407,31 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
 
           case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
 
-          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
+//          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
+          case LocalLimit(limitExpr, child) =>
+            checkLimitLikeClause("limit", limitExpr)
+            child match {
+              case Offset(offsetExpr, _) =>
+                val limit = limitExpr.eval().asInstanceOf[Int]

Review Comment:
   In the parser it looks like we accept any expression for the OFFSET, but 
here we call `asInstanceOf[Int]`. Can we have an explicit check that this 
expression has integer type with an appropriate error message if not, and an 
accompanying test case that covers it?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -1145,6 +1145,23 @@ case class Expand(
     copy(child = newChild)
 }
 
+/**
+ * A logical offset, which may removing a specified number of rows from the 
beginning of the
+ * output of child logical plan.

Review Comment:
   Can we mention in this comment:
   1. Whether an accompanying LIMIT is also required, or the OFFSET is allowed 
to stand alone
   2. The semantics of which rows get skipped or preserved in the presence of an accompanying LIMIT. For example, if we have LIMIT 10 OFFSET 5, do we (a) impose a total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining, or (b) impose the limit of 10 rows first and then discard the first 5 of those, leaving only 5?
   3. What are the boundary conditions, e.g. min and max allowed values?
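   For what it's worth, interpretation (a) can be illustrated with a plain Scala collection standing in for the query result (assuming, as in the SQL standard, that OFFSET skips first and LIMIT caps what remains):

```scala
// 100 input rows, standing in for the child plan's output.
val rows = (1 to 100).toSeq
val limit = 10
val offset = 5

// Interpretation (a): fetch limit + offset = 15 rows in total, then discard
// the first `offset` of them, leaving exactly `limit` rows.
val result = rows.take(limit + offset).drop(offset)
// result is 6, 7, ..., 15 -- ten rows, starting after the five skipped ones.
```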



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala:
##########
@@ -42,10 +42,12 @@ trait LimitExec extends UnaryExecNode {
  * This operator will be used when a logical `Limit` operation is the final 
operator in an
  * logical plan, which happens when the user is collecting results back to the 
driver.
  */
-case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
+case class CollectLimitExec(limit: Int, offset: Int, child: SparkPlan) extends 
LimitExec {
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
-  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
+  override def executeCollect(): Array[InternalRow] = {
+    child.executeTake(limit + offset).drop(offset)

Review Comment:
   can you add a comment explaining why we add the `limit` and `offset` together into a single sum here and then drop the `offset`, with a brief example?
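   A sketch of the reasoning such a comment could capture, with a toy `executeTake` standing in for the real one (which can only return a prefix of the child's output):

```scala
// Toy stand-in: executeTake(n) returns the first n rows of the child's output.
def executeTake(n: Int): Array[Int] = (1 to n).toArray

val limit = 3
val offset = 2

// A take-like primitive can only fetch a prefix, so to return the rows in
// positions (offset, offset + limit] we must over-fetch limit + offset = 5
// rows and then drop the first `offset` of them locally.
val collected = executeTake(limit + offset).drop(offset)
// Fetching just `limit` rows would wrongly yield 1, 2, 3; this yields 3, 4, 5.
```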



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -1256,6 +1273,25 @@ case class LocalLimit(limitExpr: Expression, child: 
LogicalPlan) extends OrderPr
     copy(child = newChild)
 }
 
+/**
+ * A global (coordinated) limit with offset. This operator can skip at most 
`offsetExpr` number and
+ * emit at most `limitExpr` number in total.

Review Comment:
   Similar to above, can we mention in this comment:
   
   1. Whether an accompanying LIMIT is also required, or the OFFSET is allowed 
to stand alone
   2. The semantics of which rows get skipped or preserved in the presence of an accompanying LIMIT. For example, if we have LIMIT 10 OFFSET 5, do we (a) impose a total limit of 10 + 5 = 15 rows and then discard the first 5, leaving 10 rows remaining, or (b) impose the limit of 10 rows first and then discard the first 5 of those, leaving only 5?
   3. What are the boundary conditions, e.g. min and max allowed values?
   4. What is the difference between this and a chain of regular Limit + Offset 
operators?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -1145,6 +1145,23 @@ case class Expand(
     copy(child = newChild)
 }
 
+/**
+ * A logical offset, which may removing a specified number of rows from the 
beginning of the
+ * output of child logical plan.
+ */
+case class Offset(offsetExpr: Expression, child: LogicalPlan) extends 
OrderPreservingUnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def maxRows: Option[Long] = {
+    import scala.math.max
+    offsetExpr match {
+      case IntegerLiteral(offset) => child.maxRows.map { x => max(x - offset, 
0) }
+      case _ => None

Review Comment:
   We expect the offset expression to always be an integer literal by this 
point, enforced by the parser? So we can drop this line?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala:
##########
@@ -42,10 +42,12 @@ trait LimitExec extends UnaryExecNode {
  * This operator will be used when a logical `Limit` operation is the final 
operator in an

Review Comment:
   Please update this comment to mention the semantics of the `offset` as well?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -85,22 +85,43 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case ReturnAnswer(rootPlan) => rootPlan match {
         case Limit(IntegerLiteral(limit), Sort(order, true, child))
             if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
+          TakeOrderedAndProjectExec(limit, 0, order, child.output, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child)))
             if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, 
planLater(child)) :: Nil
+          TakeOrderedAndProjectExec(limit, 0, order, projectList, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), child) =>
-          CollectLimitExec(limit, planLater(child)) :: Nil
+          CollectLimitExec(limit, 0, planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
+          Sort(order, true, child)) if limit < conf.topKSortFallbackThreshold 
=>
+          TakeOrderedAndProjectExec(limit, offset, order, child.output, 
planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
+          Project(projectList, Sort(order, true, child)))
+            if limit < conf.topKSortFallbackThreshold =>
+          TakeOrderedAndProjectExec(limit, offset, order, projectList, 
planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset), child) =>
+          CollectLimitExec(limit, offset, planLater(child)) :: Nil
         case Tail(IntegerLiteral(limit), child) =>
           CollectTailExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
       case Limit(IntegerLiteral(limit), Sort(order, true, child))
           if limit < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
+        TakeOrderedAndProjectExec(limit, 0, order, child.output, 
planLater(child)) :: Nil
       case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, 
child)))
           if limit < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) 
:: Nil
+        TakeOrderedAndProjectExec(limit, 0, order, projectList, 
planLater(child)) :: Nil
+      case GlobalLimitAndOffset(

Review Comment:
   can we have a comment here describing what the query planning is doing at a 
high level?



##########
sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql:
##########
@@ -12,25 +12,24 @@ SELECT '' AS five, unique1, unique2, stringu1
 SELECT '' AS two, unique1, unique2, stringu1
                FROM onek WHERE unique1 > 60 AND unique1 < 63
                ORDER BY unique1 LIMIT 5;
--- [SPARK-28330] ANSI SQL: Top-level <result offset clause> in <query 
expression>
--- SELECT '' AS three, unique1, unique2, stringu1
---             FROM onek WHERE unique1 > 100
---             ORDER BY unique1 LIMIT 3 OFFSET 20;
--- SELECT '' AS zero, unique1, unique2, stringu1
---             FROM onek WHERE unique1 < 50
---             ORDER BY unique1 DESC LIMIT 8 OFFSET 99;
--- SELECT '' AS eleven, unique1, unique2, stringu1
---             FROM onek WHERE unique1 < 50
---             ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
+SELECT '' AS three, unique1, unique2, stringu1
+               FROM onek WHERE unique1 > 100
+               ORDER BY unique1 LIMIT 3 OFFSET 20;
+ SELECT '' AS zero, unique1, unique2, stringu1
+               FROM onek WHERE unique1 < 50
+               ORDER BY unique1 DESC LIMIT 8 OFFSET 99;
+ SELECT '' AS eleven, unique1, unique2, stringu1
+               FROM onek WHERE unique1 < 50
+               ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
 -- SELECT '' AS ten, unique1, unique2, stringu1

Review Comment:
   These queries remain commented-out, but it seems like they should be 
possible to test now. Can we uncomment them and enable as either positive or 
negative tests?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -407,10 +407,31 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
 
           case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
 
-          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
+//          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
+          case LocalLimit(limitExpr, child) =>
+            checkLimitLikeClause("limit", limitExpr)
+            child match {
+              case Offset(offsetExpr, _) =>
+                val limit = limitExpr.eval().asInstanceOf[Int]
+                val offset = offsetExpr.eval().asInstanceOf[Int]
+                if (Int.MaxValue - limit < offset) {
+                  failAnalysis(
+                    s"""The sum of limit and offset must not be greater than 
Int.MaxValue,
+                       | but found limit = $limit, offset = 
$offset.""".stripMargin)
+                }
+              case _ =>
+            }
+
+          case Offset(offsetExpr, _) => checkLimitLikeClause("offset", 
offsetExpr)
 
           case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)
 
+          case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
+            && o.children.exists(_.isInstanceOf[Offset]) =>
+            failAnalysis(
+              s"""Only the OFFSET clause is allowed in the LIMIT clause, but 
the OFFSET

Review Comment:
   -> The OFFSET clause is only allowed...



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -85,22 +85,43 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case ReturnAnswer(rootPlan) => rootPlan match {
         case Limit(IntegerLiteral(limit), Sort(order, true, child))
             if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
+          TakeOrderedAndProjectExec(limit, 0, order, child.output, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child)))
             if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, 
planLater(child)) :: Nil
+          TakeOrderedAndProjectExec(limit, 0, order, projectList, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), child) =>
-          CollectLimitExec(limit, planLater(child)) :: Nil
+          CollectLimitExec(limit, 0, planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
+          Sort(order, true, child)) if limit < conf.topKSortFallbackThreshold 
=>
+          TakeOrderedAndProjectExec(limit, offset, order, child.output, 
planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
+          Project(projectList, Sort(order, true, child)))
+            if limit < conf.topKSortFallbackThreshold =>
+          TakeOrderedAndProjectExec(limit, offset, order, projectList, 
planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset), child) =>
+          CollectLimitExec(limit, offset, planLater(child)) :: Nil
         case Tail(IntegerLiteral(limit), child) =>
           CollectTailExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
       case Limit(IntegerLiteral(limit), Sort(order, true, child))
           if limit < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
+        TakeOrderedAndProjectExec(limit, 0, order, child.output, 
planLater(child)) :: Nil
       case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, 
child)))
           if limit < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) 
:: Nil
+        TakeOrderedAndProjectExec(limit, 0, order, projectList, 
planLater(child)) :: Nil
+      case GlobalLimitAndOffset(
+      IntegerLiteral(limit),

Review Comment:
   please indent +4 spaces for these args?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -386,6 +386,8 @@ object UnsupportedOperationChecker extends Logging {
           throwError("Limits are not supported on streaming 
DataFrames/Datasets in Update " +
             "output mode")
 
+        case Offset(_, _) => throwError("Offset is not supported on streaming 
DataFrames/Datasets")

Review Comment:
   please add a unit test case to cover this error message?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -826,6 +848,20 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
     check(plan)
   }
 
+  /**
+   * Validate that the root node of query or subquery is [[Offset]].
+   */
+  private def checkOutermostOffset(plan: LogicalPlan): Unit = {
+    plan match {
+      case Offset(offsetExpr, _) =>
+        checkLimitLikeClause("offset", offsetExpr)
+        failAnalysis(
+          s"""Only the OFFSET clause is allowed in the LIMIT clause, but the 
OFFSET

Review Comment:
   this string is duplicated with L432 above, please dedup into one place?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -407,10 +407,31 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
 
           case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
 
-          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
+//          case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
+          case LocalLimit(limitExpr, child) =>
+            checkLimitLikeClause("limit", limitExpr)
+            child match {
+              case Offset(offsetExpr, _) =>
+                val limit = limitExpr.eval().asInstanceOf[Int]
+                val offset = offsetExpr.eval().asInstanceOf[Int]
+                if (Int.MaxValue - limit < offset) {
+                  failAnalysis(
+                    s"""The sum of limit and offset must not be greater than 
Int.MaxValue,

Review Comment:
   limit -> the LIMIT clause
   offset -> the OFFSET clause
   Int.MaxValue -> the maximum 32-bit integer value (2,147,483,647)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala:
##########
@@ -182,6 +184,48 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) 
extends BaseLimitExec {
     copy(child = newChild)
 }
 
+/**
+ * Skip the first `offset` elements then take the first `limit` of the 
following elements in
+ *  the child's single output partition.

Review Comment:
   remove extra space before 'the'



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1846,6 +1847,19 @@ object EliminateLimits extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Rewrite [[Offset]] as [[Limit]] or combines two adjacent [[Offset]] 
operators into one,
+ *  merging the expressions into one single expression.

Review Comment:
   please remove space before `merging`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala:
##########
@@ -75,6 +75,13 @@ object DistinctKeyVisitor extends 
LogicalPlanVisitor[Set[ExpressionSet]] {
     }
   }
 
+  override def visitOffset(p: Offset): Set[ExpressionSet] = {
+    p.maxRows match {
+      case Some(value) if value <= 1 => Set(ExpressionSet(p.output))

Review Comment:
   can we add a unit test that covers this behavior?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -1256,6 +1273,25 @@ case class LocalLimit(limitExpr: Expression, child: 
LogicalPlan) extends OrderPr
     copy(child = newChild)
 }
 
+/**
+ * A global (coordinated) limit with offset. This operator can skip at most 
`offsetExpr` number and
+ * emit at most `limitExpr` number in total.
+ */
+case class GlobalLimitAndOffset(
+    limitExpr: Expression,
+    offsetExpr: Expression,
+    child: LogicalPlan) extends OrderPreservingUnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def maxRows: Option[Long] = {
+    limitExpr match {
+      case IntegerLiteral(limit) => Some(limit)
+      case _ => None

Review Comment:
   same as above, if the parser enforces that the limit expression is an integer literal by this point, we can drop this line?
   



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala:
##########
@@ -531,6 +531,38 @@ class AnalysisErrorSuite extends AnalysisTest {
     "The limit expression must be equal to or greater than 0, but got -1" :: 
Nil
   )
 
+  errorTest(
+    "an evaluated offset class must not be null",
+    testRelation.offset(Literal(null, IntegerType)),
+    "The evaluated offset expression must not be null, but got " :: Nil
+  )
+
+  errorTest(
+    "num_rows in offset clause must be equal to or greater than 0",
+    listRelation.offset(-1),

Review Comment:
   can we also add test cases where:
   1. the OFFSET expression is not an integer value, e.g. "abc"
   2. the OFFSET expression is a long integer value 
   3. the OFFSET expression is a constant but non-literal value, e.g. CASTing 
the current date to an integer, or some integer-valued UDF



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala:
##########
@@ -90,6 +90,15 @@ object SizeInBytesOnlyStatsPlanVisitor extends 
LogicalPlanVisitor[Statistics] {
       rowCount = Some(rowCount))
   }
 
+  override def visitOffset(p: Offset): Statistics = {
+    val offset = p.offsetExpr.eval().asInstanceOf[Int]
+    val childStats = p.child.stats
+    val rowCount: BigInt = 
childStats.rowCount.map(_.-(offset).max(0)).getOrElse(0)

Review Comment:
   this line has a lot going on, would you mind breaking apart the logic into 
multiple lines with a comment describing the math?
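   One possible multi-line breakdown, sketched with plain Scala values in place of `Statistics` (the names here are illustrative):

```scala
val offset = 7
// The child's estimated row count, when known.
val childRowCount: Option[BigInt] = Some(BigInt(5))

// OFFSET discards the first `offset` rows, so subtract it from the child's
// row count, clamping at zero when the offset exceeds the available rows.
// When the child's row count is unknown, the quoted line falls back to zero.
val rowCount: BigInt = childRowCount
  .map(count => (count - offset).max(BigInt(0)))
  .getOrElse(BigInt(0))
```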



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -85,22 +85,43 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case ReturnAnswer(rootPlan) => rootPlan match {
         case Limit(IntegerLiteral(limit), Sort(order, true, child))
             if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
+          TakeOrderedAndProjectExec(limit, 0, order, child.output, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child)))
             if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, 
planLater(child)) :: Nil
+          TakeOrderedAndProjectExec(limit, 0, order, projectList, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), child) =>
-          CollectLimitExec(limit, planLater(child)) :: Nil
+          CollectLimitExec(limit, 0, planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
+          Sort(order, true, child)) if limit < conf.topKSortFallbackThreshold 
=>
+          TakeOrderedAndProjectExec(limit, offset, order, child.output, 
planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset),
+          Project(projectList, Sort(order, true, child)))
+            if limit < conf.topKSortFallbackThreshold =>
+          TakeOrderedAndProjectExec(limit, offset, order, projectList, 
planLater(child)) :: Nil
+        case GlobalLimitAndOffset(IntegerLiteral(limit), 
IntegerLiteral(offset), child) =>
+          CollectLimitExec(limit, offset, planLater(child)) :: Nil
         case Tail(IntegerLiteral(limit), child) =>
           CollectTailExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
       case Limit(IntegerLiteral(limit), Sort(order, true, child))
           if limit < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
+        TakeOrderedAndProjectExec(limit, 0, order, child.output, 
planLater(child)) :: Nil
       case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, 
child)))
           if limit < conf.topKSortFallbackThreshold =>
-        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) 
:: Nil
+        TakeOrderedAndProjectExec(limit, 0, order, projectList, 
planLater(child)) :: Nil
+      case GlobalLimitAndOffset(
+      IntegerLiteral(limit),
+      IntegerLiteral(offset),
+      Sort(order, true, child))
+        if limit < conf.topKSortFallbackThreshold =>
+        TakeOrderedAndProjectExec(limit, offset, order, child.output, 
planLater(child)) :: Nil
+      case GlobalLimitAndOffset(
+      IntegerLiteral(limit),

Review Comment:
   same here?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1846,6 +1847,19 @@ object EliminateLimits extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Rewrite [[Offset]] as [[Limit]] or combines two adjacent [[Offset]] 
operators into one,
+ *  merging the expressions into one single expression.
+ */
+object RewriteOffsets extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case GlobalLimit(le, Offset(oe, grandChild)) =>

Review Comment:
   Can we expand the rule comment to mention the difference between LocalLimit 
and GlobalLimit (possibly referring to another location that also explains it 
in greater depth)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

