wangyum commented on code in PR #40688:
URL: https://github.com/apache/spark/pull/40688#discussion_r1163445279
##########
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala:
##########
@@ -1050,4 +1050,20 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
}
}
+
+ test("SPARK-43021: Make coalesceBucketsInJoin effective enable AQE") {
Review Comment:
Instead of adding new tests, could we just update the existing test `bucket coalescing is applied when join expressions match with partitioning expressions` as follows?
```scala
test("bucket coalescing is applied when join expressions match with partitioning expressions") {
  withTable("t1", "t2") {
    // t1 has 8 buckets and t2 has 4, both bucketed on (i, j).
    df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
    df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
    withSQLConf(
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
      SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
      /**
       * Runs `query` once with AQE enabled and once with it disabled, and checks the
       * executed plan in each mode.
       *
       * @param query                       SQL text to plan and inspect.
       * @param expectedNumShuffles         expected number of `ShuffleExchangeExec` nodes.
       * @param expectedCoalescedNumBuckets expected `optionalNumCoalescedBuckets` of the single
       *                                    coalesced `FileSourceScanExec`, or `None` if no scan
       *                                    should have coalesced buckets.
       */
      def verify(
          query: String,
          expectedNumShuffles: Int,
          expectedCoalescedNumBuckets: Option[Int]): Unit = {
        Seq(true, false).foreach { aqeEnabled =>
          withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
            // Tag every assertion with the AQE mode, because the loop runs the same checks twice.
            val clue = s"aqeEnabled=$aqeEnabled"
            val plan = sql(query).queryExecution.executedPlan
            val shuffles = collect(plan) { case s: ShuffleExchangeExec => s }
            assert(shuffles.length == expectedNumShuffles, clue)
            val scans = collect(plan) {
              case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
            }
            expectedCoalescedNumBuckets match {
              case Some(_) =>
                assert(scans.length == 1, clue)
                assert(scans.head.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets, clue)
              case None =>
                assert(scans.isEmpty, clue)
            }
          }
        }
      }

      // Coalescing applied since join expressions match with the bucket columns.
      verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i AND t1.j = t2.j", 0, Some(4))
      // Coalescing applied when columns are aliased.
      verify(
        "SELECT * FROM t1 JOIN (SELECT i AS x, j AS y FROM t2) ON t1.i = x AND t1.j = y",
        0,
        Some(4))
      // Coalescing is not applied when join expressions do not match with bucket columns.
      verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i", 2, None)
    }
  }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]