Github user sryza commented on a diff in the pull request:
https://github.com/apache/spark/pull/5074#discussion_r26791494
--- Diff: docs/programming-guide.md ---
@@ -1086,6 +1086,29 @@ for details.
</tr>
</table>
+### Shuffle operations
+
+Certain operations within Spark trigger an operation known as the shuffle.
The shuffle is Spark's mechanism for re-distributing data so that is grouped
differently across partitions. This typically involves re-arranging and copying
data across executors and machines, making shuffle a complex and costly
operation.
+
+#### Background
+
+To understand what happens during the shuffle we can consider the example
of the [`groupByKey`](#GroupByLink) operation. The `groupByKey` operation
generates a new RDD where all values for a single key are combined into a tuple
- the key and an `Iterable` object containing all the associated values. The
challenge is that not all values for a single key necessarily reside on the
same partition, or even the same machine, but they must be co-located to
present a single array per key.
+
+In Spark, data is generally not distributed across partitions to be in the
ncessary place for a specific operation. During computations, a single task
will operate on a single partition - thus, to organize all the data for a
single `groupByKey` reduce task to execute, Spark needs to perform an
all-to-all operation. It must read from all partitions to find all the values
for all keys, and then organize those such that all values for any key lie
within the same partition - this is called the **shuffle**.
+
+Although the set of elements in each partition of newly shuffled data will
be deterministic, the ordering of these elements is not. If one desires
predictably ordered data following shuffle operations,
[`mapPartitions`](#MapPartLink) can be used to sort each partition or `sortBy`
can be used to perform a global sort. A similar operation,
[`repartitionAndSortWithinPartitions`](#Repartition2Link`) coupled with
`mapPartitions`, may be used to enact a Hadoop style shuffle.
+
+Operations which can cause a shuffle include **repartion** operations like
[`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'byKey**
operations (except for counting) like [`groupByKey`](#GroupByLink) and
[`reduceByKey`](#ReduceByLink) , and **join** operations like
[`cogroup`](#CogroupLink) and [`join`](#JoinLink).
+
+#### Performance Impact
+**Shuffle** is an expensive operation since it involves disk I/O, data
serialization, and network I/O. To organize data for the shuffle, internally,
Spark creates a hash table, which, for large operations, can consume
significant amounts of heap memory. When data does not fit in memory, for all
shuffle operations with the exception of `sortByKey`, Spark will spill these
tables to disk, incurring the additional overhead of disk I/O and increased
garbage collection. Since `sortByKey` does not spill these intermediate tables
to disk, the shuffle operation may cause OOM errors.
--- End diff --
A few factual edits:
* Spark always writes all shuffle data to disk on the map side.
* A hash table is only used on the map side for reduceByKey and
aggregateByKey, and only on the reduce side for the ByKey operations.
* sortByKey no longer can OOM.
Also, I would avoid mentioning hash-based shuffle at all because sort-based
shuffle is now pretty much what we expect everybody to use.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]