Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4696#discussion_r26079924
  
    --- Diff: docs/programming-guide.md ---
    @@ -728,6 +728,69 @@ def doStuff(self, rdd):
     
     </div>
     
    +### Understanding closures <a name="ClosuresLink"></a>
    +One of the harder things about Spark is understanding the scope and life 
cycle of variables and methods when executing code across a cluster. RDD 
operations that modify variables outside of their scope can be a frequent 
source of confusion. In the example below we'll look at code that uses 
`foreach()` to increment a counter, but similar issues can occur for other 
operations as well.
    +
    +#### Example
    +
    +Consider the naive RDD element sum below, which behaves completely 
differently depending on whether execution is happening within the same JVM. A 
common example of this is when running spark in `local` mode (`--master = 
local[n]`) versus deploying a Spark application to a cluster (e.g. via 
spark-submit to YARN): 
    +
    +<div class="codetabs">
    +
    +<div data-lang="scala"  markdown="1">
    +{% highlight scala %}
    +var counter = 0
    +var rdd = sc.parallelize(data)
    +
    +// Wrong: Don't do this!!
    +rdd.foreach(x => counter += x)
    +
    +println("Counter value: " + counter)
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="java"  markdown="1">
    +{% highlight java %}
    +int counter = 0;
    +JavaRDD<Integer> rdd = sc.parallelize(data); 
    +
    +// Wrong: Don't do this!!
    +rdd.foreach(x -> counter += x;)
    +
    +println("Counter value: " + counter)
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="python"  markdown="1">
    +{% highlight python %}
    +counter = 0
    +rdd = sc.parallelize(data)
    +
    +# Wrong: Don't do this!!
    +rdd.foreach(lambda x => counter += x)
    +
    +print("Counter value: " + counter)
    +
    +{% endhighlight %}
    +</div>
    +
    +</div>
    +
    +#### Local vs. cluster modes
    +
    +The primary challenge is that the behavior of the above code is undefined. 
In local mode with a single JVM, the above code will sum the values within the 
RDD and store it in **counter**. This is because both the RDD and the variable 
**counter** are in the same memory space on the driver node. 
    +
    +However, in `cluster` mode, what happens is more complicated, and the 
above may not work as intended. To execute jobs, Spark breaks up the processing 
of RDD operations into tasks - each of which is operated on by an executor. 
Prior to execution, Spark computes the **closure**. The closure is those 
variables and methods which must be visible for the executor to perform its 
computations on the RDD (in this case `foreach()`). This closure is serialized 
and sent to each executor. In `local` mode, there is only the one executors so 
everything shares the same closure. In `remote` mode however, this is not the 
case and the executors running on seperate worker nodes each have their own 
copy of the closure.
    +
    +What is happening here is that the variables within the closure sent to 
each executor are now copies and thus, when **counter** is referenced within 
the `foreach` function, it's no longer the **counter** on the driver node. 
There is still a **counter** in the memory of the driver node but this is no 
longer visible to the executors! The executors only sees the copy from the 
serialized closure. Thus, the final value of **counter** will still be zero 
since all operations on **counter** were referencing the value within the 
serialized closure.  
    +
    +To ensure well-defined behavior in these sorts of scenarios one should use 
an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to 
provide a mechanism for safely updating a variable when execution is split up 
across worker nodes in a cluster. The Accumulators section of this guide 
discusses these in more detail.  
    +
    +In general, closures - constructs like loops or locally defined methods, 
should not be used to mutate some global state. Spark does not define or 
guarantee the behavior of mutations to objects referenced from outside of 
closures. Some code that does this may work in local mode, but that's just by 
accident and such code will not behave as expected in distributed mode. Use an 
accumulator instead if some global aggregation is needed.
    --- End diff --
    
    accumulator -> `Accumulator` for consistency?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to