wangyum commented on code in PR #40688:
URL: https://github.com/apache/spark/pull/40688#discussion_r1163445279


##########
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala:
##########
@@ -1050,4 +1050,20 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
       }
     }
   }
+
+  test("SPARK-43021: Make coalesceBucketsInJoin effective enable AQE") {

Review Comment:
   Instead of adding a new test, could we just update the existing test
   `bucket coalescing is applied when join expressions match with partitioning expressions`
   with the following:
   ```scala
     // Checks the shuffle count and the coalesced bucket count of bucketed joins,
     // with adaptive query execution (AQE) both enabled and disabled.
     test("bucket coalescing is applied when join expressions match with partitioning expressions") {
       withTable("t1", "t2") {
         // t1 is written with 8 buckets and t2 with 4, both bucketed by (i, j).
         df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
         df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
   
         withSQLConf(
           SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
           SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
           // Runs `query` once with AQE enabled and once with it disabled. In each run it
           // asserts the number of ShuffleExchangeExec nodes in the executed plan and, when
           // `expectedCoalescedNumBuckets` is defined, that exactly one scan reports that value.
           def verify(
               query: String,
               expectedNumShuffles: Int,
               expectedCoalescedNumBuckets: Option[Int]): Unit = {
             Seq(true, false).foreach { aqeEnabled =>
               withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
                 val plan = sql(query).queryExecution.executedPlan
                 // NOTE(review): `collect` is presumably the AQE-aware plan helper mixed into
                 // the suite rather than `plan.collect` -- confirm against the suite's traits.
                 val shuffles = collect(plan) { case s: ShuffleExchangeExec => s }
                 assert(shuffles.length == expectedNumShuffles)
   
                 // Keep only the file scans that carry a coalesced bucket count.
                 val scans = collect(plan) {
                   case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
                 }
                 if (expectedCoalescedNumBuckets.isDefined) {
                   assert(scans.length == 1)
                   assert(scans.head.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
                 } else {
                   assert(scans.isEmpty)
                 }
               }
             }
           }
   
           // Coalescing applied since join expressions match with the bucket columns.
           verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i AND t1.j = t2.j", 0, Some(4))
           // Coalescing applied when columns are aliased.
           verify(
             "SELECT * FROM t1 JOIN (SELECT i AS x, j AS y FROM t2) ON t1.i = x AND t1.j = y",
             0,
             Some(4))
           // Coalescing is not applied when join expressions do not match with bucket columns.
           verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i", 2, None)
         }
       }
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to