Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4696#discussion_r25103091
  
    --- Diff: docs/programming-guide.md ---
    @@ -728,6 +728,63 @@ def doStuff(self, rdd):
     
     </div>
     
    +### Understanding closures
    +One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. A frequent source of confusion is shown below, where a counter is incremented from inside an RDD operation. In our example we look at `foreach()`, but the same issue applies to any other RDD operation that modifies variables outside of its scope.
    +
    +#### Example
    +
    +Consider the naive RDD element sum below, which behaves completely differently when running Spark in `local` mode (e.g. via the shell) than when deploying a Spark application to a cluster (e.g. via `spark-submit` to YARN):
    +
    +<div class="codetabs">
    +
    +<div data-lang="scala"  markdown="1">
    +{% highlight scala %}
    +var counter = 0
    +var rdd = sc.parallelize(data)
    +rdd.foreach(x => counter += x)
    +
    +println("Counter value: " + counter)
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="java"  markdown="1">
    +{% highlight java %}
    +int counter = 0;
    +JavaRDD<Integer> rdd = sc.parallelize(data); 
    +rdd.foreach(x -> counter += x);
    +
    +System.out.println("Counter value: " + counter);
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="python"  markdown="1">
    +{% highlight python %}
    +counter = 0
    +rdd = sc.parallelize(data)
    +def increment_counter(x):
    +    global counter
    +    counter += x
    +rdd.foreach(increment_counter)
    +
    +print("Counter value: %d" % counter)
    +
    +{% endhighlight %}
    +</div>
    +
    +</div>
    +
    +#### Local vs. cluster modes
    +
    +In `local` mode, the above code will correctly sum the values within the RDD and store the total in **counter**. This is because both the RDD and the variable **counter** are in the same memory on the driver node.
    +
    +However, in `cluster` mode, what happens is more complicated, and the above code will not work correctly. In `cluster` mode, Spark breaks up the processing of RDD operations into tasks, each of which is executed by a separate executor. Prior to execution, Spark computes the **closure**. The closure is those variables and methods which must be visible for the remote executor (running on a separate worker node) to perform its computations on the RDD (in this case `foreach()`). This closure is serialized and sent to each executor.
    +
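    +For instance, in the hypothetical Scala snippet below, the local variable `factor` is referenced inside the function passed to `map()`, so it becomes part of that function's closure and a copy of it is serialized and shipped to each executor along with the tasks:
    +
    +{% highlight scala %}
    +val factor = 2
    +// `factor` is captured in the closure of the function passed to map(),
    +// so each executor receives its own serialized copy of it.
    +val scaled = rdd.map(x => x * factor)
    +{% endhighlight %}
    +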
    +The problem in the counter example is that the variables within the closure sent to each executor are now copies, so when **counter** is referenced within the `foreach` function, it is no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node, but it is no longer visible to the executors! The executors see only the copy from the serialized closure. Thus, the final value of **counter** will still be zero, since all operations on **counter** referenced the value within the serialized closure.
    +
    +The one exception to this is when the variable being modified is an 
`Accumulator`. Accumulators in Spark are used specifically to provide a 
mechanism for safely updating a variable when execution is split up across 
worker nodes in a cluster. The Accumulators section of this guide discusses 
these in more detail.  
    +
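    +As a minimal sketch of that pattern, using the Scala `sc.accumulator` API, the sum above could be written with an accumulator instead:
    +
    +{% highlight scala %}
    +val counter = sc.accumulator(0)
    +val rdd = sc.parallelize(data)
    +
    +// Each task updates its local reference; Spark merges the updates back on the driver.
    +rdd.foreach(x => counter += x)
    +
    +// The merged value should only be read on the driver.
    +println("Counter value: " + counter.value)
    +{% endhighlight %}
    +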
    +In general, closures (constructs like loops or locally defined methods) are executed on remote executors, so, with the exception of local testing mode, they should not be used to mutate some global state. Use an `Accumulator` instead if some global aggregation is needed.
    +
    +#### Printing elements of an RDD 
    +Another common idiom is attempting to print out the elements of an RDD using `rdd.foreach(println)` or `rdd.map(println)`. Intuitively, it seems that this should work. But again, consider that in `cluster` mode the `println()` being called by the executors is the `println()` that is local to them, not the one on the driver! Consequently, Spark will start writing to stdout on the worker nodes, potentially filling up `/tmp` storage rapidly. To avoid this, one can use the `collect()` method to first bring the RDD to the driver node: `rdd.collect().foreach(println)`. Because `collect()` will aggregate the entire RDD and the RDD may be very large, this can cause buffer overflows or memory errors. A safer approach is to use the `take()` method to print only a few elements of the RDD: `rdd.take(100).foreach(println)`.
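    +
    +For example, the two approaches look like this in Scala:
    +
    +{% highlight scala %}
    +// Brings the entire RDD to the driver; only safe when the RDD is small.
    +rdd.collect().foreach(println)
    +
    +// Safer: bring only the first 100 elements to the driver and print them there.
    +rdd.take(100).foreach(println)
    +{% endhighlight %}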
    --- End diff --
    
    > will aggregate
    
    How about
    
    > This can cause the driver to run out of memory, though, because 
`collect()` fetches the entire RDD to a single machine; a safer approach is ...

