LantaoJin edited a comment on pull request #29021:
URL: https://github.com/apache/spark/pull/29021#issuecomment-655272794
In the case I gave in the description, if I add a LIMIT to the SQL:
```
SELECT * FROM
  skewData1 AS data1
INNER JOIN (
  SELECT skewData2.key2, sum(skewData2.value2) AS sum2
  FROM skewData2
  GROUP BY skewData2.key2
) AS data2
ON data1.key1 = data2.key2
LIMIT 10
```
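For context, here is a minimal sketch of how to reproduce the setup. The table definitions are my reconstruction from the `Project` nodes in the plan at the end of this comment, and the config values are the ones I'd expect to need (treat both as assumptions):
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Enable AQE and skew-join handling; disable broadcast joins to keep the SMJ.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.shuffle.partitions", "100")

// skewData1: most ids collapse onto keys 249 and 1000, so this side is skewed.
spark.range(0, 1000, 1, 10)
  .select(
    when(col("id") < 250, 249)
      .when(col("id") >= 750, 1000)
      .otherwise(col("id")).as("key1"),
    col("id").as("value1"))
  .createOrReplaceTempView("skewData1")

// skewData2: same trick on key2, consumed through the aggregate subquery.
spark.range(0, 1000, 1, 10)
  .select(
    when(col("id") < 250, 249).otherwise(col("id")).as("key2"),
    col("id").as("value2"))
  .createOrReplaceTempView("skewData2")
```
With this little data, `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` and `spark.sql.adaptive.advisoryPartitionSizeInBytes` probably also need to be lowered for the skew to be detected at all.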
The plan will be
```
Limit
  SMJ
    Sort
      CustomShuffleReader(coalesced and skewed)
        Shuffle
    Sort
      HashAggregate
        CustomShuffleReader(coalesced)
          Shuffle
```
The `ensureRequirements.apply(optimizedPlan)` call in
https://github.com/apache/spark/blob/65286aec4b3c4e93d8beac6dd1b097ce97d53fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala#L274
fails with `IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions` while computing `ensureDistributionAndOrdering` for the `SortMergeJoinExec` node.
That's because, for the plan above, when `transformUp` in `ensureRequirements` reaches the SMJ node, it adds a `ShuffleExchangeExec` with `hashpartitioning` under `HashAggregateExec` on the right side, but adds no `ShuffleExchangeExec` on the (skewed) left side, since the `CustomShuffleReader`'s outputPartitioning (`UnknownPartitioning`) already satisfies the SMJ's required distribution (`UnspecifiedDistribution`).
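To make that check concrete, a tiny standalone snippet against the catalyst `physical` package (the partition count 137 is a made-up value for illustration):
```scala
import org.apache.spark.sql.catalyst.plans.physical.{UnknownPartitioning, UnspecifiedDistribution}

// Any partitioning satisfies UnspecifiedDistribution, so EnsureRequirements
// sees no reason to insert an exchange above the skewed-side reader.
val skewedSide = UnknownPartitioning(137) // hypothetical post-skew partition count
println(skewedSide.satisfies(UnspecifiedDistribution)) // true
```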
Then, when `transformUp` reaches the Limit node, `child.outputPartitioning` (where the child is the SMJ node) throws the `IllegalArgumentException` in this code:
https://github.com/apache/spark/blob/65286aec4b3c4e93d8beac6dd1b097ce97d53fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L48
https://github.com/apache/spark/blob/65286aec4b3c4e93d8beac6dd1b097ce97d53fd8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala#L305-L307
That's caused by the numPartitions of the `HashPartitioning` not being equal to that of the `UnknownPartitioning` after the skew optimization.
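The `require` can be reproduced in isolation (the attribute and partition counts are made up for illustration):
```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection, UnknownPartitioning}
import org.apache.spark.sql.types.LongType

val key2 = AttributeReference("key2", LongType)()

// For an inner SMJ, outputPartitioning is a PartitioningCollection of both
// children's partitionings; after the skew optimization their numPartitions
// differ, so constructing it fails.
PartitioningCollection(Seq(
  UnknownPartitioning(137),         // left: skewed reader, partition count changed
  HashPartitioning(Seq(key2), 100)  // right: exchange added by ensureRequirements
))
// java.lang.IllegalArgumentException: requirement failed:
// PartitioningCollection requires all of its partitionings have the same numPartitions.
```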
So to get more general skew join pattern matching, the code below may need to be removed or changed (a possible direction is sketched after the link):
https://github.com/apache/spark/blob/65286aec4b3c4e93d8beac6dd1b097ce97d53fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L89-L92
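For example (only a sketch of one possible direction, not a tested fix): since `OptimizeSkewedJoin` already marks the node with `isSkewJoin = true`, `SortMergeJoinExec.outputPartitioning` could stop reporting a `PartitioningCollection` in that case:
```scala
// Sketch inside SortMergeJoinExec: when the join was rewritten by
// OptimizeSkewedJoin, the children's partitionings no longer line up,
// so don't combine them into a PartitioningCollection.
override def outputPartitioning: Partitioning = joinType match {
  case _: InnerLike if isSkewJoin =>
    UnknownPartitioning(left.outputPartitioning.numPartitions)
  case _: InnerLike =>
    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
  // ... other join types as before
}
```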
The plan looks like this:
```
CollectLimit 10
+- SortMergeJoin(skew=true) [key1#220L], [key2#226L], Inner
   :- Sort [key1#220L ASC NULLS FIRST], false, 0
   :  +- CustomShuffleReader coalesced and skewed   <------- number of partitions changed
   :     +- ShuffleQueryStage 0
   :        +- Exchange hashpartitioning(key1#220L, 100), true, [id=#120]
   :           +- *(1) Project [CASE WHEN (id#218L < 250) THEN 249 WHEN (id#218L >= 750) THEN 1000 ELSE id#218L END AS key1#220L, id#218L AS value1#221L]
   :              +- *(1) Range (0, 1000, step=1, splits=10)
   +- Sort [key2#226L ASC NULLS FIRST], false, 0
      +- HashAggregate(keys=[key2#226L], functions=[sum(value2#227L)], output=[key2#226L, sum2#230L])
         +- Exchange hashpartitioning(key2#226L, 100), true, [id=#185]   <------- number of partitions is 100, added in ensureRequirements
            +- CustomShuffleReader coalesced
               +- ShuffleQueryStage 1
                  +- Exchange hashpartitioning(key2#226L, 100), true, [id=#131]
                     +- *(2) HashAggregate(keys=[key2#226L], functions=[partial_sum(value2#227L)], output=[key2#226L, sum#237L])
                        +- *(2) Project [CASE WHEN (id#224L < 250) THEN 249 ELSE id#224L END AS key2#226L, id#224L AS value2#227L]
                           +- *(2) Range (0, 1000, step=1, splits=10)
```
Any ideas? @cloud-fan @JkSelf