[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882695556 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -203,6 +204,355 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy"))) } + private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = { +val offsets = df.queryExecution.optimizedPlan.collect { + case offset: Offset => offset +} +if (removed) { + assert(offsets.isEmpty) +} else { + assert(offsets.nonEmpty) +} + } + + test("simple scan with OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,") +checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df2 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df2, false) +checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df3 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .sort($"salary") + .offset(1) +checkOffsetRemoved(df3, false) +checkPushedInfo(df3, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df3, Seq(Row(1, "amy", 1.00, 1000.0, true))) + +val df4 = spark.read + .option("partitionColumn", "dept") + .option("lowerBound", "0") + .option("upperBound", "2") + .option("numPartitions", "2") + .table("h2.test.employee") + .filter($"dept" > 1) + .offset(1) +checkOffsetRemoved(df4, false) +checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], ReadSchema:") +checkAnswer(df4, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 12000, 1200, true))) + +val df5 = spark.read + .table("h2.test.employee") + 
.groupBy("DEPT").sum("SALARY") + .offset(1) +checkOffsetRemoved(df5, false) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ") +checkAnswer(df5, Seq(Row(2, 22000.00), Row(6, 12000.00))) + +val name = udf { (x: String) => x.matches("cat|dav|amy") } +val sub = udf { (x: String) => x.substring(0, 3) } +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) + .filter(name($"shortName")) + .offset(1) +checkOffsetRemoved(df6, false) +// OFFSET is pushed down only if all the filters are pushed down +checkPushedInfo(df6, "PushedFilters: [], ") +checkAnswer(df6, Seq(Row(1.00, 1300.0, "dav"), Row(9000.00, 1200.0, "cat"))) + } + + test("simple scan with LIMIT and OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, PushedOffset: OFFSET 1,") +checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df2 = spark.read + .option("pushDownLimit", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df2, false) +checkOffsetRemoved(df2, false) +checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df3 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df3) +checkOffsetRemoved(df3, false) +checkPushedInfo(df3, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, ReadSchema:") +checkAnswer(df3, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df4 = spark.read + .option("pushDownLimit", "false") + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + 
.offset(1) +checkLimitRemoved(df4, false) +checkOffsetRemoved(df4, false) +checkPushedInfo(df4, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df4, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df5 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .sort($"salary") + .limit(2) + .offset(1) +checkLimitRemoved(df5) +checkOffsetRemoved(df5) +checkPushedInfo(df5, "PushedFilters: [DEPT
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882693128 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -203,6 +204,355 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy"))) } + private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = { +val offsets = df.queryExecution.optimizedPlan.collect { + case offset: Offset => offset +} +if (removed) { + assert(offsets.isEmpty) +} else { + assert(offsets.nonEmpty) +} + } + + test("simple scan with OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,") +checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df2 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df2, false) +checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df3 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .sort($"salary") + .offset(1) +checkOffsetRemoved(df3, false) +checkPushedInfo(df3, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df3, Seq(Row(1, "amy", 1.00, 1000.0, true))) + +val df4 = spark.read + .option("partitionColumn", "dept") + .option("lowerBound", "0") + .option("upperBound", "2") + .option("numPartitions", "2") + .table("h2.test.employee") + .filter($"dept" > 1) + .offset(1) +checkOffsetRemoved(df4, false) +checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], ReadSchema:") +checkAnswer(df4, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 12000, 1200, true))) + +val df5 = spark.read + .table("h2.test.employee") + 
.groupBy("DEPT").sum("SALARY") + .offset(1) +checkOffsetRemoved(df5, false) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ") +checkAnswer(df5, Seq(Row(2, 22000.00), Row(6, 12000.00))) + +val name = udf { (x: String) => x.matches("cat|dav|amy") } +val sub = udf { (x: String) => x.substring(0, 3) } +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) + .filter(name($"shortName")) + .offset(1) +checkOffsetRemoved(df6, false) +// OFFSET is pushed down only if all the filters are pushed down +checkPushedInfo(df6, "PushedFilters: [], ") +checkAnswer(df6, Seq(Row(1.00, 1300.0, "dav"), Row(9000.00, 1200.0, "cat"))) + } + + test("simple scan with LIMIT and OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, PushedOffset: OFFSET 1,") +checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df2 = spark.read + .option("pushDownLimit", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df2, false) +checkOffsetRemoved(df2, false) +checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df3 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df3) +checkOffsetRemoved(df3, false) +checkPushedInfo(df3, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, ReadSchema:") +checkAnswer(df3, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df4 = spark.read + .option("pushDownLimit", "false") + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + 
.offset(1) +checkLimitRemoved(df4, false) +checkOffsetRemoved(df4, false) +checkPushedInfo(df4, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df4, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df5 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .sort($"salary") + .limit(2) + .offset(1) +checkLimitRemoved(df5) +checkOffsetRemoved(df5) +checkPushedInfo(df5, "PushedFilters: [DEPT
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882692211 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -203,6 +204,355 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy"))) } + private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = { +val offsets = df.queryExecution.optimizedPlan.collect { + case offset: Offset => offset +} +if (removed) { + assert(offsets.isEmpty) +} else { + assert(offsets.nonEmpty) +} + } + + test("simple scan with OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,") +checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df2 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df2, false) +checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df3 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .sort($"salary") + .offset(1) +checkOffsetRemoved(df3, false) +checkPushedInfo(df3, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df3, Seq(Row(1, "amy", 1.00, 1000.0, true))) + +val df4 = spark.read + .option("partitionColumn", "dept") + .option("lowerBound", "0") + .option("upperBound", "2") + .option("numPartitions", "2") + .table("h2.test.employee") + .filter($"dept" > 1) + .offset(1) +checkOffsetRemoved(df4, false) +checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], ReadSchema:") +checkAnswer(df4, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 12000, 1200, true))) + +val df5 = spark.read + .table("h2.test.employee") + 
.groupBy("DEPT").sum("SALARY") + .offset(1) +checkOffsetRemoved(df5, false) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ") +checkAnswer(df5, Seq(Row(2, 22000.00), Row(6, 12000.00))) + +val name = udf { (x: String) => x.matches("cat|dav|amy") } +val sub = udf { (x: String) => x.substring(0, 3) } +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) + .filter(name($"shortName")) + .offset(1) +checkOffsetRemoved(df6, false) +// OFFSET is pushed down only if all the filters are pushed down +checkPushedInfo(df6, "PushedFilters: [], ") +checkAnswer(df6, Seq(Row(1.00, 1300.0, "dav"), Row(9000.00, 1200.0, "cat"))) + } + + test("simple scan with LIMIT and OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, PushedOffset: OFFSET 1,") +checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df2 = spark.read + .option("pushDownLimit", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df2, false) +checkOffsetRemoved(df2, false) +checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df3 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df3) +checkOffsetRemoved(df3, false) +checkPushedInfo(df3, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, ReadSchema:") +checkAnswer(df3, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df4 = spark.read + .option("pushDownLimit", "false") + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + 
.offset(1) +checkLimitRemoved(df4, false) +checkOffsetRemoved(df4, false) +checkPushedInfo(df4, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df4, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df5 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .sort($"salary") + .limit(2) + .offset(1) +checkLimitRemoved(df5) +checkOffsetRemoved(df5) +checkPushedInfo(df5, "PushedFilters: [DEPT
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882690656 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -203,6 +204,355 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy"))) } + private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = { +val offsets = df.queryExecution.optimizedPlan.collect { + case offset: Offset => offset +} +if (removed) { + assert(offsets.isEmpty) +} else { + assert(offsets.nonEmpty) +} + } + + test("simple scan with OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,") +checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df2 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df2, false) +checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df3 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .sort($"salary") + .offset(1) +checkOffsetRemoved(df3, false) +checkPushedInfo(df3, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df3, Seq(Row(1, "amy", 1.00, 1000.0, true))) + +val df4 = spark.read + .option("partitionColumn", "dept") + .option("lowerBound", "0") + .option("upperBound", "2") + .option("numPartitions", "2") + .table("h2.test.employee") + .filter($"dept" > 1) + .offset(1) +checkOffsetRemoved(df4, false) +checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], ReadSchema:") +checkAnswer(df4, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 12000, 1200, true))) + +val df5 = spark.read + .table("h2.test.employee") + 
.groupBy("DEPT").sum("SALARY") Review Comment: can we test the same query while we can fully push down agg? then offset can be pushed as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882687268 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -407,8 +407,65 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit case other => (other, false) } - def pushDownLimits(plan: LogicalPlan): LogicalPlan = plan.transform { + private def pushDownOffset( + plan: LogicalPlan, + offset: Int): Boolean = plan match { +case sHolder: ScanBuilderHolder => + val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset) + if (isPushed) { +sHolder.pushedOffset = Some(offset) + } + isPushed +case p: Project => + pushDownOffset(p.child, offset) +case _ => false + } + + def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform { +case offset @ LimitAndOffset(limit, offsetValue, child) => + val (newChild, canRemoveLimit) = pushDownLimit(child, limit) + if (canRemoveLimit) { +// If we can remove limit, it indicates data source only have one partition. +// For `dataset.limit(m).offset(n)`, try to push down `LIMIT (m - n) OFFSET n`. +// For example, `dataset.limit(5).offset(3)`, we can push down `LIMIT 2 OFFSET 3`. +val isPushed = pushDownOffset(newChild, offsetValue) +if (isPushed) { + newChild +} else { + // For `dataset.limit(m).offset(n)`, only `LIMIT m` be pushed. Spark will do `OFFSET n`. + offset +} + } else { +// If we can't push down limit and offset, return `Offset`. +offset + } +case globalLimit @ OffsetAndLimit(offset, limitValue, child) => + val isPushed = pushDownOffset(child, offset) Review Comment: no, we can't push OFFSET first. We need to follow the API doc and push LIMIT first. Although OFFSET appears first in the query plan, we can still push down LIMIT by updating the limit value to `limit + offset` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org. - To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org. For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882685206 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -407,8 +407,65 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit case other => (other, false) } - def pushDownLimits(plan: LogicalPlan): LogicalPlan = plan.transform { + private def pushDownOffset( + plan: LogicalPlan, + offset: Int): Boolean = plan match { +case sHolder: ScanBuilderHolder => + val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset) + if (isPushed) { +sHolder.pushedOffset = Some(offset) + } + isPushed +case p: Project => + pushDownOffset(p.child, offset) +case _ => false + } + + def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform { +case offset @ LimitAndOffset(limit, offsetValue, child) => + val (newChild, canRemoveLimit) = pushDownLimit(child, limit) + if (canRemoveLimit) { +// If we can remove limit, it indicates data source only have one partition. +// For `dataset.limit(m).offset(n)`, try to push down `LIMIT (m - n) OFFSET n`. +// For example, `dataset.limit(5).offset(3)`, we can push down `LIMIT 2 OFFSET 3`. Review Comment: This comment makes no sense here. We are pushing down operators, not pushing down a SQL query to JDBC. It's only a problem in JDBC that `LIMIT x OFFSET y` means doing OFFSET first, then LIMIT -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882663613 ## sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala: ## @@ -143,13 +143,15 @@ case class RowDataSourceScanExec( val topNOrLimitInfo = Review Comment: seems we don't need to change anything in `topNOrLimitInfo` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882306593 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -419,6 +420,60 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit } } + private def pushDownOffset( + plan: LogicalPlan, + offset: Int): (LogicalPlan, Boolean) = plan match { +case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty => + val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset) + if (isPushed) { +sHolder.pushedOffset = Some(offset) + } + (operation, isPushed) +case p: Project => + val (newChild, isPushed) = pushDownOffset(p.child, offset) + (p.withNewChildren(Seq(newChild)), isPushed) +case other => (other, false) + } + + def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform { +case offset @ Offset(IntegerLiteral(n), child) => + // For `dataset.offset(n)`, try to push down `OFFSET n`. + val (newChild, isPushed) = pushDownOffset(child, n) + if (isPushed) { +newChild + } else { +offset + } Review Comment: ```suggestion private def pushDownOffset( plan: LogicalPlan, offset: Int): Boolean = plan match { case sHolder: ScanBuilderHolder => val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset) if (isPushed) { sHolder.pushedOffset = Some(offset) } isPushed case p: Project => pushDownOffset(p.child, offset) case _ => false } def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform { case offset @ Offset(IntegerLiteral(n), child) => // For `dataset.offset(n)`, try to push down `OFFSET n`. val isPushed = pushDownOffset(child, n) if (isPushed) { child } else { offset } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org. - To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org. For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882304824 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit pushDownFilters, pushDownAggregates, pushDownLimits, Review Comment: LIMIT should be pushed by `pushDownLimitAndOffset` as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882304320 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit pushDownFilters, pushDownAggregates, pushDownLimits, Review Comment: why do we still keep it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r882304055 ## sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala: ## @@ -141,15 +141,20 @@ case class RowDataSourceScanExec( handledFilters } -val topNOrLimitInfo = +val limitOrOffsetInfo = if (pushedDownOperators.limit.isDefined && pushedDownOperators.sortValues.nonEmpty) { -val pushedTopN = +val topNStr = s"ORDER BY ${seqToString(pushedDownOperators.sortValues.map(_.describe()))}" + s" LIMIT ${pushedDownOperators.limit.get}" -Some("PushedTopN" -> pushedTopN) -} else { - pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value") -} +if (pushedDownOperators.offset.isDefined) { + Map("PushedPaging" -> s"$topNStr OFFSET ${pushedDownOperators.offset.get}") Review Comment: do we need to combine them into one metrics? I think we can have `PushedTopN` and `PushedOffset` together in the metrics. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r880689282 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -419,6 +420,60 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit } } + private def pushDownOffset( + plan: LogicalPlan, + offset: Int): (LogicalPlan, Boolean) = plan match { +case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty => + val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset) + if (isPushed) { +sHolder.pushedOffset = Some(offset) + } + (operation, isPushed) +case p: Project => + val (newChild, isPushed) = pushDownOffset(p.child, offset) + (p.withNewChildren(Seq(newChild)), isPushed) +case other => (other, false) + } + + def pushDownOffsets(plan: LogicalPlan): LogicalPlan = plan.transform { Review Comment: I think the code can be much easier to read if we push down limit and offset together. ``` def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform { case Limit... case Offset... case LimitAndOffset ... ... } ``` the code should be similar to the plann rule `SpecialLimits` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r880684191 ## sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala: ## @@ -141,15 +141,23 @@ case class RowDataSourceScanExec( handledFilters } -val topNOrLimitInfo = +val limitOrOffsetInfo = if (pushedDownOperators.limit.isDefined && pushedDownOperators.sortValues.nonEmpty) { -val pushedTopN = - s"ORDER BY ${seqToString(pushedDownOperators.sortValues.map(_.describe()))}" + - s" LIMIT ${pushedDownOperators.limit.get}" -Some("PushedTopN" -> pushedTopN) -} else { - pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value") -} +if (pushedDownOperators.offset.isDefined) { Review Comment: nit: ``` val topNStr = "ORDER BY ... LIMIT ..." if (pushedDownOperators.offset.isDefined) { $"topNStr OFFSET ..." } else { topNStr } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r879216237 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,8 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample -> filter -> aggregate -> limit -> column pruning. + * sample -> filter -> aggregate -> offset -> limit/top-n(sort + limit) -> Review Comment: We can switch the order of LIMIT and OFFSET to match this defined order, but we can't switch the order of ORDER BY and OFFSET. What if the SQL query is `ORDER BY ... LIMIT ... OFFSET ...`? Nothing can be pushed down? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r877141680 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -203,6 +204,245 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy"))) } + private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = { +val offsets = df.queryExecution.optimizedPlan.collect { + case offset: Offset => offset +} +if (removed) { + assert(offsets.isEmpty) +} else { + assert(offsets.nonEmpty) +} + } + + test("simple scan with OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,") +checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df2 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df2, false) +checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df3 = spark.read + .option("partitionColumn", "dept") + .option("lowerBound", "0") + .option("upperBound", "2") + .option("numPartitions", "2") + .table("h2.test.employee") + .filter($"dept" > 1) + .offset(1) +checkOffsetRemoved(df3, false) +checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], ReadSchema:") +checkAnswer(df3, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 12000, 1200, true))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .offset(1) +checkOffsetRemoved(df4, false) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ") +checkAnswer(df4, Seq(Row(2, 22000.00), Row(6, 12000.00))) + +val name = udf { (x: String) => 
x.matches("cat|dav|amy") } +val sub = udf { (x: String) => x.substring(0, 3) } +val df5 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) + .filter(name($"shortName")) + .offset(1) +checkOffsetRemoved(df5, false) +// OFFSET is pushed down only if all the filters are pushed down +checkPushedInfo(df5, "PushedFilters: [], ") +checkAnswer(df5, Seq(Row(1.00, 1300.0, "dav"), Row(9000.00, 1200.0, "cat"))) + } + + test("simple scan with LIMIT and OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, PushedOffset: OFFSET 1,") Review Comment: This does not match https://github.com/apache/spark/pull/36295/files#diff-85c754089fc8e0db142a16714e92b127001bab9e6433684d1e3a15af04cb219aR26 Assume that I have a local array data source. According to the API doc, Spark pushes down LIMIT first. For this query, I'll do `array.take(1).drop(1)`. This is wrong and doesn't match the query `df.limit(2).offset(1)`. we should either fix the API doc, or fix the pushdown logic -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r877141680 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -203,6 +204,245 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy"))) } + private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = { +val offsets = df.queryExecution.optimizedPlan.collect { + case offset: Offset => offset +} +if (removed) { + assert(offsets.isEmpty) +} else { + assert(offsets.nonEmpty) +} + } + + test("simple scan with OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,") +checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df2 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df2, false) +checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df3 = spark.read + .option("partitionColumn", "dept") + .option("lowerBound", "0") + .option("upperBound", "2") + .option("numPartitions", "2") + .table("h2.test.employee") + .filter($"dept" > 1) + .offset(1) +checkOffsetRemoved(df3, false) +checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], ReadSchema:") +checkAnswer(df3, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 12000, 1200, true))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .offset(1) +checkOffsetRemoved(df4, false) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ") +checkAnswer(df4, Seq(Row(2, 22000.00), Row(6, 12000.00))) + +val name = udf { (x: String) => 
x.matches("cat|dav|amy") } +val sub = udf { (x: String) => x.substring(0, 3) } +val df5 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) + .filter(name($"shortName")) + .offset(1) +checkOffsetRemoved(df5, false) +// OFFSET is pushed down only if all the filters are pushed down +checkPushedInfo(df5, "PushedFilters: [], ") +checkAnswer(df5, Seq(Row(1.00, 1300.0, "dav"), Row(9000.00, 1200.0, "cat"))) + } + + test("simple scan with LIMIT and OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, PushedOffset: OFFSET 1,") Review Comment: This does not match https://github.com/apache/spark/pull/36295/files#diff-85c754089fc8e0db142a16714e92b127001bab9e6433684d1e3a15af04cb219aR26 Assume that I have a local array data source. According to the API doc, Spark pushes down LIMIT first. For this query, I'll do `array.limit(1).drop(1)`. This is wrong and doesn't match the query `df.limit(2).offset(1)`. we should either fix the API doc, or fix the pushdown logic -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r874584082 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownLimit.java: ## @@ -21,8 +21,8 @@ /** * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to - * push down LIMIT. Please note that the combination of LIMIT with other operations - * such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is NOT pushed down. + * push down LIMIT. We can push down LIMIT with many other operations if they follow the + * operator order we defined in {@link ScanBuilder}'s class doc. Review Comment: let's open a backport PR for 3.3 to fix the classdoc later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r874589600 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -419,6 +420,72 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit } } + private def pushDownOffset( Review Comment: let's have a single `pushDownLimitAndOffset`. It should be similar to the planner rule, and we need to match different kinds of query plans: LIMIT only, OFFSET only, LIMIT + OFFSET, OFFSET + LIMIT, SORT + LIMIT, SORT + LIMIT + OFFSET, etc. ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -419,6 +420,72 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit } } + private def pushDownOffset( Review Comment: let's have a single `pushDownLimitAndOffset`. It should be similar to the planner rule, and we need to match different kinds of query plans: LIMIT only, OFFSET only, LIMIT + OFFSET, OFFSET + LIMIT, SORT + LIMIT, SORT + LIMIT + OFFSET, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r874587459 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit pushDownFilters, pushDownAggregates, pushDownLimits, + pushDownOffsets, Review Comment: Or can we push them down in one step? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r874587041 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit pushDownFilters, pushDownAggregates, pushDownLimits, + pushDownOffsets, Review Comment: according to the doc, OFFSET should be pushed after LIMIT now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r874586232 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala: ## @@ -304,10 +307,11 @@ private[jdbc] class JDBCRDD( } val myLimitClause: String = dialect.getLimitClause(limit) +val myOffsetClause: String = dialect.getOffsetClause(offset) val sqlText = options.prepareQuery + s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" + - s" $myWhereClause $getGroupByClause $getOrderByClause $myLimitClause" + s" $myWhereClause $getGroupByClause $getOrderByClause $myLimitClause $myOffsetClause" Review Comment: We need to be careful here. The semantic of pushdown is LIMIT first, then OFFSET, while `LIMIT a OFFSET b` means OFFSET first in SQL spec. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873515998 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -419,6 +420,30 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit } } + private def pushDownOffset(plan: LogicalPlan, offset: Int): (LogicalPlan, Boolean) = plan match { +case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty => + val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset) + if (isPushed) { +sHolder.pushedOffset = Some(offset) + } + (operation, isPushed) +case p: Project => + val (newChild, isPushed) = pushDownOffset(p.child, offset) + (p.withNewChildren(Seq(newChild)), isPushed) +case other => (other, false) + } + + def pushDownOffsets(plan: LogicalPlan): LogicalPlan = plan.transform { +// TODO supports push down Limit append Offset or Offset append Limit +case offset @ Offset(IntegerLiteral(n), child) => Review Comment: We can match offset, limit + offset and offset + limit, similar to the planner rule (after https://github.com/apache/spark/pull/36541) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873511627 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,7 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample -> filter -> aggregate -> limit -> column pruning. + * sample -> filter -> aggregate -> offset -> limit or top N -> column pruning. Review Comment: top n is a bit tricky as it's sort + limit. how about `aggregate -> limit/top-N(sort + limit) -> offset`? the order of limit and offset doesn't matter as we can always switch the order and adjust the value. And this order matches the physical plan more. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873511627 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,7 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample -> filter -> aggregate -> limit -> column pruning. + * sample -> filter -> aggregate -> offset -> limit or top N -> column pruning. Review Comment: top n is a bit tricky as it's sort + limit. how about `aggregate -> limit/top-N(sort + limit) -> offset`? the order of limit and offset doesn't matter as we can always switch the order and adjust the value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873511627 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,7 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample -> filter -> aggregate -> limit -> column pruning. + * sample -> filter -> aggregate -> offset -> limit or top N -> column pruning. Review Comment: top n is a bit tricky as it's sort + limit. how about `aggregate -> limit/top-n(sort + limit) -> offset`? the order of limit and offset doesn't matter as we can always switch the order and adjust the value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873341127 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownOffset.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Evolving; + +/** + * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to + * push down OFFSET. Please note that the combination of OFFSET with other operations + * such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is NOT pushed down. Review Comment: BTW we need to update `ScanBuider`'s classdoc for new pushdown support. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873340929 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownOffset.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Evolving; + +/** + * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to + * push down OFFSET. Please note that the combination of OFFSET with other operations + * such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is NOT pushed down. Review Comment: I understand that this is copied from other pushdown interfaces, but I find it really hard to follow. We can push down OFFSET with many other operators if they follow the operator order we defined in `ScanBuilder`'s class doc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r870959719 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit pushDownFilters, pushDownAggregates, pushDownLimits, + pushDownOffsets, Review Comment: shall we push down offset before limit? The natural operators order in SQL is offset then limit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r870959171 ## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala: ## @@ -2102,6 +2102,16 @@ class Dataset[T] private[sql]( Limit(Literal(n), logicalPlan) } + /** + * Returns a new Dataset by skipping the first `n` rows. + * + * @group typedrel + * @since 3.4.0 + */ + def offset(n: Int): Dataset[T] = withTypedPlan { Review Comment: please open a separate PR for it. This is adding a new user-facing API and needs more visibility. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r870958697 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala: ## @@ -433,13 +433,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr) - case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit] Review Comment: can we have a separate PR for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org