Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4696#discussion_r25102665
  
    --- Diff: docs/programming-guide.md ---
    @@ -728,6 +728,63 @@ def doStuff(self, rdd):
     
     </div>
     
    +### Understanding closures
    +One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. A frequent source of confusion is illustrated below, where we perform a common task: summing values by incrementing a counter. The example uses `foreach()`, but the same issue applies to any other RDD operation that modifies variables outside of its scope.
    +
    +#### Example
    +
    +Consider the naive RDD element sum below, which behaves completely differently when running Spark in `local` mode (e.g. via the shell) versus when deploying a Spark application to a cluster (e.g. via spark-submit to YARN):
    +
    +<div class="codetabs">
    +
    +<div data-lang="scala"  markdown="1">
    +{% highlight scala %}
    +var counter = 0
    +var rdd = sc.parallelize(data)
    +rdd.foreach(x => counter += x)
    +
    +println("Counter value: " + counter)
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="java"  markdown="1">
    +{% highlight java %}
    +int counter = 0;
    +JavaRDD<Integer> rdd = sc.parallelize(data);
    +rdd.foreach(x -> counter += x);
    +
    +System.out.println("Counter value: " + counter);
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="python"  markdown="1">
    +{% highlight python %}
    +counter = 0
    +rdd = sc.parallelize(data)
    +
    +def increment_counter(x):
    +    global counter
    +    counter += x
    +
    +rdd.foreach(increment_counter)
    +
    +print("Counter value: ", counter)
    +{% endhighlight %}
    +</div>
    +
    +</div>
    +
    +#### Local vs. cluster modes
    +
    +In `local` mode, the above code will correctly sum the values within the RDD and store the result in **counter**, because both the RDD and the variable **counter** live in the same memory on the driver node.
    +
    +However, in `cluster` mode, what happens is more complicated, and the above code will not work as intended. In `cluster` mode, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the **closure**: those variables and methods which must be visible for the executor (running on a separate worker node) to perform its computations on the RDD (in this case `foreach()`). This closure is serialized and sent to each executor.
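    +
    +If a global aggregation like this count is actually needed, one well-defined option is an accumulator. The snippet below is a minimal sketch, assuming the Scala `sc.accumulator` API (see the Accumulators section of this guide):
    +
    +{% highlight scala %}
    +// Sketch: accumulator updates made inside tasks are aggregated
    +// back on the driver, unlike plain variables captured in a closure.
    +val counter = sc.accumulator(0)
    +val rdd = sc.parallelize(data)
    +rdd.foreach(x => counter += x)
    +
    +println("Counter value: " + counter.value)
    +{% endhighlight %}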
    --- End diff ---
    
    > ... breaks up the processing of RDD operations into tasks - each of which is operated on by a separate executor
    
    Taken slightly out of context, this sounds misleading since we also have 
multiple tasks in local mode and all of the tasks might run on a single 
executor even in `cluster` mode.  I think the key point to make is that in 
local mode, objects referenced from outside of the closure are the same object 
for all tasks, whereas in distributed mode each task gets its own copy from the 
closure.
    
    Or, maybe even more succinctly: Spark doesn't define / guarantee the 
behavior of mutations to objects referenced from outside of closures.  Some 
code that does this may work in `local` mode, but that's just by accident and 
such code will not behave as expected in distributed mode.
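    
    For a simple sum like this one, the most reliable fix is probably to avoid mutating driver-side state from within a closure entirely and return the result via an action, e.g. (a sketch, assuming a numeric RDD):
    
    ```scala
    // Compute the total on the cluster and return it to the driver;
    // no shared mutable state is captured by the closure.
    val total = rdd.reduce(_ + _)  // or rdd.sum() for numeric RDDs
    println("Counter value: " + total)
    ```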

