GitHub user jeanlyn commented on the pull request:
https://github.com/apache/spark/pull/6682#issuecomment-109777121
@yhuai, thanks for the comment. In current master, `join(BinaryNode)` decides whether a shuffle is needed using only one side's partitioning as the join's own partitioning. It ignores the other side's partitioning, even when that side is already partitioned. This can cause unnecessary shuffles in multiway joins. For example:
```sql
-- Five tables with the same schema
CREATE TABLE a (key STRING, value STRING);
CREATE TABLE b (key STRING, value STRING);
CREATE TABLE c (key STRING, value STRING);
CREATE TABLE d (key STRING, value STRING);
CREATE TABLE e (key STRING, value STRING);

SELECT a.value, b.value, c.value, d.value, e.value
FROM a
JOIN b ON a.key = b.key
JOIN c ON a.key = c.key
JOIN d ON b.key = d.key
JOIN e ON c.key = e.key;
```
we get this physical plan:
```
Project [value#63,value#65,value#67,value#69,value#71]
 ShuffledHashJoin [key#66], [key#70], BuildRight
  Exchange (HashPartitioning [key#66], 200)
   Project [value#63,key#66,value#67,value#65,value#69]
    ShuffledHashJoin [key#64], [key#68], BuildRight
     Exchange (HashPartitioning [key#64], 200)
      Project [value#63,key#66,key#64,value#67,value#65]
       ShuffledHashJoin [key#62], [key#66], BuildRight
        ShuffledHashJoin [key#62], [key#64], BuildRight
         Exchange (HashPartitioning [key#62], 200)
          HiveTableScan [key#62,value#63], (MetastoreRelation default, a, None), None
         Exchange (HashPartitioning [key#64], 200)
          HiveTableScan [key#64,value#65], (MetastoreRelation default, b, None), None
        Exchange (HashPartitioning [key#66], 200)
         HiveTableScan [key#66,value#67], (MetastoreRelation default, c, None), None
     Exchange (HashPartitioning [key#68], 200)
      HiveTableScan [key#68,value#69], (MetastoreRelation default, d, None), None
```
But what we actually need is:
```
Project [value#59,value#61,value#63,value#65,value#67]
 ShuffledHashJoin [key#62], [key#66], BuildRight
  Project [value#63,value#61,value#65,value#59,key#62]
   ShuffledHashJoin [key#60], [key#64], BuildRight
    Project [value#63,value#61,key#60,value#59,key#62]
     ShuffledHashJoin [key#58], [key#62], BuildRight
      ShuffledHashJoin [key#58], [key#60], BuildRight
       Exchange (HashPartitioning 200)
        HiveTableScan [key#58,value#59], (MetastoreRelation default, a, None), None
       Exchange (HashPartitioning 200)
        HiveTableScan [key#60,value#61], (MetastoreRelation default, b, None), None
      Exchange (HashPartitioning 200)
       HiveTableScan [key#62,value#63], (MetastoreRelation default, c, None), None
    Exchange (HashPartitioning 200)
     HiveTableScan [key#64,value#65], (MetastoreRelation default, d, None), None
  Exchange (HashPartitioning 200)
   HiveTableScan [key#66,value#67], (MetastoreRelation default, e, None), None
```
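The extra `Exchange` operators in the first plan follow from the one-side rule. The toy Python model below is a hypothetical sketch, not Spark code. It makes these simplifying assumptions:

- Partitioning is reduced to a single column name.
- Each join reports only its left child's partitioning as its own.
- Every join is an inner equi-join in a left-deep chain over unpartitioned table scans.

The names `plan_single_side` and `query` are illustrative.

```python
from typing import List, Optional, Tuple

# Hypothetical encoding of a left-deep join chain: each step joins the
# accumulated left side to a fresh table scan on (left_key, right_key).
Join = Tuple[str, str]


def plan_single_side(joins: List[Join]) -> List[str]:
    """Return the keys on which an Exchange is inserted, assuming each join
    reports only its LEFT child's partitioning as its own (a simplification
    of the current rule) and every join is an inner equi-join."""
    exchanges: List[str] = []
    left_part: Optional[str] = None  # key the accumulated left side is hash-partitioned on
    for left_key, right_key in joins:
        if left_part != left_key:
            # Left side is not partitioned on the join key: shuffle it.
            exchanges.append(left_key)
            left_part = left_key
        # The right side is an unpartitioned table scan, so it is always shuffled.
        exchanges.append(right_key)
        # The join's output partitioning stays left_part; the fact that the
        # right side is partitioned on right_key is forgotten.
    return exchanges


# The five-way query from the example.
query = [("a.key", "b.key"), ("a.key", "c.key"), ("b.key", "d.key"), ("c.key", "e.key")]
print(plan_single_side(query))
# ['a.key', 'b.key', 'c.key', 'b.key', 'd.key', 'c.key', 'e.key']
```

The second `b.key` and `c.key` entries correspond to `Exchange (HashPartitioning [key#64], 200)` and `Exchange (HashPartitioning [key#66], 200)`, which sit above joins in the first plan.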
Removing these redundant shuffles should noticeably improve efficiency, especially for outer joins. We have real-world cases of multiway full outer joins on the same key that produced many null keys. Combined with the redundant shuffles, these caused data skew (which Hive does not suffer from), and the jobs eventually ran out of memory (OOM).
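The skew mechanism can be reproduced with a small, hypothetical Python model (not Spark code). After `a FULL OUTER JOIN b ON a.key = b.key`, every row of `a` without a match carries a null `b.key`. Suppose a redundant `Exchange` re-hashes that output on `b.key`, as the current rule does before joining `d` on `b.key`. Then all of those rows land in one partition. The sketch assumes:

- Keys are unique on each side.
- There are 4 partitions.
- Nulls hash to a single value. Python's `hash(None)` is constant within a run.

```python
from collections import Counter
from typing import List, Optional, Tuple

Row = Tuple[Optional[int], Optional[int]]  # (a.key, b.key); None models SQL NULL


def full_outer_join(a_keys: List[int], b_keys: List[int]) -> List[Row]:
    """Full outer join on key, assuming unique keys on each side."""
    a_set, b_set = set(a_keys), set(b_keys)
    rows: List[Row] = [(k, k) for k in sorted(a_set & b_set)]
    rows += [(k, None) for k in sorted(a_set - b_set)]  # a rows without a match
    rows += [(None, k) for k in sorted(b_set - a_set)]  # b rows without a match
    return rows


def partition_sizes(rows: List[Row], column: int, num_partitions: int) -> List[int]:
    """Hash-partition rows on one column and count the rows in each partition."""
    counts = Counter(hash(row[column]) % num_partitions for row in rows)
    return [counts[p] for p in range(num_partitions)]


joined = full_outer_join(list(range(1, 101)), [1, 2, 3])
print(partition_sizes(joined, 0, 4))  # on a.key: [25, 25, 25, 25]
print(partition_sizes(joined, 1, 4))  # on b.key: the 97 NULL b.key rows share one partition
```

Keeping the output partitioned on `a.key` instead leaves the rows evenly spread.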
I want to try using `meetPartitions`. When constructing the plan tree, we can save the `outputPartitioning` of both sides of the `BinaryNode`, along with the node's own `outputPartitioning` (possibly redundant). That makes this easy to achieve. When a shuffle is needed, `meetPartitions` is reset to the node's `outputPartitioning`, so a genuinely required `Exchange` is never removed.
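The `meetPartitions` bookkeeping can be sketched as a toy Python model. This is a sketch under stated assumptions, not the Spark implementation. It assumes:

- Partitioning is reduced to a set of column names.
- Every join is an inner equi-join in a left-deep chain over unpartitioned table scans, so both join keys are equal in every output row.

The names `plan_meet_partitions`, `query`, and `other` are hypothetical.

```python
from typing import List, Set, Tuple

Join = Tuple[str, str]  # (key on the accumulated left side, key on the right table)


def plan_meet_partitions(joins: List[Join]) -> List[str]:
    """Return the keys on which an Exchange is inserted when each join keeps
    a 'meet' set of every key its output is known to be hash-partitioned on.
    Assumes inner equi-joins, where left_key == right_key in every output row."""
    exchanges: List[str] = []
    meet: Set[str] = set()  # empty: the first table scan is unpartitioned
    for left_key, right_key in joins:
        if left_key not in meet:
            exchanges.append(left_key)
            # Reset after a shuffle: only the new partitioning key is valid,
            # so stale keys cannot later suppress a required Exchange.
            meet = {left_key}
        # The right side is an unpartitioned table scan, so it is always shuffled.
        exchanges.append(right_key)
        # Keep BOTH sides' partitioning: output rows are co-located on either key.
        meet = meet | {right_key}
    return exchanges


query = [("a.key", "b.key"), ("a.key", "c.key"), ("b.key", "d.key"), ("c.key", "e.key")]
print(plan_meet_partitions(query))
# ['a.key', 'b.key', 'c.key', 'd.key', 'e.key']

# A join on a non-key column forces a shuffle. After the reset, the later
# join on b.key correctly gets its own Exchange again.
other = [("a.key", "b.key"), ("a.value", "c.key"), ("b.key", "d.key")]
print(plan_meet_partitions(other))
# ['a.key', 'b.key', 'a.value', 'c.key', 'b.key', 'd.key']
```

For the five-way query, the model keeps one `Exchange` per table, matching the second plan. Without the reset, the stale `b.key` in the set would wrongly skip the last shuffle in `other`.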