[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-26 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r882695556


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -203,6 +204,355 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
 checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy")))
   }
 
+  private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit 
= {
+val offsets = df.queryExecution.optimizedPlan.collect {
+  case offset: Offset => offset
+}
+if (removed) {
+  assert(offsets.isEmpty)
+} else {
+  assert(offsets.nonEmpty)
+}
+  }
+
+  test("simple scan with OFFSET") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .offset(1)
+checkOffsetRemoved(df1)
+checkPushedInfo(df1,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,")
+checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df2 = spark.read
+  .option("pushDownOffset", "false")
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .offset(1)
+checkOffsetRemoved(df2, false)
+checkPushedInfo(df2,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .sort($"salary")
+  .offset(1)
+checkOffsetRemoved(df3, false)
+checkPushedInfo(df3,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+checkAnswer(df3, Seq(Row(1, "amy", 1.00, 1000.0, true)))
+
+val df4 = spark.read
+  .option("partitionColumn", "dept")
+  .option("lowerBound", "0")
+  .option("upperBound", "2")
+  .option("numPartitions", "2")
+  .table("h2.test.employee")
+  .filter($"dept" > 1)
+  .offset(1)
+checkOffsetRemoved(df4, false)
+checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], 
ReadSchema:")
+checkAnswer(df4, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 
12000, 1200, true)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .offset(1)
+checkOffsetRemoved(df5, false)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)], PushedFilters: [], 
PushedGroupByExpressions: [DEPT], ")
+checkAnswer(df5, Seq(Row(2, 22000.00), Row(6, 12000.00)))
+
+val name = udf { (x: String) => x.matches("cat|dav|amy") }
+val sub = udf { (x: String) => x.substring(0, 3) }
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"BONUS", sub($"NAME").as("shortName"))
+  .filter(name($"shortName"))
+  .offset(1)
+checkOffsetRemoved(df6, false)
+// OFFSET is pushed down only if all the filters are pushed down
+checkPushedInfo(df6, "PushedFilters: [], ")
+checkAnswer(df6, Seq(Row(1.00, 1300.0, "dav"), Row(9000.00, 1200.0, 
"cat")))
+  }
+
+  test("simple scan with LIMIT and OFFSET") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .limit(2)
+  .offset(1)
+checkLimitRemoved(df1)
+checkOffsetRemoved(df1)
+checkPushedInfo(df1,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, 
PushedOffset: OFFSET 1,")
+checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df2 = spark.read
+  .option("pushDownLimit", "false")
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .limit(2)
+  .offset(1)
+checkLimitRemoved(df2, false)
+checkOffsetRemoved(df2, false)
+checkPushedInfo(df2,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df3 = spark.read
+  .option("pushDownOffset", "false")
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .limit(2)
+  .offset(1)
+checkLimitRemoved(df3)
+checkOffsetRemoved(df3, false)
+checkPushedInfo(df3,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, 
ReadSchema:")
+checkAnswer(df3, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df4 = spark.read
+  .option("pushDownLimit", "false")
+  .option("pushDownOffset", "false")
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .limit(2)
+  .offset(1)
+checkLimitRemoved(df4, false)
+checkOffsetRemoved(df4, false)
+checkPushedInfo(df4,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+checkAnswer(df4, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .sort($"salary")
+  .limit(2)
+  .offset(1)
+checkLimitRemoved(df5)
+checkOffsetRemoved(df5)
+checkPushedInfo(df5, "PushedFilters: [DEPT

[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-26 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r882690656


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -203,6 +204,355 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
 checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy")))
   }
 
+  private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit 
= {
+val offsets = df.queryExecution.optimizedPlan.collect {
+  case offset: Offset => offset
+}
+if (removed) {
+  assert(offsets.isEmpty)
+} else {
+  assert(offsets.nonEmpty)
+}
+  }
+
+  test("simple scan with OFFSET") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .offset(1)
+checkOffsetRemoved(df1)
+checkPushedInfo(df1,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,")
+checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df2 = spark.read
+  .option("pushDownOffset", "false")
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .offset(1)
+checkOffsetRemoved(df2, false)
+checkPushedInfo(df2,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .sort($"salary")
+  .offset(1)
+checkOffsetRemoved(df3, false)
+checkPushedInfo(df3,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+checkAnswer(df3, Seq(Row(1, "amy", 1.00, 1000.0, true)))
+
+val df4 = spark.read
+  .option("partitionColumn", "dept")
+  .option("lowerBound", "0")
+  .option("upperBound", "2")
+  .option("numPartitions", "2")
+  .table("h2.test.employee")
+  .filter($"dept" > 1)
+  .offset(1)
+checkOffsetRemoved(df4, false)
+checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], 
ReadSchema:")
+checkAnswer(df4, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 
12000, 1200, true)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")

Review Comment:
   can we test the same query in a case where the aggregate can be fully pushed down? then the offset can be pushed as well.
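
A rough sketch of the kind of test being asked for here (illustrative only, not part of the PR): the same groupBy/sum query, but in a case where the aggregate is expected to be fully pushed down to H2, reusing the suite's `checkAggregateRemoved`/`checkOffsetRemoved` helpers. The expected plan fragments and rows are copied from the surrounding tests and may need adjusting.

```scala
val df = spark.read
  .table("h2.test.employee")
  .groupBy("DEPT").sum("SALARY")
  .offset(1)
// If the aggregate is fully pushed down, no Aggregate node remains in Spark's plan...
checkAggregateRemoved(df)
// ...and then the OFFSET can be pushed down as well.
checkOffsetRemoved(df)
checkPushedInfo(df,
  "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ")
checkPushedInfo(df, "PushedOffset: OFFSET 1")
checkAnswer(df, Seq(Row(2, 22000.00), Row(6, 12000.00)))
```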






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-26 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r882687268


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -407,8 +407,65 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper wit
 case other => (other, false)
   }
 
-  def pushDownLimits(plan: LogicalPlan): LogicalPlan = plan.transform {
+  private def pushDownOffset(
+  plan: LogicalPlan,
+  offset: Int): Boolean = plan match {
+case sHolder: ScanBuilderHolder =>
+  val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset)
+  if (isPushed) {
+sHolder.pushedOffset = Some(offset)
+  }
+  isPushed
+case p: Project =>
+  pushDownOffset(p.child, offset)
+case _ => false
+  }
+
+  def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform {
+case offset @ LimitAndOffset(limit, offsetValue, child) =>
+  val (newChild, canRemoveLimit) = pushDownLimit(child, limit)
+  if (canRemoveLimit) {
+// If we can remove limit, it indicates data source only have one 
partition.
+// For `dataset.limit(m).offset(n)`, try to push down `LIMIT (m - n) 
OFFSET n`.
+// For example, `dataset.limit(5).offset(3)`, we can push down `LIMIT 
2 OFFSET 3`.
+val isPushed = pushDownOffset(newChild, offsetValue)
+if (isPushed) {
+  newChild
+} else {
+  // For `dataset.limit(m).offset(n)`, only `LIMIT m` be pushed. Spark 
will do `OFFSET n`.
+  offset
+}
+  } else {
+// If we can't push down limit and offset, return `Offset`.
+offset
+  }
+case globalLimit @ OffsetAndLimit(offset, limitValue, child) =>
+  val isPushed = pushDownOffset(child, offset)

Review Comment:
   no, we can't push OFFSET first. We need to follow the API doc and push LIMIT 
first. Although OFFSET appears first in the query plan, we can still push down 
LIMIT by updating the limit value to `limit + offset`
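
A tiny standalone check of the equivalence this relies on, using plain Scala collections rather than Spark APIs:

```scala
// OFFSET n followed by LIMIT m keeps the same rows as LIMIT (n + m) followed by
// OFFSET n, so LIMIT can still be pushed first with the enlarged value n + m.
val rows = (1 to 10).toSeq
val (n, m) = (3, 4) // offset, limit
assert(rows.drop(n).take(m) == rows.take(n + m).drop(n)) // both yield Seq(4, 5, 6, 7)
```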






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-26 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r882685206


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -407,8 +407,65 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper wit
 case other => (other, false)
   }
 
-  def pushDownLimits(plan: LogicalPlan): LogicalPlan = plan.transform {
+  private def pushDownOffset(
+  plan: LogicalPlan,
+  offset: Int): Boolean = plan match {
+case sHolder: ScanBuilderHolder =>
+  val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset)
+  if (isPushed) {
+sHolder.pushedOffset = Some(offset)
+  }
+  isPushed
+case p: Project =>
+  pushDownOffset(p.child, offset)
+case _ => false
+  }
+
+  def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform {
+case offset @ LimitAndOffset(limit, offsetValue, child) =>
+  val (newChild, canRemoveLimit) = pushDownLimit(child, limit)
+  if (canRemoveLimit) {
+// If we can remove limit, it indicates data source only have one 
partition.
+// For `dataset.limit(m).offset(n)`, try to push down `LIMIT (m - n) 
OFFSET n`.
+// For example, `dataset.limit(5).offset(3)`, we can push down `LIMIT 
2 OFFSET 3`.

Review Comment:
   This comment makes no sense here. We are pushing down operators, not pushing 
down a SQL query to JDBC. It's only a problem in JDBC that `LIMIT x OFFSET y` 
means doing OFFSET first, then LIMIT
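
A small plain-Scala illustration of the two orders (not Spark or JDBC code), assuming the pushdown semantics are LIMIT first, then OFFSET:

```scala
val rows = (1 to 10).toSeq
val (m, n) = (5, 3) // limit, offset
// Pushdown semantics: LIMIT m, then OFFSET n.
assert(rows.take(m).drop(n) == Seq(4, 5))
// The SQL text "LIMIT 5 OFFSET 3" applies OFFSET first.
assert(rows.drop(n).take(m) == Seq(4, 5, 6, 7, 8))
// So the JDBC SQL generated for "LIMIT m then OFFSET n" needs LIMIT (m - n) OFFSET n.
assert(rows.drop(n).take(m - n) == rows.take(m).drop(n))
```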






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-26 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r882663613


##
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala:
##
@@ -143,13 +143,15 @@ case class RowDataSourceScanExec(
 
 val topNOrLimitInfo =

Review Comment:
   seems we don't need to change anything in `topNOrLimitInfo`






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-25 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r882306593


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -419,6 +420,60 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper wit
   }
   }
 
+  private def pushDownOffset(
+  plan: LogicalPlan,
+  offset: Int): (LogicalPlan, Boolean) = plan match {
+case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if 
filter.isEmpty =>
+  val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset)
+  if (isPushed) {
+sHolder.pushedOffset = Some(offset)
+  }
+  (operation, isPushed)
+case p: Project =>
+  val (newChild, isPushed) = pushDownOffset(p.child, offset)
+  (p.withNewChildren(Seq(newChild)), isPushed)
+case other => (other, false)
+  }
+
+  def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform {
+case offset @ Offset(IntegerLiteral(n), child) =>
+  // For `dataset.offset(n)`, try to push down `OFFSET n`.
+  val (newChild, isPushed) = pushDownOffset(child, n)
+  if (isPushed) {
+newChild
+  } else {
+offset
+  }

Review Comment:
   ```suggestion
 private def pushDownOffset(
 plan: LogicalPlan,
 offset: Int): Boolean = plan match {
   case sHolder: ScanBuilderHolder =>
 val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset)
 if (isPushed) {
   sHolder.pushedOffset = Some(offset)
 }
 isPushed
   case p: Project =>
 pushDownOffset(p.child, offset)
   case _ => false
 }
   
 def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = 
plan.transform {
   case offset @ Offset(IntegerLiteral(n), child) =>
 // For `dataset.offset(n)`, try to push down `OFFSET n`.
 val isPushed = pushDownOffset(child, n)
 if (isPushed) {
   child
 } else {
   offset
 }
   ```






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-25 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r882304824


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with 
PredicateHelper wit
   pushDownFilters,
   pushDownAggregates,
   pushDownLimits,

Review Comment:
   LIMIT should be pushed by `pushDownLimitAndOffset` as well






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-25 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r882304320


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with 
PredicateHelper wit
   pushDownFilters,
   pushDownAggregates,
   pushDownLimits,

Review Comment:
   why do we still keep it?






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-25 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r882304055


##
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala:
##
@@ -141,15 +141,20 @@ case class RowDataSourceScanExec(
   handledFilters
 }
 
-val topNOrLimitInfo =
+val limitOrOffsetInfo =
   if (pushedDownOperators.limit.isDefined && 
pushedDownOperators.sortValues.nonEmpty) {
-val pushedTopN =
+val topNStr =
   s"ORDER BY 
${seqToString(pushedDownOperators.sortValues.map(_.describe()))}" +
   s" LIMIT ${pushedDownOperators.limit.get}"
-Some("PushedTopN" -> pushedTopN)
-} else {
-  pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value")
-}
+if (pushedDownOperators.offset.isDefined) {
+  Map("PushedPaging" -> s"$topNStr OFFSET 
${pushedDownOperators.offset.get}")

Review Comment:
   do we need to combine them into one metric? I think we can have `PushedTopN` and `PushedOffset` together in the metrics.
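
A hedged sketch of the separate-entries alternative being suggested, with stand-in values in place of `pushedDownOperators` (names and exact strings are illustrative, not the PR's code):

```scala
val pushedLimit: Option[Int] = Some(2)
val pushedOffset: Option[Int] = Some(1)
val sortValues: Seq[String] = Seq("SALARY ASC NULLS FIRST")

// Build PushedTopN/PushedLimit and PushedOffset as independent entries instead of
// one combined "PushedPaging" string.
val limitEntry = pushedLimit.map { limit =>
  if (sortValues.nonEmpty) "PushedTopN" -> s"ORDER BY ${sortValues.mkString(", ")} LIMIT $limit"
  else "PushedLimit" -> s"LIMIT $limit"
}
val offsetEntry = pushedOffset.map(o => "PushedOffset" -> s"OFFSET $o")
val metadataEntries = limitEntry.toSeq ++ offsetEntry.toSeq
// Seq((PushedTopN, ORDER BY SALARY ASC NULLS FIRST LIMIT 2), (PushedOffset, OFFSET 1))
```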






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-24 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r880689282


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -419,6 +420,60 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper wit
   }
   }
 
+  private def pushDownOffset(
+  plan: LogicalPlan,
+  offset: Int): (LogicalPlan, Boolean) = plan match {
+case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if 
filter.isEmpty =>
+  val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset)
+  if (isPushed) {
+sHolder.pushedOffset = Some(offset)
+  }
+  (operation, isPushed)
+case p: Project =>
+  val (newChild, isPushed) = pushDownOffset(p.child, offset)
+  (p.withNewChildren(Seq(newChild)), isPushed)
+case other => (other, false)
+  }
+
+  def pushDownOffsets(plan: LogicalPlan): LogicalPlan = plan.transform {

Review Comment:
   I think the code can be much easier to read if we push down limit and offset 
together.
   ```
   def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform {
 case Limit...
 case Offset...
 case LimitAndOffset ...
 ...
   }
   ```
   the code should be similar to the planner rule `SpecialLimits`






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-24 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r880684191


##
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala:
##
@@ -141,15 +141,23 @@ case class RowDataSourceScanExec(
   handledFilters
 }
 
-val topNOrLimitInfo =
+val limitOrOffsetInfo =
   if (pushedDownOperators.limit.isDefined && 
pushedDownOperators.sortValues.nonEmpty) {
-val pushedTopN =
-  s"ORDER BY 
${seqToString(pushedDownOperators.sortValues.map(_.describe()))}" +
-  s" LIMIT ${pushedDownOperators.limit.get}"
-Some("PushedTopN" -> pushedTopN)
-} else {
-  pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value")
-}
+if (pushedDownOperators.offset.isDefined) {

Review Comment:
   nit:
   ```
   val topNStr = "ORDER BY ... LIMIT ..."
   if (pushedDownOperators.offset.isDefined) {
 $"topNStr OFFSET ..."
   } else {
 topNStr
   }
   ```






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-23 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r879216237


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java:
##
@@ -23,7 +23,8 @@
  * An interface for building the {@link Scan}. Implementations can mixin 
SupportsPushDownXYZ
  * interfaces to do operator push down, and keep the operator push down result 
in the returned
  * {@link Scan}. When pushing down operators, the push down order is:
- * sample -> filter -> aggregate -> limit -> column pruning.
+ * sample -> filter -> aggregate -> offset -> limit/top-n(sort + 
limit) ->

Review Comment:
   We can switch the order of LIMIT and OFFSET to match this defined order, but 
we can't switch the order of ORDER BY and OFFSET. What if the SQL query is 
`ORDER BY ... LIMIT ... OFFSET ...`? Nothing can be pushed down?






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r877141680


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -203,6 +204,245 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
 checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy")))
   }
 
+  private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit 
= {
+val offsets = df.queryExecution.optimizedPlan.collect {
+  case offset: Offset => offset
+}
+if (removed) {
+  assert(offsets.isEmpty)
+} else {
+  assert(offsets.nonEmpty)
+}
+  }
+
+  test("simple scan with OFFSET") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .offset(1)
+checkOffsetRemoved(df1)
+checkPushedInfo(df1,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,")
+checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df2 = spark.read
+  .option("pushDownOffset", "false")
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .offset(1)
+checkOffsetRemoved(df2, false)
+checkPushedInfo(df2,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df3 = spark.read
+  .option("partitionColumn", "dept")
+  .option("lowerBound", "0")
+  .option("upperBound", "2")
+  .option("numPartitions", "2")
+  .table("h2.test.employee")
+  .filter($"dept" > 1)
+  .offset(1)
+checkOffsetRemoved(df3, false)
+checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], 
ReadSchema:")
+checkAnswer(df3, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 
12000, 1200, true)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .offset(1)
+checkOffsetRemoved(df4, false)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)], PushedFilters: [], 
PushedGroupByExpressions: [DEPT], ")
+checkAnswer(df4, Seq(Row(2, 22000.00), Row(6, 12000.00)))
+
+val name = udf { (x: String) => x.matches("cat|dav|amy") }
+val sub = udf { (x: String) => x.substring(0, 3) }
+val df5 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"BONUS", sub($"NAME").as("shortName"))
+  .filter(name($"shortName"))
+  .offset(1)
+checkOffsetRemoved(df5, false)
+// OFFSET is pushed down only if all the filters are pushed down
+checkPushedInfo(df5, "PushedFilters: [], ")
+checkAnswer(df5, Seq(Row(1.00, 1300.0, "dav"), Row(9000.00, 1200.0, 
"cat")))
+  }
+
+  test("simple scan with LIMIT and OFFSET") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .limit(2)
+  .offset(1)
+checkLimitRemoved(df1)
+checkOffsetRemoved(df1)
+checkPushedInfo(df1,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, 
PushedOffset: OFFSET 1,")

Review Comment:
   This does not match 
https://github.com/apache/spark/pull/36295/files#diff-85c754089fc8e0db142a16714e92b127001bab9e6433684d1e3a15af04cb219aR26
   
   Assume that I have a local array data source. According to the API doc, 
Spark pushes down LIMIT first. For this query, I'll do `array.take(1).drop(1)`. 
This is wrong and doesn't match the query `df.limit(2).offset(1)`.
   
   we should either fix the API doc, or fix the pushdown logic
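
A standalone illustration of the mismatch, using a plain Scala array in place of the hypothetical local array data source:

```scala
val array = Array("row1", "row2", "row3")
// df.limit(2).offset(1) should keep only the second row.
val expected = array.take(2).drop(1)         // Array("row2")
// Applying the pushed values above (LIMIT 1, then OFFSET 1) drops everything.
val fromPushedValues = array.take(1).drop(1) // Array()
assert(expected.sameElements(Array("row2")) && fromPushedValues.isEmpty)
```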






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-17 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r874584082


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownLimit.java:
##
@@ -21,8 +21,8 @@
 
 /**
  * A mix-in interface for {@link ScanBuilder}. Data sources can implement this 
interface to
- * push down LIMIT. Please note that the combination of LIMIT with other 
operations
- * such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is 
NOT pushed down.
+ * push down LIMIT. We can push down LIMIT with many other operations if they 
follow the
+ * operator order we defined in {@link ScanBuilder}'s class doc.

Review Comment:
   let's open a backport PR for 3.3 to fix the classdoc later.






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-17 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r874589600


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -419,6 +420,72 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper wit
   }
   }
 
+  private def pushDownOffset(

Review Comment:
   let's have a single `pushDownLimitAndOffset`. It should be simpler to the 
planner rule, and we need to match different kind of query plans: LIMIT only, 
OFFSET only, LIMIT + OFFSET, OFFSET + LIMIT, SORT + LIMT, SORT + LIMIT + 
OFFSET, etc.









[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-17 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r874587459


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with 
PredicateHelper wit
   pushDownFilters,
   pushDownAggregates,
   pushDownLimits,
+  pushDownOffsets,

Review Comment:
   Or can we push them down in one step?






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-17 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r874587041


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with 
PredicateHelper wit
   pushDownFilters,
   pushDownAggregates,
   pushDownLimits,
+  pushDownOffsets,

Review Comment:
   according to the doc, OFFSET should be pushed after LIMIT now.






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-17 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r874586232


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala:
##
@@ -304,10 +307,11 @@ private[jdbc] class JDBCRDD(
 }
 
 val myLimitClause: String = dialect.getLimitClause(limit)
+val myOffsetClause: String = dialect.getOffsetClause(offset)
 
 val sqlText = options.prepareQuery +
   s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" +
-  s" $myWhereClause $getGroupByClause $getOrderByClause $myLimitClause"
+  s" $myWhereClause $getGroupByClause $getOrderByClause $myLimitClause 
$myOffsetClause"

Review Comment:
   We need to be careful here. The semantics of pushdown are LIMIT first, then OFFSET, while `LIMIT a OFFSET b` means OFFSET first in the SQL spec.






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-16 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r873515998


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -419,6 +420,30 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper wit
   }
   }
 
+  private def pushDownOffset(plan: LogicalPlan, offset: Int): (LogicalPlan, 
Boolean) = plan match {
+case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if 
filter.isEmpty =>
+  val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset)
+  if (isPushed) {
+sHolder.pushedOffset = Some(offset)
+  }
+  (operation, isPushed)
+case p: Project =>
+  val (newChild, isPushed) = pushDownOffset(p.child, offset)
+  (p.withNewChildren(Seq(newChild)), isPushed)
+case other => (other, false)
+  }
+
+  def pushDownOffsets(plan: LogicalPlan): LogicalPlan = plan.transform {
+// TODO supports push down Limit append Offset or Offset append Limit
+case offset @ Offset(IntegerLiteral(n), child) =>

Review Comment:
   We can match offset, limit + offset and offset + limit, similar to the 
planner rule (after https://github.com/apache/spark/pull/36541)






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-16 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r873511627


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java:
##
@@ -23,7 +23,7 @@
  * An interface for building the {@link Scan}. Implementations can mixin 
SupportsPushDownXYZ
  * interfaces to do operator push down, and keep the operator push down result 
in the returned
  * {@link Scan}. When pushing down operators, the push down order is:
- * sample -> filter -> aggregate -> limit -> column pruning.
+ * sample -> filter -> aggregate -> offset -> limit or top N -> 
column pruning.

Review Comment:
   top n is a bit tricky as it's sort + limit. how about `aggregate -> limit/top-N(sort + limit) -> offset`? the order of limit and offset doesn't matter, as we can always switch the order and adjust the value. And this order better matches the physical plan.
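
A quick plain-collections check of the "switch the order and adjust the value" claim:

```scala
val rows = (1 to 10).toSeq
val (m, n) = (4, 3) // limit, offset
// LIMIT m then OFFSET n  ==  OFFSET n then LIMIT (m - n)
assert(rows.take(m).drop(n) == rows.drop(n).take(m - n))
// OFFSET n then LIMIT m  ==  LIMIT (n + m) then OFFSET n
assert(rows.drop(n).take(m) == rows.take(n + m).drop(n))
```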






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-15 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r873341127


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownOffset.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this 
interface to
+ * push down OFFSET. Please note that the combination of OFFSET with other 
operations
+ * such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is 
NOT pushed down.

Review Comment:
   BTW we need to update `ScanBuilder`'s classdoc for the new pushdown support.






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-15 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r873340929


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownOffset.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this 
interface to
+ * push down OFFSET. Please note that the combination of OFFSET with other 
operations
+ * such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is 
NOT pushed down.

Review Comment:
   I understand that this is copied from other pushdown interfaces, but I find 
it really hard to follow. We can push down OFFSET with many other operators if 
they follow the operator order we defined in `ScanBuilder`'s class doc.






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-11 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r870959719


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with 
PredicateHelper wit
   pushDownFilters,
   pushDownAggregates,
   pushDownLimits,
+  pushDownOffsets,

Review Comment:
   shall we push down offset before limit? The natural operator order in SQL is offset then limit.






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-11 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r870959171


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -2102,6 +2102,16 @@ class Dataset[T] private[sql](
 Limit(Literal(n), logicalPlan)
   }
 
+  /**
+   * Returns a new Dataset by skipping the first `m` rows.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def offset(n: Int): Dataset[T] = withTypedPlan {

Review Comment:
   please open a separate PR for it. This is adding a new user-facing API and 
needs more visibility.






[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-11 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r870958697


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##
@@ -433,13 +433,6 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
 
   case Offset(offsetExpr, _) => checkLimitLikeClause("offset", 
offsetExpr)
 
-  case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]

Review Comment:
   can we have a separate PR for this?


