Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/5074#discussion_r32766630
--- Diff: docs/programming-guide.md ---
@@ -1090,6 +1090,67 @@ for details.
</tr>
</table>
+### Shuffle operations
+
+Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
+mechanism for re-distributing data so that is grouped differently across partitions. This typically
+involves copying data across executors and machines, making the shuffle a complex and
+costly operation.
+
+#### Background
+
+To understand what happens during the shuffle we can consider the example of the
+[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
+values for a single key are combined into a tuple - the key and the result of executing a reduce
+function against all values associated with that key. The challenge is that not all values for a
+single key necessarily reside on the same partition, or even the same machine, but they must be
+co-located to compute the result.
+
+In Spark, data is generally not distributed across partitions to be in the necessary place for a
+specific operation. During computations, a single task will operate on a single partition - thus, to
+organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
+all-to-all operation. It must read from all partitions to find all the values for all keys, and then
+organize those such that all values for any key lie within the same partition - this is called the
+**shuffle**.
+
+Although the set of elements in each partition of newly shuffled data will be deterministic, and so
+is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably
+ordered data following shuffle then it's possible to use:
+
+* `mapPartitions` to sort each partition using, for example, `.sorted`
+* `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
+* `sortBy` to make a globally ordered RDD
+
+Operations which can cause a shuffle include **repartition** operations like
+[`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'ByKey** operations
+(except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
+**join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).
+
+#### Performance Impact
+The **Shuffle** is an expensive operation since it involves disk I/O, data serialization, and
+network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to
+organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
+MapReduce and does not directly relate to Spark's `map` and `reduce` operations.
+
+Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
+are sorted based on the target partition and written to a single file. On the reduce side, tasks
+read the relevant sorted blocks.
+
+Certain shuffle operations can consume significant amounts of heap memory since they employ
+in-memory data structures to organize records before or after transferring them. Specifically,
+`reduceByKey` and `aggregateByKey` create these structures on the map side and `'ByKey` operations
+generate these on the reduce side. When data does not fit in memory Spark will spill these tables
+to disk, incurring the additional overhead of disk I/O and increased garbage collection.
+
+Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
+are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
+long-running Spark jobs may consume available disk space. This is done so the shuffle doesn't need
--- End diff --
That clarifies things for me. I know there has been some concern about
shuffle files filling up the disk, but as of now that can happen for
one or more of the following reasons:
1. GC does not kick in for a long time (for example, with very high driver
memory). The solution is often to call GC periodically.
2. Nothing goes out of scope, so nothing is GCed.
3. There are some reported issues with shuffle files not being cleaned up
on Mesos.
The third one is a bug and we will fix it. The first two should be clarified
in the docs. That is better than the current very scary description.
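
To make points 1 and 2 concrete, here is a toy sketch in plain Python. It is
not Spark code and does not reflect Spark's actual implementation; the
`ShuffleHandle` class and the `_remove` and `demo` helpers are hypothetical
names made up for illustration. It only models the general idea that files
are cleaned up when the object that owns them is garbage collected, so a
handle that stays in scope keeps its file on disk.

```python
import gc
import os
import tempfile
import weakref


def _remove(path):
    """Delete the file if it is still there (hypothetical cleanup hook)."""
    if os.path.exists(path):
        os.remove(path)


class ShuffleHandle:
    """Hypothetical stand-in for a driver-side object that owns a shuffle file."""

    def __init__(self, directory):
        fd, self.path = tempfile.mkstemp(prefix="shuffle_", dir=directory)
        os.close(fd)
        # Run _remove(path) once this handle has been garbage collected.
        weakref.finalize(self, _remove, self.path)


def demo():
    """Return (held file exists, dropped file exists) after a forced GC."""
    with tempfile.TemporaryDirectory() as directory:
        held = ShuffleHandle(directory)      # stays in scope: never cleaned
        dropped = ShuffleHandle(directory)   # goes out of scope below
        dropped_path = dropped.path
        del dropped
        gc.collect()                         # the "periodically call GC" step
        return os.path.exists(held.path), os.path.exists(dropped_path)
```

In this sketch `demo()` returns `(True, False)`: the file behind the handle
that is still referenced survives, while the file behind the dropped handle is
removed once collection has run.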
On Wed, Jun 17, 2015 at 11:41 PM, Sean Owen <[email protected]>
wrote:
> In docs/programming-guide.md
> <https://github.com/apache/spark/pull/5074#discussion_r32702504>:
>
> > +organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
> > +MapReduce and does not directly relate to Spark's `map` and `reduce` operations.
> > +
> > +Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
> > +are sorted based on the target partition and written to a single file. On the reduce side, tasks
> > +read the relevant sorted blocks.
> > +
> > +Certain shuffle operations can consume significant amounts of heap memory since they employ
> > +in-memory data structures to organize records before or after transferring them. Specifically,
> > +`reduceByKey` and `aggregateByKey` create these structures on the map side and `'ByKey` operations
> > +generate these on the reduce side. When data does not fit in memory Spark will spill these tables
> > +to disk, incurring the additional overhead of disk I/O and increased garbage collection.
> > +
> > +Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
> > +are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
> > +long-running Spark jobs may consume available disk space. This is done so the shuffle doesn't need
>
> @tdas <https://github.com/tdas> This bit is a response to
> https://issues.apache.org/jira/browse/SPARK-5836 You would likely know
> better, and if GC of shuffle-related objects should trigger cleanup of
> shuffle files, then phew, at least there is *some* mechanism for that. I
> know people occasionally ask about problems with too many shuffle files
> lying around, but that doesn't mean the mechanism doesn't work. I think the
> best short-term change is just to update this statement if you're pretty
> confident this mechanism works.