beliefer commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r878954574
########## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ########## @@ -203,6 +204,245 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(10000.00, 1000.0, "amy"))) } + private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = { + val offsets = df.queryExecution.optimizedPlan.collect { + case offset: Offset => offset + } + if (removed) { + assert(offsets.isEmpty) + } else { + assert(offsets.nonEmpty) + } + } + + test("simple scan with OFFSET") { + val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) + checkOffsetRemoved(df1) + checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,") + checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + + val df2 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) + checkOffsetRemoved(df2, false) + checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") + checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + + val df3 = spark.read + .option("partitionColumn", "dept") + .option("lowerBound", "0") + .option("upperBound", "2") + .option("numPartitions", "2") + .table("h2.test.employee") + .filter($"dept" > 1) + .offset(1) + checkOffsetRemoved(df3, false) + checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], ReadSchema:") + checkAnswer(df3, Seq(Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true))) + + val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .offset(1) + checkOffsetRemoved(df4, false) + checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ") + checkAnswer(df4, Seq(Row(2, 22000.00), Row(6, 12000.00))) + + val name = udf { (x: String) => x.matches("cat|dav|amy") } + val sub = udf { (x: String) => x.substring(0, 3) } + val df5 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) + .filter(name($"shortName")) + .offset(1) + checkOffsetRemoved(df5, false) + // OFFSET is pushed down only if all the filters are pushed down + checkPushedInfo(df5, "PushedFilters: [], ") + checkAnswer(df5, Seq(Row(10000.00, 1300.0, "dav"), Row(9000.00, 1200.0, "cat"))) + } + + test("simple scan with LIMIT and OFFSET") { + val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) + checkLimitRemoved(df1) + checkOffsetRemoved(df1) + checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, PushedOffset: OFFSET 1,") Review Comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org