LantaoJin commented on a change in pull request #29021:
URL: https://github.com/apache/spark/pull/29021#discussion_r450666963
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -730,6 +713,67 @@ class AdaptiveQueryExecSuite
}
}
+ private def checkSkewJoin(
+ joins: Seq[SortMergeJoinExec],
+ leftSkewNum: Int,
+ rightSkewNum: Int): Unit = {
+ assert(joins.size == 1 && joins.head.isSkewJoin)
+ assert(joins.head.left.collect {
+ case r: CustomShuffleReaderExec => r
+ }.head.partitionSpecs.collect {
+ case p: PartialReducerPartitionSpec => p.reducerIndex
+ }.distinct.length == leftSkewNum)
+ assert(joins.head.right.collect {
+ case r: CustomShuffleReaderExec => r
+ }.head.partitionSpecs.collect {
+ case p: PartialReducerPartitionSpec => p.reducerIndex
+ }.distinct.length == rightSkewNum)
+ }
+
+ test("SPARK-32201: handle general skew join pattern") {
Review comment:
It's hard to build a test case like the one in the PR description, so I
just want to give a simple one here. The rule is the same, though. In this
test case, we build an SMJ like:
```
SMJ
  Sort
    CustomShuffleReader(coalesced)
      Shuffle
  Sort
    HashAggregate
      CustomShuffleReader(coalesced)
        Shuffle
```
This already breaks the current matching pattern. The plan will be optimized
to:
```
SMJ
  Sort
    CustomShuffleReader(skewed)
      Shuffle
  Sort
    HashAggregate
      CustomShuffleReader(coalesced)
        Shuffle
```
We don't split the shuffle on a side that contains any AggregateExec.
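To make the rule concrete, here is a minimal sketch on a toy plan model. The types and the `splittableSides` helper are hypothetical and are not Spark's physical operators or the code in this patch. It only shows the idea that a join side is eligible for splitting when no aggregate sits between the join and the shuffle read. The likely reason is that splitting a reducer partition would spread rows with the same key across tasks under the final aggregate.

```scala
// Hypothetical, simplified plan model; NOT Spark's physical operator classes.
object SkewSplitSketch {
  sealed trait Node { def children: Seq[Node] }
  case class Shuffle(name: String) extends Node { val children: Seq[Node] = Nil }
  case class ShuffleReader(child: Node) extends Node { val children: Seq[Node] = Seq(child) }
  case class Aggregate(child: Node) extends Node { val children: Seq[Node] = Seq(child) }
  case class Sort(child: Node) extends Node { val children: Seq[Node] = Seq(child) }
  case class Join(left: Node, right: Node) extends Node {
    val children: Seq[Node] = Seq(left, right)
  }

  // True if an aggregate appears anywhere in the subtree rooted at `n`.
  def containsAggregate(n: Node): Boolean = n match {
    case _: Aggregate => true
    case other        => other.children.exists(containsAggregate)
  }

  // (leftSplittable, rightSplittable): a side may be split only if it has no aggregate.
  def splittableSides(j: Join): (Boolean, Boolean) =
    (!containsAggregate(j.left), !containsAggregate(j.right))

  // Same shape as the simplified SMJ in this comment.
  val example: Join = Join(
    Sort(ShuffleReader(Shuffle("left"))),
    Sort(Aggregate(ShuffleReader(Shuffle("right")))))
}
```

For the `example` plan, only the left side is reported as splittable, which matches the optimized plan where only the left reader becomes skewed.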
The plan before the patch is:
```
AdaptiveSparkPlan(isFinalPlan=true)
+- *(5) SortMergeJoin [key1#183L], [key2#189L], Inner, false
   :- *(3) Sort [key1#183L ASC NULLS FIRST], false, 0
   :  +- CustomShuffleReader coalesced
   :     +- ShuffleQueryStage 0
   :        +- Exchange hashpartitioning(key1#183L, 100), true
   :           +- *(1) Project [CASE WHEN (id#181L < 250) THEN 249 WHEN (id#181L >= 750) THEN 1000 ELSE id#181L END AS key1#183L, id#181L AS value1#184L]
   :              +- *(1) Range (0, 1000, step=1, splits=10)
   +- *(4) Sort [key2#189L ASC NULLS FIRST], false, 0
      +- *(4) HashAggregate(keys=[key2#189L], functions=[sum(value2#190L)], output=[key2#189L, sum2#193L])
         +- CustomShuffleReader coalesced
            +- ShuffleQueryStage 1
               +- Exchange hashpartitioning(key2#189L, 100), true
                  +- *(2) HashAggregate(keys=[key2#189L], functions=[partial_sum(value2#190L)], output=[key2#189L, sum#200L])
                     +- *(2) Project [CASE WHEN (id#187L < 250) THEN 249 ELSE id#187L END AS key2#189L, id#187L AS value2#190L]
                        +- *(2) Range (0, 1000, step=1, splits=10)
```
The plan after the patch is:
```
AdaptiveSparkPlan(isFinalPlan=true)
+- *(5) SortMergeJoin(skew=true) [key1#183L], [key2#189L], Inner, true
   :- *(3) Sort [key1#183L ASC NULLS FIRST], false, 0
   :  +- CustomShuffleReader coalesced and skewed
   :     +- ShuffleQueryStage 0
   :        +- Exchange hashpartitioning(key1#183L, 100), true
   :           +- *(1) Project [CASE WHEN (id#181L < 250) THEN 249 WHEN (id#181L >= 750) THEN 1000 ELSE id#181L END AS key1#183L, id#181L AS value1#184L]
   :              +- *(1) Range (0, 1000, step=1, splits=10)
   +- *(4) Sort [key2#189L ASC NULLS FIRST], false, 0
      +- *(4) HashAggregate(keys=[key2#189L], functions=[sum(value2#190L)], output=[key2#189L, sum2#193L])
         +- CustomShuffleReader coalesced
            +- ShuffleQueryStage 1
               +- Exchange hashpartitioning(key2#189L, 100), true
                  +- *(2) HashAggregate(keys=[key2#189L], functions=[partial_sum(value2#190L)], output=[key2#189L, sum#200L])
                     +- *(2) Project [CASE WHEN (id#187L < 250) THEN 249 ELSE id#187L END AS key2#189L, id#187L AS value2#190L]
                        +- *(2) Range (0, 1000, step=1, splits=10)
```
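As a quick sanity check on where the skew comes from, the two `CASE WHEN` projections in the plans can be replayed in plain Scala without Spark. This is only an illustrative sketch: the object and method names (`SkewKeyCounts`, `key1`, `key2`) are made up here and mirror the expressions printed in the `Project` nodes over `Range (0, 1000)`.

```scala
// Plain-Scala replay of the two Project expressions; no Spark required.
object SkewKeyCounts {
  // CASE WHEN id < 250 THEN 249 WHEN id >= 750 THEN 1000 ELSE id END
  def key1(id: Long): Long = if (id < 250) 249L else if (id >= 750) 1000L else id

  // CASE WHEN id < 250 THEN 249 ELSE id END
  def key2(id: Long): Long = if (id < 250) 249L else id

  // Same ids as Range (0, 1000, step=1).
  val ids: Seq[Long] = 0L until 1000L

  // Number of input rows per join key on each side, before any aggregation.
  val leftCounts: Map[Long, Int] =
    ids.groupBy(key1).map { case (k, rows) => k -> rows.size }
  val rightCounts: Map[Long, Int] =
    ids.groupBy(key2).map { case (k, rows) => k -> rows.size }
}
```

On the left side, keys 249 and 1000 each receive 250 of the 1000 rows while every other key receives one, which is consistent with the left reader being marked `coalesced and skewed`. On the right side, key 249 also receives 250 input rows, but that side sits under a `HashAggregate` and is left unsplit.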