beliefer commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r883245824


##########
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##########
@@ -203,6 +204,355 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
     checkAnswer(df5, Seq(Row(10000.00, 1000.0, "amy")))
   }
 
+  private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit 
= {
+    val offsets = df.queryExecution.optimizedPlan.collect {
+      case offset: Offset => offset
+    }
+    if (removed) {
+      assert(offsets.isEmpty)
+    } else {
+      assert(offsets.nonEmpty)
+    }
+  }
+
+  test("simple scan with OFFSET") {
+    val df1 = spark.read
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .offset(1)
+    checkOffsetRemoved(df1)
+    checkPushedInfo(df1,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,")
+    checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df2 = spark.read
+      .option("pushDownOffset", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .offset(1)
+    checkOffsetRemoved(df2, false)
+    checkPushedInfo(df2,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df3 = spark.read
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .sort($"salary")
+      .offset(1)
+    checkOffsetRemoved(df3, false)
+    checkPushedInfo(df3,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df3, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+
+    val df4 = spark.read
+      .option("partitionColumn", "dept")
+      .option("lowerBound", "0")
+      .option("upperBound", "2")
+      .option("numPartitions", "2")
+      .table("h2.test.employee")
+      .filter($"dept" > 1)
+      .offset(1)
+    checkOffsetRemoved(df4, false)
+    checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], 
ReadSchema:")
+    checkAnswer(df4, Seq(Row(2, "david", 10000, 1300, true), Row(6, "jen", 
12000, 1200, true)))
+
+    val df5 = spark.read
+      .table("h2.test.employee")
+      .groupBy("DEPT").sum("SALARY")

Review Comment:
   In fact, limit have the same issue. I think we can support them in a single 
PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to