cloud-fan commented on a change in pull request #34451:
URL: https://github.com/apache/spark/pull/34451#discussion_r752838292



##########
File path: external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
##########
@@ -284,4 +289,109 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
       testIndexUsingSQL(s"$catalogName.new_table")
     }
   }
+
+  // Dialect-specific suites override this when the underlying database supports TABLESAMPLE.
+  def supportsTableSample: Boolean = false
+
+  // When the sample is pushed down to the data source, the Sample node is
+  // removed from the optimized plan.
+  private def samplePushed(df: DataFrame): Boolean = {
+    val sample = df.queryExecution.optimizedPlan.collect {
+      case s: Sample => s
+    }
+    sample.isEmpty
+  }
+
+  // Likewise, a pushed-down filter leaves no Filter node in the optimized plan.
+  private def filterPushed(df: DataFrame): Boolean = {
+    val filter = df.queryExecution.optimizedPlan.collect {
+      case f: Filter => f
+    }
+    filter.isEmpty
+  }
+
+  // A pushed-down limit is recorded in the V1ScanWrapper's pushed-down
+  // operators. Match scans explicitly so a non-V1ScanWrapper scan returns
+  // false instead of throwing a MatchError.
+  private def limitPushed(df: DataFrame, limit: Int): Boolean = {
+    df.queryExecution.optimizedPlan.collect {
+      case relation: DataSourceV2ScanRelation => relation.scan
+    }.exists {
+      case v1: V1ScanWrapper => v1.pushedDownOperators.limit.contains(limit)
+      case _ => false
+    }
+  }
+
+  // After column pruning, the scan should output exactly the requested column.
+  private def columnPruned(df: DataFrame, col: String): Boolean = {
+    val scan = df.queryExecution.optimizedPlan.collectFirst {
+      case s: DataSourceV2ScanRelation => s
+    }.get
+    scan.schema.names.sameElements(Seq(col))
+  }
+
+  test("SPARK-37038: Test TABLESAMPLE") {
+    if (supportsTableSample) {
+      withTable(s"$catalogName.new_table") {
+        sql(s"CREATE TABLE $catalogName.new_table (col1 INT, col2 INT)")
+        spark.range(10).select($"id" * 2, $"id" * 2 + 1).write.insertInto(s"$catalogName.new_table")
+
+        // sample push down + column pruning
+        val df1 = sql(s"SELECT col1 FROM $catalogName.new_table TABLESAMPLE (BUCKET 6 OUT OF 10)" +
+          " REPEATABLE (12345)")
+        assert(samplePushed(df1))
+        assert(columnPruned(df1, "col1"))
+        assert(df1.collect().length < 10)
+
+        // sample push down only
+        val df2 = sql(s"SELECT * FROM $catalogName.new_table TABLESAMPLE (50 PERCENT)" +
+          " REPEATABLE (12345)")
+        assert(samplePushed(df2))
+        assert(df2.collect().length < 10)
+
+        // sample(BUCKET ... OUT OF) push down + limit push down + column pruning
+        val df3 = sql(s"SELECT col1 FROM $catalogName.new_table TABLESAMPLE (BUCKET 6 OUT OF 10)" +
+          " LIMIT 2")
+        assert(samplePushed(df3))
+        assert(limitPushed(df3, 2))
+        assert(columnPruned(df3, "col1"))
+        assert(df3.collect().length == 2)

Review comment:
       Another way is to always specify the seed in the table sample tests.
   
   @huaxingao can you help fix this? I don't have a strong opinion on which approach is better.
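   
   For illustration, a minimal sketch of the "always specify the seed" option, reusing the seed 12345 already used by df1 and df2 above. This assumes the remote database honors the REPEATABLE seed and that this particular seed samples at least two rows, so the final assertion still holds:
   
   ```scala
   // Pinning the seed makes the sampled row set deterministic, so the
   // LIMIT assertion cannot fail on an unlucky (too small) sample.
   val df3 = sql(s"SELECT col1 FROM $catalogName.new_table TABLESAMPLE (BUCKET 6 OUT OF 10)" +
     " REPEATABLE (12345) LIMIT 2")
   assert(samplePushed(df3))
   assert(limitPushed(df3, 2))
   assert(columnPruned(df3, "col1"))
   assert(df3.collect().length == 2)
   ```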



