cloud-fan commented on pull request #32875: URL: https://github.com/apache/spark/pull/32875#issuecomment-961913710
Before looking into the code details, I have two high-level questions:

**How to define compatible partitions?**

I had a quick look at the code, and I think it's a good idea to include the distribution when defining compatible partitions (`ShuffleSpec` is literally partitioning + distribution). But we still need to spell out the details. What I have in mind is:

1. The distributions should be compatible. If a plan sets totally different distributions for its children, I don't think it's reasonable to require the children to be co-partitioned (think of broadcast join). A compatible distribution can be defined as: the same distribution type and the same number of keys.
2. The number of partitions should be the same.
3. The partition key indexes w.r.t. each child's distribution should be the same.

**How to pick the incompatible children to re-shuffle?**

Today we always re-shuffle the join side with fewer partitions, to get better parallelism. But now we have one more dimension: the number of keys. For example, given `... FROM t1 JOIN t2 ON t1.a = t2.a AND t1.b = t2.b`, say `t1` is `HashPartitioning(a, 10)` and `t2` is `HashPartitioning(a, b, 5)`: which one should we re-shuffle? To be conservative we can still prefer parallelism, and the algorithm can be:

1. Sort the children by num partitions and num keys.
2. Take the first child, and get its num partitions and key indexes.
3. Ask the other children to shuffle by the given num partitions and key indexes, w.r.t. their own distributions.
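The three compatibility conditions under the first question could be sketched as a toy model. This is a hedged illustration, not Spark's actual API: the class and function names (`ShuffleSpec`, `HashPartitioning`, `key_indexes`, `is_compatible`) are made up here for clarity, and Python is used only for brevity.

```python
# Toy sketch of "ShuffleSpec = partitioning + distribution".
# All names here are illustrative, not Spark's actual (Scala) API.
from dataclasses import dataclass


@dataclass(frozen=True)
class HashPartitioning:
    keys: tuple          # partition key names, e.g. ("a", "b")
    num_partitions: int


@dataclass(frozen=True)
class ShuffleSpec:
    partitioning: HashPartitioning
    clustering_keys: tuple  # keys of the required clustered distribution

    def key_indexes(self):
        # Positions of the partition keys within the child's own
        # distribution (condition 3 in the comment).
        return tuple(self.clustering_keys.index(k) for k in self.partitioning.keys)


def is_compatible(a: ShuffleSpec, b: ShuffleSpec) -> bool:
    return (
        # 1. compatible distributions: same kind (hash-clustered here), same num keys
        len(a.clustering_keys) == len(b.clustering_keys)
        # 2. same number of partitions
        and a.partitioning.num_partitions == b.partitioning.num_partitions
        # 3. same partition key indexes w.r.t. each distribution
        and a.key_indexes() == b.key_indexes()
    )
```

For `t1 JOIN t2 ON t1.a = t2.a AND t1.b = t2.b`, two sides both hashed on the first join key into 10 partitions are compatible, while hashing one side on `a` and the other on `b` gives key indexes `(0,)` vs `(1,)` and is not.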
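The three-step selection algorithm for the second question could likewise be sketched as follows. Again a toy model, not Spark's implementation: each child is represented as `(partition_keys, num_partitions, clustering_keys)`, and the direction of the num-keys tie-break is an assumption (the comment only says to sort by both dimensions).

```python
# Toy sketch of the conservative "prefer parallelism" selection algorithm.
# The tuple encoding and the num-keys tie-break direction are assumptions.

def key_indexes(partition_keys, clustering_keys):
    # Partition key positions w.r.t. the child's own distribution.
    return tuple(clustering_keys.index(k) for k in partition_keys)


def plan_reshuffles(children):
    # 1. Sort the children by num partitions (descending, to keep the most
    #    parallel child), then by num keys (ascending, as an assumed tie-break).
    ordered = sorted(children, key=lambda c: (-c[1], len(c[0])))
    # 2. Take the first child and fix its num partitions and key indexes.
    win_keys, win_n, win_clust = ordered[0]
    win_idx = key_indexes(win_keys, win_clust)
    # 3. Every other child shuffles by those key indexes, mapped back onto
    #    its own distribution, with the winning number of partitions.
    plan = []
    for keys, n, clust in children:
        if n != win_n or key_indexes(keys, clust) != win_idx:
            plan.append((tuple(clust[i] for i in win_idx), win_n))
        else:
            plan.append(None)  # already compatible, no shuffle needed
    return plan
```

On the example above, `t1` = `HashPartitioning(a, 10)` wins on parallelism, so `t2` = `HashPartitioning(a, b, 5)` would be re-shuffled to hash on `a` alone with 10 partitions.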
