Regarding your initial question, I am not certain exactly what you mean by
the properties you mention. Here are some details about the hash join that
at least use those terms, though perhaps not as you intended them.

The hash join is blocking in the sense that we will aggregate all inputs to
a partition before returning any results. In
CoGroupedRDD<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala#L147>,
we explicitly iterate over all input key-value pairs to build the map
before returning anything. The hash join is non-blocking in the sense that
we *will* stream the results of a particular partition up to any remaining
narrow dependencies (i.e., dependencies that don't require a shuffle) before
moving on to processing the next partition. Thus, the degree to which we
pipeline the hash join is completely dependent on our Partitioner, which is
user-customizable. This is simply a property of the Spark staged execution
model (good resources: video <https://www.youtube.com/watch?v=49Hr5xZyTEA> or
slides <http://files.meetup.com/3138542/dev-meetup-dec-2012.pptx>).
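
To make the per-partition blocking concrete, here is a minimal sketch in
plain Scala (illustrative only, not the actual CoGroupedRDD code) of what
happens inside a single partition; nothing flows downstream until both
inputs have been fully drained into the map:

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    // Illustrative only: everything from both inputs goes into the map
    // before a single result is emitted -- blocking within the partition.
    def coGroupPartition[K, V, W](
        left: Iterator[(K, V)],
        right: Iterator[(K, W)]): Iterator[(K, (Seq[V], Seq[W]))] = {
      val map = new HashMap[K, (ArrayBuffer[V], ArrayBuffer[W])]
      def buffers(k: K) =
        map.getOrElseUpdate(k, (new ArrayBuffer[V], new ArrayBuffer[W]))
      for ((k, v) <- left)  buffers(k)._1 += v  // drain the left input
      for ((k, w) <- right) buffers(k)._2 += w  // drain the right input
      // Only now do results stream up to any narrow dependencies.
      map.iterator.map { case (k, (vs, ws)) => (k, (vs.toSeq, ws.toSeq)) }
    }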

In Sparkland, the join method is generalized such that the user can
specify a Partitioner, such as a RangePartitioner or
HashPartitioner<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L71>.
No matter what, the result is effectively a hash join: each resulting
partition acts as a bucket, and we join the buckets that share a partition
index using a hash map. To make this very concrete, a HashPartitioner takes a
desired number of partitions and simply buckets every input value into a
partition via hashing. Dead simple. Once we have two RDDs partitioned in the
same manner, we take two correlated partitions and throw them both into a
hash map, aggregating all the values from both sides that share the same keys.
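
For example, using the public API (sc is assumed to be an existing
SparkContext, and the data is invented for illustration):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._  // pair-RDD implicits

    val users  = sc.parallelize(Seq((1, "alice"), (2, "bob")))
    val orders = sc.parallelize(Seq((1, "book"), (1, "pen"), (3, "mug")))

    // Both inputs are hashed into the same 4 buckets; each pair of
    // correlated buckets is then joined with an in-memory hash map.
    val joined = users.join(orders, new HashPartitioner(4))
    // => (1, ("alice", "book")), (1, ("alice", "pen"))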

There is some special logic to ensure that if one RDD is already
partitioned correctly, it will not be re-partitioned (that is, it won't be
shuffled if it doesn't have to be). This is the difference between
NarrowCoGroupSplitDep and ShuffleCoGroupSplitDep. Each dependency (input
RDD) of the cogroup is independent; any subset can be already-partitioned
or not, and only the minimum work needed to bring them into a common
partitioning is done.
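
Concretely, you can take advantage of this by pre-partitioning the larger
side once and reusing it. A sketch (bigRdd and smallRdd are hypothetical
pair RDDs, and sc an existing SparkContext):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._  // pair-RDD implicits

    val part = new HashPartitioner(8)

    // One shuffle here; the result remembers its partitioner.
    val big = bigRdd.partitionBy(part).cache()

    // big already carries `part`, so its side of the cogroup is a narrow
    // dependency (NarrowCoGroupSplitDep); only smallRdd gets shuffled
    // (ShuffleCoGroupSplitDep).
    val joined = big.join(smallRdd, part)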


On Fri, Jan 24, 2014 at 12:31 AM, rose <[email protected]> wrote:

> Hi all,
>
> I want to know more about the join operation in Spark. I know it uses hash
> join,
> but I am not able to figure out the nature of the implementation, such as
> blocking, non-blocking, or shared vs. not-shared partitions.
>
> If anyone knows, please reply to this post along with links to the
> implementation in the Spark source files.
>
> Thanks,
> rose
>
