Operations that use keys to combine multiple RDDs might generate a shuffle dependency. A shuffle dependency always writes data to disk. If you aren't paying careful attention to partitioners, you should probably assume by default that the data *will* be written to disk, since you can't be sure otherwise (though it isn't hard to check in your code -- you can inspect RDD.getDependencies).
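
That check is easy to sketch in spark-shell (toy data; the helper walks the whole lineage because join() puts its shuffle dependencies one level below the final RDD, and the type parameters on ShuffleDependency vary a bit across Spark versions):

    import org.apache.spark.{Dependency, ShuffleDependency}
    import org.apache.spark.rdd.RDD

    // Recursively collect every ShuffleDependency in an RDD's lineage.
    // (rdd.dependencies is the public accessor; getDependencies is the
    // protected method that RDD subclasses override.)
    def shuffleDeps(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] =
      rdd.dependencies.flatMap {
        case s: ShuffleDependency[_, _, _] => s +: shuffleDeps(s.rdd)
        case d: Dependency[_]              => shuffleDeps(d.rdd)
      }

    val a = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
    val b = sc.parallelize(Seq(1 -> "x", 2 -> "y"))
    // Neither input is partitioned, so both sides get shuffled:
    println(shuffleDeps(a.join(b)).size)  // 2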

What I was really trying to point out is that operations and dependencies are different, and that difference can be significant. An operation can generate multiple dependencies between RDDs; sometimes a "shuffle operation" like join() has no shuffle dependencies at all; and the dependencies aren't always the same type (a join() could result in one narrow dependency and one shuffle dependency).
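
To make the mixed case concrete, here's a rough spark-shell sketch where one join() input is pre-partitioned and the other isn't. (It peeks one level down the lineage, relying on join() being implemented as a mapping over cogroup -- an internal detail, so treat this as illustrative.)

    import org.apache.spark.HashPartitioner

    val part  = new HashPartitioner(4)
    val big   = sc.parallelize(1 to 1000).map(i => i -> i).partitionBy(part)  // partitioned
    val small = sc.parallelize(1 to 10).map(i => i -> i)                      // not partitioned

    val cogrouped = big.join(small, part).dependencies.head.rdd
    cogrouped.dependencies.foreach(d => println(d.getClass.getSimpleName))
    // OneToOneDependency  <- big is already laid out correctly
    // ShuffleDependency   <- only small has to move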

As for "more often in memory", that might only be true if you're careful. Having a default partitioner probably increases the likelihood significantly since that makes more RDDs end up with the same partitioner. But many operations remove the partitioner, e.g. even a union(), which will then force shuffling.

-Ewen
January 16, 2014 1:22 PM
Thanks Patrick and Ewen,

Great answers.

So a shuffle dependency that actually causes a shuffle will store the data in memory and on disk, more often in memory.
Is my understanding correct?

Regards,
SB



January 16, 2014 1:08 PM
The difference between a shuffle dependency and a transformation that can cause a shuffle is probably worth pointing out.

The transformations mentioned (groupByKey, join, etc.) *might* generate a shuffle dependency on their input RDDs, but they won't necessarily. For example, if you join() two RDDs that already use the same partitioner (e.g. a default HashPartitioner with the default parallelism), then no shuffle needs to be performed (and nothing should hit disk). Any records that need to be considered together will already be in the same partitions of the input RDDs (e.g. all records with key X are guaranteed to be in partition hash(X) of both input RDDs, so no shuffling is needed).
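
A rough spark-shell sketch of that case (toy data; the partitionBy() calls shuffle once up front, but the join() step itself then adds no shuffle dependency -- peeking one level down the lineage is an internal detail of how join() is built on cogroup):

    import org.apache.spark.{HashPartitioner, ShuffleDependency}

    val part   = new HashPartitioner(8)
    val users  = sc.parallelize(Seq(1 -> "alice", 2 -> "bob")).partitionBy(part)
    val orders = sc.parallelize(Seq(1 -> 9.99, 2 -> 3.50)).partitionBy(part)

    // Both sides already share `part`, so cogroup builds only narrow deps:
    val cogrouped = users.join(orders).dependencies.head.rdd
    println(cogrouped.dependencies.count(_.isInstanceOf[ShuffleDependency[_, _, _]]))  // 0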

Sometimes this is *really* worth exploiting, even when it only applies to one of the input RDDs. For example, if you're joining 2 RDDs and one is much larger than the other and already partitioned, you can explicitly use the partitioner from the larger RDD so that only the smaller RDD gets shuffled.
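
Sketch (made-up big/small data; `big` comes out of a reduceByKey, so it is already partitioned):

    import org.apache.spark.HashPartitioner

    val big   = sc.parallelize(1 to 100000).map(i => (i % 1000) -> 1).reduceByKey(new HashPartitioner(16), _ + _)
    val small = sc.parallelize(1 to 100).map(i => i -> i.toString)

    // Pass big's partitioner explicitly: big's 16 partitions stay put and
    // only small's records move across the network.
    val joined = big.join(small, big.partitioner.get)
    println(joined.partitioner)  // Some(HashPartitioner@...)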

This also means you probably want to pay attention to transformations that remove partitioners. For example, prefer mapValues() to map(). mapValues() has to keep the keys unchanged, so the output is guaranteed to still be partitioned. map() is allowed to change the keys, so the partitioner is dropped even if your function actually keeps every key the same.
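
A quick illustration:

    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(new HashPartitioner(4))

    // mapValues() can't touch the keys, so the partitioner survives:
    println(pairs.mapValues(_.toUpperCase).partitioner)                   // Some(...)

    // map() is allowed to change the keys, so the partitioner is dropped
    // even though this particular function leaves every key alone:
    println(pairs.map { case (k, v) => (k, v.toUpperCase) }.partitioner)  // None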

-Ewen

January 16, 2014 12:16 PM
The intermediate shuffle output gets written to disk, but it often
hits the OS buffer cache since it's not explicitly fsync'ed, so in
many cases it stays entirely in memory. The behavior of the shuffle is
agnostic to whether the base RDD is in cache or on disk.

For on-disk RDDs or inputs, the shuffle path still has some key
differences from Hadoop's implementation, including that it doesn't
sort on the map side before shuffling.

- Patrick
January 16, 2014 6:24 AM
Hi,

Is this behavior the same when the data is in memory?
If the data is stored to disk, how is it different from Hadoop MapReduce?

Regards,
SB



January 16, 2014 3:41 AM
For any shuffle operation (groupByKey, etc.), Spark does write the map output to disk before performing the reduce task on the data.


