cloud-fan commented on a change in pull request #34451:
URL: https://github.com/apache/spark/pull/34451#discussion_r752837925
##########
File path:
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
##########
@@ -284,4 +289,109 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
testIndexUsingSQL(s"$catalogName.new_table")
}
}
+
+ def supportsTableSample: Boolean = false
+
+ private def samplePushed(df: DataFrame): Boolean = {
+ val sample = df.queryExecution.optimizedPlan.collect {
+ case s: Sample => s
+ }
+ sample.isEmpty
+ }
+
+ private def filterPushed(df: DataFrame): Boolean = {
+ val filter = df.queryExecution.optimizedPlan.collect {
+ case f: Filter => f
+ }
+ filter.isEmpty
+ }
+
+ private def limitPushed(df: DataFrame, limit: Int): Boolean = {
+ val filter = df.queryExecution.optimizedPlan.collect {
+ case relation: DataSourceV2ScanRelation => relation.scan match {
+ case v1: V1ScanWrapper =>
+ return v1.pushedDownOperators.limit == Some(limit)
+ }
+ }
+ false
+ }
+
+ private def columnPruned(df: DataFrame, col: String): Boolean = {
+ val scan = df.queryExecution.optimizedPlan.collectFirst {
+ case s: DataSourceV2ScanRelation => s
+ }.get
+ scan.schema.names.sameElements(Seq(col))
+ }
+
+ test("SPARK-37038: Test TABLESAMPLE") {
+ if (supportsTableSample) {
+ withTable(s"$catalogName.new_table") {
+ sql(s"CREATE TABLE $catalogName.new_table (col1 INT, col2 INT)")
+ spark.range(10).select($"id" * 2, $"id" * 2 + 1).write.insertInto(s"$catalogName.new_table")
+
+ // sample push down + column pruning
+ val df1 = sql(s"SELECT col1 FROM $catalogName.new_table TABLESAMPLE (BUCKET 6 OUT OF 10)" +
+ " REPEATABLE (12345)")
+ assert(samplePushed(df1))
+ assert(columnPruned(df1, "col1"))
+ assert(df1.collect().length < 10)
+
+ // sample push down only
+ val df2 = sql(s"SELECT * FROM $catalogName.new_table TABLESAMPLE (50 PERCENT)" +
+ " REPEATABLE (12345)")
+ assert(samplePushed(df2))
+ assert(df2.collect().length < 10)
+
+ // sample(BUCKET ... OUT OF) push down + limit push down + column pruning
+ val df3 = sql(s"SELECT col1 FROM $catalogName.new_table TABLESAMPLE (BUCKET 6 OUT OF 10)" +
+ " LIMIT 2")
+ assert(samplePushed(df3))
+ assert(limitPushed(df3, 2))
+ assert(columnPruned(df3, "col1"))
+ assert(df3.collect().length == 2)
Review comment:
I think this should be `<= 2`, as this TABLESAMPLE is not repeatable and may
produce only one row.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]