GitHub user concretevitamin opened a pull request:

    https://github.com/apache/spark/pull/1163

    [WIP] [SQL] SPARK-1800 Add broadcast hash join operator, and simple 
size-based auto conversion optimization.

    This PR is based on Michael's [PR 
734](https://github.com/apache/spark/pull/734) and adds a prototype 
optimization that detects small tables involved in joins so that they are 
broadcast.
    
    The following is a sample local hive/console session showing that the 
optimization works in a simple case. In it, `points.parquet` and 
`points2.parquet` are identical, and are generated from `data/kmeans_data.txt` 
(I read the file into Spark, defined a simple case class, and saved the result 
to Parquet using Spark SQL).
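    
    For reference, the two Parquet files mentioned above can be generated with 
something along these lines (a sketch against the Spark 1.0-era shell; `sc` and 
`sqlContext` are the shell's predefined contexts, and the `Point` case class 
and its field names are assumptions, not part of this PR):
    
    ```scala
    // Sketch only: build a SchemaRDD from kmeans_data.txt and save it as Parquet.
    case class Point(x: Double, y: Double, z: Double)
    
    import sqlContext.createSchemaRDD  // implicit RDD[Point] -> SchemaRDD conversion
    
    val pts = sc.textFile("data/kmeans_data.txt")
      .map(_.split(' '))
      .map(p => Point(p(0).toDouble, p(1).toDouble, p(2).toDouble))
    
    pts.saveAsParquetFile("points.parquet")
    pts.saveAsParquetFile("points2.parquet")
    ```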
    
    ```
    scala> val points = parquetFile("points.parquet")
    scala> val points2 = parquetFile("points2.parquet")
    scala> points.registerAsTable("points")
    scala> points2.registerAsTable("points2")
    scala> hql("SELECT p.x, p.y FROM points p JOIN points2 p2 ON p.z = p2.z")
    res3: org.apache.spark.sql.SchemaRDD =
    SchemaRDD[2] at RDD at SchemaRDD.scala:100
    == Query Plan ==
    Project [x#0:0,y#1:1]
     BroadcastHashJoin [z#2], [z#5], BuildRight
      ParquetTableScan [x#0,y#1,z#2], (ParquetRelation points.parquet), []
      ParquetTableScan [z#5], (ParquetRelation points2.parquet), []
    
    scala> hql("SET spark.sql.auto.convert.join.size=10")
    scala> hql("SELECT p.x, p.y FROM points p JOIN points2 p2 ON p.z = p2.z")
    res6: org.apache.spark.sql.SchemaRDD =
    SchemaRDD[6] at RDD at SchemaRDD.scala:100
    == Query Plan ==
    Project [x#0:0,y#1:1]
     ShuffledHashJoin [z#2], [z#5], BuildRight
      Exchange (HashPartitioning [z#2:2], 200)
       ParquetTableScan [x#0,y#1,z#2], (ParquetRelation points.parquet), []
      Exchange (HashPartitioning [z#5:0], 200)
       ParquetTableScan [z#5], (ParquetRelation points2.parquet), []
    ```
    
    Open issues:
    - [ ] Figure out how to unit test the strategy.
    - [ ] Check if this optimization is applicable to more cases than the one 
shown above.
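
    For intuition, the auto conversion boils down to a size-threshold check at 
planning time. Below is a hedged, self-contained sketch of that decision 
(illustrative names and types only; this is not the PR's actual planner code):
    
    ```scala
    // Illustrative model of the size-based join selection, not Spark internals.
    sealed trait JoinStrategy
    case object BroadcastJoin extends JoinStrategy
    case object ShuffledJoin extends JoinStrategy
    
    // If the smaller side's estimated size is at or below the configured
    // threshold (cf. spark.sql.auto.convert.join.size), broadcast it;
    // otherwise fall back to a shuffled hash join.
    def chooseJoin(leftBytes: Long, rightBytes: Long, thresholdBytes: Long): JoinStrategy =
      if (math.min(leftBytes, rightBytes) <= thresholdBytes) BroadcastJoin
      else ShuffledJoin
    ```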

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/concretevitamin/spark auto-broadcast-hash-join

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1163.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1163
    
----
commit a8420ca0c4cbc5988607d0cd235ffeb2cb51d052
Author: Michael Armbrust <[email protected]>
Date:   2014-05-11T18:23:02Z

    Copy records in executeCollect to avoid issues with mutable rows.

commit cf6b3818fbe7d1908bcbdc7f18c5773c01d05541
Author: Michael Armbrust <[email protected]>
Date:   2014-05-11T18:30:56Z

    Split out generic logic for hash joins and create two concrete physical operators: BroadcastHashJoin and ShuffledHashJoin.

commit 76ca4341036b95f71763f631049fdae033990ab5
Author: Michael Armbrust <[email protected]>
Date:   2014-05-11T18:31:20Z

    A simple strategy that broadcasts tables only when they are found in a configuration hint.

commit a92ed0cfbc9fbfada933f702e3879e70cb95283c
Author: Michael Armbrust <[email protected]>
Date:   2014-05-12T22:09:39Z

    Formatting.

commit 3e5d77cf80c5ec6349ccc2d3a12990acf4a692be
Author: Zongheng Yang <[email protected]>
Date:   2014-06-18T22:53:46Z

    WIP: giant and messy WIP.

commit 0ad122f34f43511bb408b02a91160cb86666cb3c
Author: Zongheng Yang <[email protected]>
Date:   2014-06-18T23:01:53Z

    Merge branch 'master' into auto-broadcast-hash-join
    
    Conflicts:
        sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
        sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala

commit 7c7158bf21ee9500a871e1d8fd770ec77c5177bb
Author: Zongheng Yang <[email protected]>
Date:   2014-06-20T22:31:53Z

    Prototype of auto conversion to broadcast hash join.

----

