LantaoJin commented on pull request #29021:
URL: https://github.com/apache/spark/pull/29021#issuecomment-655272794


   In the case I given in the description. If I add a LIMIT in the SQL
   ```
   SELECT * FROM
     skewData1 AS data1
     INNER JOIN
     (
       SELECT skewData2.key2, sum(skewData2.value2) AS sum2
       FROM skewData2 GROUP BY skewData2.key2
     ) AS data2
   ON data1.key1 = data2.key2 LIMIT 10
   ```
   The plan will be
   ```
   Limit
     SMJ
       Sort
         CustomShuffleReader(coalesced and skewed)
           Shuffle
       Sort
         HashAggregate
           CustomShuffleReader(coalesced)
             Shuffle
   ```
   The ensureRequirements.apply(optimizedPlan)
   
https://github.com/apache/spark/blob/65286aec4b3c4e93d8beac6dd1b097ce97d53fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala#L274
   will failed with `IllegalArgumentException: requirement failed: 
PartitioningCollection requires all of its partitionings have the same` in 
computing `ensureDistributionAndOrdering` for `SortMergeJoinExec` node.
   
   That's because for above plan, when transfomUp to the SMJ node in 
`ensureRequirements`, in the right side, it will add a `ShuffleExchangeExec` 
with `hashpartitioning` under `HashAggregateExec`, but won't add 
`ShuffleExchangeExec` in left side (skewed) since `CustomShuffleReader`'s 
outputPartitioning (UnknownPartitioning) satisfies the SMJ's distribution 
(UnspecifiedDistribution).
   Then, when transfomUp to the Limit node, `child.outputPartitioning`(child is 
SMJ node) will throw IllegalArgumentException in the code:
   
https://github.com/apache/spark/blob/65286aec4b3c4e93d8beac6dd1b097ce97d53fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L48
   
https://github.com/apache/spark/blob/65286aec4b3c4e93d8beac6dd1b097ce97d53fd8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala#L305-L307
   
   That caused by the numPartitions of Hashpartitioning is not equals to the 
UnknownPartitioning after skewed optimization.
   So to get more general skew join pattern matching, below code maybe need to 
remove or change
   
https://github.com/apache/spark/blob/65286aec4b3c4e93d8beac6dd1b097ce97d53fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L89-L92
   Any ideal? @cloud-fan @JkSelf 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to