Operations that use keys
to combine multiple RDDs might generate a shuffle dependency. A shuffle
dependency always stores data to disk. If you aren't paying careful
attention to partitioners, a safe default is to assume the data *will*
be written to disk, since you can't be sure otherwise (though it isn't
hard to check in your code -- you can inspect RDD.getDependencies).
What I was really trying to point out is that operations and
dependencies are different and that difference can be significant. An
operation can generate multiple dependencies between RDDs, a
"shuffle operation" like join() may sometimes have no shuffle
dependencies at all, and the dependencies aren't always the same type
(a join() could result in one narrow dependency and one shuffle
dependency).
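To make the operation-vs-dependency distinction concrete, here is a toy sketch of how a join decides its dependencies, one per parent. These are my own illustrative names, not Spark's classes, though Spark's real CoGroupedRDD.getDependencies makes essentially this choice per parent:

```scala
// Toy model: a join emits one dependency per parent RDD -- narrow if the
// parent is already partitioned the way the join's output will be,
// shuffle otherwise. (Illustrative names, not Spark's API.)
sealed trait Dep
case object NarrowDep extends Dep
case object ShuffleDep extends Dep

// A partitioner is represented by an opaque id; None = unpartitioned.
def joinDependencies(parents: Seq[Option[String]], output: String): Seq[Dep] =
  parents.map {
    case Some(p) if p == output => NarrowDep  // co-partitioned: no data movement
    case _                      => ShuffleDep // this parent must be repartitioned
  }

// One co-partitioned parent, one unpartitioned parent: the
// "1 narrow dependency, 1 shuffle" case mentioned above.
val deps = joinDependencies(Seq(Some("hash-8"), None), "hash-8")
// deps == Seq(NarrowDep, ShuffleDep)
```

The same join can thus produce zero, one, or two shuffle dependencies depending entirely on how its inputs are already partitioned.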
As for "more often in memory", that might only be true if you're
careful. Having a default partitioner probably increases the likelihood
significantly since that makes more RDDs end up with the same
partitioner. But many operations remove the partitioner, e.g. even a
union(), which then forces a shuffle in any subsequent keyed operation.
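A toy model (again my own sketch, not Spark's implementation) of that last point -- the result of a union carries no partitioner, even when both inputs shared one:

```scala
// Toy model: union does not keep a partitioner, so even two
// identically-partitioned inputs produce an unpartitioned result.
// (Illustrative class, not Spark's API.)
case class SimpleRDD[A](data: Seq[A], partitioner: Option[String]) {
  def union(other: SimpleRDD[A]): SimpleRDD[A] =
    SimpleRDD(data ++ other.data, None) // partitioner dropped, as described above
}

val a = SimpleRDD(Seq(1, 2), Some("hash-8"))
val b = SimpleRDD(Seq(3, 4), Some("hash-8"))
val unioned = a.union(b)
// unioned.partitioner == None, so the next keyed operation must shuffle
```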
-Ewen
Thanks Patrick and Ewen,
Great answers.
So a shuffle dependency that can cause a shuffle will store the data in
memory + disk, more often in memory. Is my understanding correct?
Regards, SB
The difference between a
shuffle dependency and a transformation that can cause a shuffle is
probably worth pointing out.
The mentioned transformations (groupByKey, join, etc.) *might* generate a
shuffle dependency on input RDDs, but they won't necessarily. For
example, if you join() two RDDs that already use the same partitioner
(e.g. a default HashPartitioner with the default parallelism), then no
shuffle needs to be performed (and nothing should hit disk). Any records
that need to be considered together will already be in the same
partitions of the input RDDs (e.g. all records with key X are guaranteed
to be in partition hash(X) of both input RDDs, so no shuffling is
needed).
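The "partition hash(X)" guarantee can be sketched in plain Scala, with no Spark at all. The helper below mirrors what I understand a hash partitioner to do (non-negative hashCode mod numPartitions); since the partition index depends only on the key, a shared key lands in the same partition index on both sides:

```scala
// Plain-Scala sketch of why a co-partitioned join needs no shuffle:
// the partition index is a pure function of the key, so the same key
// maps to the same partition in every RDD using this partitioner.
def partitionFor(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw // keep the index non-negative
}

val numPartitions = 8
val keysA = Seq("alpha", "beta", "gamma")
val keysB = Seq("beta", "gamma", "delta")

// Where each "RDD" would place its keys:
val placementA = keysA.map(k => k -> partitionFor(k, numPartitions)).toMap
val placementB = keysB.map(k => k -> partitionFor(k, numPartitions)).toMap

// Every key present in both inputs is already in the same partition
// index on both sides, so a join can proceed partition-by-partition.
val colocated = keysA.intersect(keysB).forall(k => placementA(k) == placementB(k))
```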
Sometimes this is *really* worth exploiting, even if it only applies
to one of the input RDDs. For example, if you're joining 2 RDDs and one
is much larger than the other and already partitioned, you can
explicitly use the partitioner from the larger RDD so that only the
smaller RDD gets shuffled.
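A hypothetical sketch of that pattern (it assumes a live SparkContext `sc`; the file path, key names, and partition count are made up, though partitionBy, join, and rdd.partitioner are real Spark APIs):

```scala
// Sketch: reuse the large RDD's partitioner so that only the small
// RDD's records move over the network. Assumes a SparkContext `sc`;
// the path and sizes here are illustrative placeholders.
import org.apache.spark.HashPartitioner

val large = sc.textFile("large-input.txt")
  .map(line => (line, 1))
  .partitionBy(new HashPartitioner(256)) // shuffle the large RDD once...
  .cache()                               // ...and keep the result around

val small = sc.parallelize(Seq(("some-key", 42)))

// Partition the small RDD with the partitioner `large` already has;
// the join then sees co-partitioned inputs and only `small` was moved.
val joined = large.join(small.partitionBy(large.partitioner.get))
```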
This also means you probably want to pay attention to transformations
that remove partitioners. For example, prefer mapValues() to map().
mapValues() has to maintain the same key, so the output is guaranteed to
still be partitioned. map() can change the keys, so the partitioner is
dropped even if your function actually keeps the keys the same.
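The mapValues-vs-map distinction can be modeled in a few lines of plain Scala (a toy class, not Spark's implementation): mapValues cannot touch keys, so the partitioner survives; map may change keys, so the partitioner is conservatively dropped.

```scala
// Toy model of partitioner propagation (illustrative, not Spark's API):
// mapValues keeps the partitioner because keys are untouched; map drops
// it because keys *might* have changed, even when they didn't.
case class ToyPairRDD[K, V](data: Seq[(K, V)], partitioner: Option[String]) {
  def mapValues[W](f: V => W): ToyPairRDD[K, W] =
    ToyPairRDD(data.map { case (k, v) => (k, f(v)) }, partitioner) // kept
  def map[K2, W](f: ((K, V)) => (K2, W)): ToyPairRDD[K2, W] =
    ToyPairRDD(data.map(f), None) // dropped: keys may have changed
}

val pairs = ToyPairRDD(Seq(("a", 1), ("b", 2)), Some("hash-8"))
val kept  = pairs.mapValues(_ + 1)                  // partitioner preserved
val lost  = pairs.map { case (k, v) => (k, v + 1) } // same keys, but dropped
```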
-Ewen
January 16, 2014, 12:16 PM
The intermediate shuffle
output gets written to disk, but it often hits the OS-buffer cache
since it's not explicitly fsync'ed, so in many cases it stays
entirely in memory. The behavior of the shuffle is agnostic to
whether the base RDD is in cache or on disk.
For on-disk RDDs or
inputs, the shuffle path still has some key differences from
Hadoop's implementation, including that it doesn't sort on the map
side before shuffling.
- Patrick
Hi,
Is this behavior the same when the data is in memory? If the data is
stored to disk, then how is it different from Hadoop MapReduce?
Regards,
SB
For any shuffle operation (groupByKey, etc.), Spark does write map
output to disk before performing the reduce task on the data.