Thanks a lot for the detailed explanation, and for your work on this awesome 
addition to Spark :D

I just realized that with classical joins one has to repartition for each 
different join as well, so I assume that calling partitionBy on both sides 
before each classical join is equivalent to the `Exchange` operation.
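
Something like this is what I have in mind (just a sketch with made-up toy 
data, run in the spark-shell where `sc` is the SparkContext):

import org.apache.spark.HashPartitioner

// two pair RDDs keyed by the join key (toy data)
val users  = sc.parallelize(Seq((1, "alice"), (2, "bob")))
val orders = sc.parallelize(Seq((1, 9.99), (2, 4.50), (1, 12.00)))

// repartition both sides with the same partitioner before the classical join;
// the manual counterpart of what Exchange does for the SQL join
val p = new HashPartitioner(8)
val joined = users.partitionBy(p).join(orders.partitionBy(p))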

All the best, Jan

On 03.04.2014, at 23:19, Michael Armbrust <[email protected]> wrote:

> Ah, I understand now.  It is going to depend on the input partitioning (which 
> is a little different than a partitioner in Spark core, since in Spark SQL we 
> have more information about the logical structure).  Basically all SQL 
> operators have a method `requiredChildDistribution` and an 
> `outputPartitioning`.  For a hash join, the required distribution makes sure 
> that all tuples with the same values of the join keys end up on the same 
> machine.  When this is not the case, the optimizer will insert an `Exchange` 
> operator that will shuffle the data correctly, introducing a wide dependency. 
>  Right now we are pretty liberal with the insertion of Exchange operators 
> (reasoning here being that it is better to always be correct).  Basically, if 
> one side doesn't match, we insert exchange operators for both.
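> 
> A rough, self-contained Scala sketch of that check (not the actual Catalyst 
> code; the Distribution/Partitioning classes here are toy stand-ins, just to 
> make the shape of the rule concrete):
> 
> case class Distribution(clusteringKeys: Seq[String])           // what an operator requires
> case class Partitioning(keys: Seq[String]) {                   // what a child provides
>   def satisfies(required: Distribution): Boolean =
>     required.clusteringKeys.isEmpty || keys == required.clusteringKeys
> }
> 
> sealed trait Plan { def outputPartitioning: Partitioning }
> case class Scan(table: String) extends Plan {
>   val outputPartitioning = Partitioning(Nil)                   // partitioning unknown
> }
> case class Exchange(target: Distribution, child: Plan) extends Plan {
>   val outputPartitioning = Partitioning(target.clusteringKeys) // hash partitioned after the shuffle
> }
> case class HashJoin(leftKeys: Seq[String], rightKeys: Seq[String],
>                     left: Plan, right: Plan) extends Plan {
>   def requiredChildDistribution = Seq(Distribution(leftKeys), Distribution(rightKeys))
>   val outputPartitioning = left.outputPartitioning
> }
> 
> // If either child's partitioning fails to satisfy the required distribution,
> // wrap both children in an Exchange (a shuffle, i.e. a wide dependency).
> def ensureDistribution(join: HashJoin): HashJoin = {
>   val pairs = Seq(join.left, join.right).zip(join.requiredChildDistribution)
>   if (pairs.forall { case (c, req) => c.outputPartitioning.satisfies(req) }) join
>   else {
>     val exchanged = pairs.map { case (c, req) => Exchange(req, c) }
>     join.copy(left = exchanged(0), right = exchanged(1))
>   }
> }
> 
> // e.g. ensureDistribution(HashJoin(Seq("id"), Seq("id"), Scan("users"), Scan("orders")))
> // wraps both scans in an Exchange clustered on "id".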
> 
> So unfortunately for the 1.0 release it is likely that these will almost 
> always be wide dependencies.  This is something we hope to improve in a 
> future release, though.
> 
> 
> 
> On Thu, Apr 3, 2014 at 1:26 PM, Jan-Paul Bultmann <[email protected]> 
> wrote:
> I was referring to the categorization made by the RDD paper.
> It describes a narrow dependency as one where every parent partition is 
> required by at most one child partition (e.g. map),
> whereas a wide dependency means that some parent partitions are required by 
> multiple child partitions (e.g. a join without hash partitioning).
> The latter is significantly more expensive in the case of failure, as 
> potentially the entire parent RDD has to be recomputed just to restore a 
> single lost partition.
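> 
> A quick spark-shell sketch of that difference (assuming `sc` is the 
> SparkContext; join is built on cogroup, so the cogroup's dependencies show 
> it directly):
> 
> import org.apache.spark.HashPartitioner
> 
> val a = sc.parallelize(1 to 1000).map(i => (i % 10, i))
> val b = sc.parallelize(1 to 1000).map(i => (i % 10, i * 2))
> 
> // No common partitioner: both parents are attached via ShuffleDependencies (wide).
> println(a.cogroup(b).dependencies)
> 
> // Both sides pre-hashed with the same partitioner: the cogroup only needs
> // OneToOneDependencies (narrow), so a lost output partition can be restored
> // from the matching input partitions alone.
> val p = new HashPartitioner(4)
> println(a.partitionBy(p).cogroup(b.partitionBy(p)).dependencies)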
> 
> Here is a screenshot of the relevant diagram from the paper :D 
> http://cl.ly/image/3w2b1s2k2H1g
> 
> I’m currently building a Datalog system on top of Spark, which means hundreds 
> of join iterations, so having fast recovery would be very nice^^.
> 
> Thanks :), Jan
> 
> On 03.04.2014, at 22:10, Michael Armbrust <[email protected]> wrote:
> 
>> I'm sorry, but I don't really understand what you mean when you say "wide" 
>> in this context.  For a HashJoin, the only dependencies of the produced RDD 
>> are the two input RDDs.  For BroadcastNestedLoopJoin, the only dependency 
>> will be on the streamed RDD.  The other RDD will be distributed to all nodes 
>> using a Broadcast variable.
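>> 
>> In plain Spark core terms the broadcast case is roughly this shape (just a 
>> sketch; a hash lookup here rather than an actual nested loop):
>> 
>> // The small side is shipped to every node once as a Broadcast variable; the
>> // result then only has a narrow dependency on the streamed RDD.
>> val small = sc.broadcast(Map(1 -> "a", 2 -> "b"))
>> val streamed = sc.parallelize(Seq((1, 10.0), (2, 20.0), (3, 30.0)))
>> 
>> val joined = streamed.mapPartitions { iter =>
>>   val lookup = small.value
>>   iter.flatMap { case (k, v) => lookup.get(k).map(s => (k, (v, s))) }
>> }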
>> 
>> Michael
>> 
>> 
>> On Thu, Apr 3, 2014 at 12:59 PM, Jan-Paul Bultmann <[email protected]> 
>> wrote:
>> Hey,
>> Does somebody know the kinds of dependencies that the new SQL operators 
>> produce?
>> I’m specifically interested in the relational join operation as it seems 
>> substantially more optimized.
>> 
>> The old join was narrow on two RDDs with the same partitioner.
>> Is the relational join narrow as well?
>> 
>> Cheers, Jan
>> 
> 
> 
