Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/4696#discussion_r25598471
--- Diff: docs/programming-guide.md ---
@@ -728,6 +728,69 @@ def doStuff(self, rdd):
</div>
+### Understanding closures <a name="ClosuresLink"></a>
+One of the harder things about Spark is understanding the scope and life
cycle of variables and methods when executing code across a cluster. RDD
operations that modify variables outside of their scope can be a frequent
source of confusion. In the example below we'll look at code that uses
`foreach()` to increment a counter, but similar issues can occur for other
operations as well.
+
+#### Example
+
+Consider the naive RDD element sum below, which behaves completely
differently when running spark in `local` mode (e.g. via the shell) and when
deploying a Spark application to a cluster (e.g. via spark-submit to YARN):
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+var counter = 0
+var rdd = sc.parallelize(data)
+
+// Wrong: Don't do this!!
+rdd.foreach(x => counter += x)
+
+println("Counter value: " + counter)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+int counter = 0;
+JavaRDD<Integer> rdd = sc.parallelize(data);
+
+// Wrong: Don't do this!!
+rdd.foreach(x -> counter += x;)
+
+println("Counter value: " + counter)
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+counter = 0
+rdd = sc.parallelize(data)
+
+# Wrong: Don't do this!!
+rdd.foreach(lambda x => counter+= x)
+
+print("Counter value: " + counter)
+
+{% endhighlight %}
+</div>
+
+</div>
+
+#### Local vs. cluster modes
+
+In local mode, the above code will correctly sum the values within the RDD
and store it in **counter**. This is because both the RDD and the variable
**counter** are in the same memory on the driver node.
+
+However, in `cluster` mode, what happens is more complicated, and the
above code will not work correctly. To execute jobs Spark breaks up the
processing of RDD operations into tasks - each of which is operated on by an
executor. Prior to execution, Spark computes the **closure**. The closure is
those variables and methods which must be visible for the executor to perform
its computations on the RDD (in this case `foreach()`). This closure is
serialized and sent to each executor. In `local` mode, there is only the one
executors so everything shares the same closure. In `remote` mode however, this
is not the case and the executors running on seperate worker nodes each have
their own copy of the closure.
+
+The problem here is that the variables within the closure sent to each
executor are now copies and thus, when **counter** is referenced within the
`foreach` function, it's no longer the **counter** on the driver node. There is
still a **counter** in the memory of the driver node but this is no longer
visible to the executors! The executors only sees the copy from the serialized
closure. Thus, the final value of **counter** will still be zero since all
operations on **counter** were referencing the value within the serialized
closure.
+
+The one exception to this is when the variable being modified is an
[`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to
provide a mechanism for safely updating a variable when execution is split up
across worker nodes in a cluster. The Accumulators section of this guide
discusses these in more detail.
+
+In general, closures - constructs like loops or locally defined methods,
should not be used to mutate some global state. Spark does not define or
guarantee the behavior of mutations to objects referenced from outside of
closures. Some code that does this may work in local mode, but that's just by
accident and such code will not behave as expected in distributed mode. Use an
accumulator instead if some global aggregation is needed.
+
+#### Printing elements of an RDD
+Another common idiom is attempting to print out the elements of an RDD
using `rdd.foreach(println)` or `rdd.map(println)`. Intuitively, it seems that
this should work. But again, consider that in `cluster` mode, the output to
`stdout` being called by the executors is now writing to the executor's
`stdout` instead, not the one on the driver! Consequently, Spark will start
writing to `stdout` on the worker node, potentially filling up `/tmp` storage
rapdily. To avoid this, one can use the `collect()` method to first bring the
RDD to the driver node thus: `rdd.collect().foreach(println)`. This can cause
the driver to run out of memory, though, because collect() fetches the entire
RDD to a single machine; a safer approach is to use the `take()` method to only
get a few elements of the RDD: `rdd.take(100).foreach(println)`.
--- End diff --
Mark `collect()` as code font at the end here. It is worth noting that
`collect()` brings all data to the driver, and if the RDD is large, could cause
the driver to run out of memory. Yes, `take()` is a better idea, but I suppose
only if you really just want to print a few elements. It's not the same as
`collect()` of course. Might be worth phrasing that way ... "If you only need
to print a few items from the RDD..."
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]