MaxGekk commented on code in PR #35715:
URL: https://github.com/apache/spark/pull/35715#discussion_r892247619
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -683,6 +683,41 @@ class AdaptiveQueryExecSuite
}
}
}
+ test("SPARK-37753: Allow changing outer join to broadcast join even if too many empty" +
+ " partitions on broadcast side") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") {
+ // `testData` is small enough to be broadcast but has empty partition ratio over the config.
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+ val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+ "SELECT * FROM (select * from testData where value = '1') td" +
+ " right outer join testData2 ON key = a")
+ val smj = findTopLevelSortMergeJoin(plan)
+ assert(smj.size == 1)
+ val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
+ assert(bhj.size == 1)
+ }
+ }
+ }
+
+ test("SPARK-37753: Inhibit broadcast in left outer join when there are many empty" +
Review Comment:
The test is flaky: it fails intermittently. See
https://github.com/apache/spark/runs/6788261561?check_suite_focus=true:
```
[info] - SPARK-37753: Inhibit broadcast in left outer join when there are
many empty partitions on outer/left side *** FAILED *** (230 milliseconds)
[info] ArrayBuffer(BroadcastHashJoin [key#116713], [a#116723], LeftOuter,
BuildRight, false
[info] :- AQEShuffleRead local
[info] : +- ShuffleQueryStage 0
[info] : +- Exchange hashpartitioning(key#116713, 5),
ENSURE_REQUIREMENTS, [id=#264946]
[info] : +- *(1) Filter (isnotnull(value#116714) AND (value#116714
= 1))
[info] : +- *(1) SerializeFromObject
[knownnotnull(assertnotnull(input[0,
org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#116713,
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType,
fromString, knownnotnull(assertnotnull(input[0,
org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false,
true) AS value#116714]
[info] : +- Scan[obj#116712]
[info] +- BroadcastQueryStage 2
[info] +- BroadcastExchange
HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false),
[id=#264986]
[info] +- AQEShuffleRead local
[info] +- ShuffleQueryStage 1
[info] +- Exchange hashpartitioning(a#116723, 5),
ENSURE_REQUIREMENTS, [id=#264965]
[info] +- *(2) SerializeFromObject
[knownnotnull(assertnotnull(input[0,
org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#116723,
knownnotnull(assertnotnull(input[0,
org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#116724]
[info] +- Scan[obj#116722]
[info] ) was not empty (AdaptiveQueryExecSuite.scala:718)
[info] org.scalatest.exceptions.TestFailedException:
[info] at
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] at
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] at
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info] at
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info] at
org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$new$59(AdaptiveQueryExecSuite.scala:718)
[info] at
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
[info] at
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]