SPARK-729:  Closures not always serialized at capture time

[SPARK-729](https://spark-project.atlassian.net/browse/SPARK-729) concerns when 
free variables in closure arguments to transformations are captured.  
Currently, it is possible for closures to get the environment in which they are 
serialized (not the environment in which they are created).  There are a few 
possible approaches to solving this problem and this PR will discuss some of 
them.  The approach I took has the advantage of being simple, obviously 
correct, and minimally invasive, but it leaves in place something that has been 
bothering me about Spark's closure handling, so I'd like to discuss an 
alternative and get some feedback on whether or not it is worth pursuing.
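
For concreteness, here's a tiny illustration (not taken from the patch; the new 
`ClosureCleanerSuite` tests exercise the same pattern) of the behavior at issue:

```scala
import org.apache.spark.SparkContext

object CaptureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "capture-demo")
    var factor = 1
    // Intent: capture factor = 1 at the point where the transformation is defined.
    val scaled = sc.parallelize(1 to 5).map(_ * factor)

    factor = 10 // mutate the free variable before any action runs

    // If the closure's environment is only captured when it is serialized for a
    // job, this sees factor = 10 and prints 150; with capture at definition time
    // it prints 15.
    println(scaled.reduce(_ + _))
    sc.stop()
  }
}
```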

## What I did

The basic approach I took depends on the work I did for #143, and so this PR is 
based atop that.  Specifically, #143 modifies `ClosureCleaner.clean` to 
preemptively determine whether or not closures are serializable, immediately 
upon closure cleaning (rather than waiting for a job involving that closure to 
be scheduled).  Thus non-serializable-closure exceptions are raised at the 
line defining the closure rather than at the point where the closure is used.
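
As an illustration of what that means in practice (hypothetical names, 
mirroring the new `ProactiveClosureSerializationSuite`):

```scala
import org.apache.spark.{SparkContext, SparkException}

// Hypothetical stand-in for any class that is not java.io.Serializable.
class NotSerializable { def label(x: Int): String = x.toString }

object ProactiveCheckDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "proactive-check-demo")
    val ns = new NotSerializable
    try {
      // With serializability checking enabled, the map call itself throws a
      // SparkException ("Task not serializable"); previously nothing failed
      // until the count() action was scheduled.
      sc.parallelize(1 to 10).map(x => ns.label(x)).count()
    } catch {
      case e: SparkException => println("caught at definition site: " + e.getMessage)
    }
    sc.stop()
  }
}
```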

Since the easiest way to determine whether or not a closure is serializable is 
to attempt to serialize it, the code in #143 already creates a serialized closure 
as part of `ClosureCleaner.clean`.  `clean` currently modifies its argument in 
place, and the method in `SparkContext` that wraps it returns a value (a 
reference to the modified-in-place argument).  This branch modifies 
`ClosureCleaner.clean` so that it returns a value:  if it is cleaning a 
serializable closure, it returns the result of deserializing its serialized 
argument, and thus returns a closure with an environment captured at 
cleaning time.  `SparkContext.clean` then returns the result of 
`ClosureCleaner.clean`, rather than a reference to its modified-in-place 
argument.
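
The shape of the resulting method is roughly the sketch below; the actual 
change is in the `ClosureCleaner` hunk of the diff and additionally wraps 
serialization in a try/catch that rethrows as a `SparkException`:

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkEnv

// Rough sketch only; see the ClosureCleaner hunk in the diff below for the
// actual change.
def clean[F <: AnyRef : ClassTag](func: F, captureNow: Boolean = true): F = {
  // ... the existing in-place cleaning of outer references happens here ...
  if (captureNow) {
    // Serializing and immediately deserializing the cleaned closure both checks
    // that it is serializable and snapshots its environment at cleaning time.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    ser.deserialize[F](ser.serialize[F](func))
  } else {
    func
  }
}
```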

I've added tests for this behavior (777a1bc).  The pull request as it stands, 
given the changes in #143, is nearly trivial.  There is some overhead from 
deserializing the closure, but it is minimal and the benefit of obvious 
operational correctness (vs. a more sophisticated but harder-to-validate 
transformation in `ClosureCleaner`) seems pretty important.  I think this is a 
fine way to solve this problem, but it's not perfect.

## What we might want to do

The thing that has been bothering me about Spark's handling of closures is that 
it seems like we should be able to statically ensure that cleaning and 
serialization happen exactly once for a given closure.  If we serialize a 
closure in order to determine whether or not it is serializable, we should be 
able to hang on to the generated byte buffer and use it instead of 
re-serializing the closure later.  By replacing closures with instances of a 
sum type that encodes whether or not a closure has been cleaned or serialized, 
we could handle cleaned, to-be-cleaned, and serialized closures separately with 
case matches.  Here's a somewhat-concrete sketch (taken from my git stash) of 
what this might look like:

```scala
package org.apache.spark.util

import java.nio.ByteBuffer
import scala.reflect.ClassManifest

sealed abstract class ClosureBox[T] { def func: T }
final case class RawClosure[T](func: T) extends ClosureBox[T] {}
final case class CleanedClosure[T](func: T) extends ClosureBox[T] {}
final case class SerializedClosure[T](func: T, bytebuf: ByteBuffer) extends ClosureBox[T] {}

object ClosureBoxImplicits {
  implicit def closureBoxFromFunc[T <: AnyRef](fun: T) = new RawClosure[T](fun)
}
```

With these types declared, we'd be able to change `ClosureCleaner.clean` to 
take a `ClosureBox[T=>U]` (possibly generated by implicit conversion) and 
return a `ClosureBox[T=>U]` (either a `CleanedClosure[T=>U]` or a 
`SerializedClosure[T=>U]`, depending on whether or not serializability-checking 
was enabled) instead of a `T=>U`.  A case match could thus short-circuit 
cleaning or serializing closures that had already been cleaned or serialized 
(both in `ClosureCleaner` and in the closure serializer).  
Cleaned-and-serialized closures would be represented by a boxed tuple of the 
original closure and a serialized copy (complete with an environment quiesced 
at transformation time).  Additional implicit conversions could convert from 
`ClosureBox` instances to the underlying function type where appropriate.  
Tracking this sort of state in the type system seems like the right thing to do 
to me.
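
To make that a bit more concrete, here's an illustrative (unimplemented) sketch 
of how `ClosureCleaner.clean` might dispatch on these variants; `doClean` and 
`doSerialize` are hypothetical stand-ins for the existing cleaning pass and the 
closure serializer, not real Spark APIs:

```scala
import java.nio.ByteBuffer

// Illustrative only: clean exactly once, serialize at most once, and skip work
// for boxes that have already been processed.
object ClosureBoxSketch {
  def cleanBoxed[T <: AnyRef](boxed: ClosureBox[T],
                              doClean: T => T,
                              doSerialize: T => ByteBuffer,
                              capture: Boolean = true): ClosureBox[T] =
    boxed match {
      case s @ SerializedClosure(_, _) =>
        s                                      // already cleaned and serialized: nothing to do
      case CleanedClosure(f) =>
        if (capture) SerializedClosure(f, doSerialize(f)) else boxed
      case RawClosure(f) =>
        val cleaned = doClean(f)               // clean exactly once...
        if (capture) SerializedClosure(cleaned, doSerialize(cleaned)) // ...serialize at most once
        else CleanedClosure(cleaned)
    }
}
```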

### Why we might not want to do that

_It's pretty invasive._  Every function type used by every `RDD` subclass would 
have to change to reflect that it expects a `ClosureBox[T=>U]` instead of a 
`T=>U`.  This obscures what's going on and is more than a little ugly.  Although I 
really like the idea of using the type system to enforce the clean-or-serialize 
once discipline, it might not be worth adding another layer of types (even if 
we could hide some of the extra boilerplate with judicious application of 
implicit conversions).
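
For a sense of the footprint, this is the flavor of signature change that every 
transformation would need (illustrative only, reusing the `ClosureBox` type 
sketched above):

```scala
import scala.reflect.ClassTag

// Illustrative only: the kind of change every RDD transformation (and its
// callers) would see under the ClosureBox scheme.
trait RDDToday[T] {
  def map[U: ClassTag](f: T => U): RDDToday[U]
}

trait RDDWithClosureBoxes[T] {
  def map[U: ClassTag](f: ClosureBox[T => U]): RDDWithClosureBoxes[U]
}
```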

_It statically guarantees a property whose absence is unlikely to cause any 
serious problems as it stands._  It appears that all closures are currently 
dynamically cleaned once and it's not obvious that repeated closure-cleaning is 
likely to be a problem in the future.  Furthermore, serializing closures is 
relatively cheap, so doing it once to check for serializability and once again to 
actually ship them across the wire doesn't seem like a big deal.

Taken together, these seem like a high price to pay for statically guaranteeing 
that closures are operated upon only once.

## Other possibilities

I felt like the serialize-and-deserialize approach was best due to its obvious 
simplicity.  But it would be possible to do a more sophisticated transformation 
within `ClosureCleaner.clean`.  It might also be possible for `clean` to modify 
its argument in such a way that whether or not a given closure had been cleaned 
would be apparent upon inspection; this would buy us some of the operational 
benefits of the `ClosureBox` approach but not the static guarantees.
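
As one purely hypothetical way to realize that last idea, `clean` could consult 
a weak identity set of already-cleaned closures; the sketch below uses an 
external registry rather than literally modifying the closure, which trades the 
"apparent upon inspection" property for simplicity but gets the same runtime 
short-circuit:

```scala
import java.util.{Collections, WeakHashMap}

// Hypothetical sketch (not in this patch): remember cleaned closures so a
// second clean() call can short-circuit at runtime, without changing any RDD
// signatures.  Closures use reference equality, so this is effectively an
// identity check; thread-safety is ignored for brevity.
object CleanOnce {
  private val alreadyCleaned =
    Collections.newSetFromMap(new WeakHashMap[AnyRef, java.lang.Boolean]())

  def cleanOnce[F <: AnyRef](f: F)(doClean: F => Unit): F = {
    if (!alreadyCleaned.contains(f)) {
      doClean(f)              // stand-in for the existing in-place cleaning pass
      alreadyCleaned.add(f)
    }
    f
  }
}
```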

I'm interested in any feedback or discussion on whether or not the problems 
with the type-based approach indeed outweigh its advantages, as well as on 
other approaches to this issue and to closure handling in general.

Author: William Benton <wi...@redhat.com>

Closes #189 from willb/spark-729 and squashes the following commits:

f4cafa0 [William Benton] Stylistic changes and cleanups
b3d9c86 [William Benton] Fixed style issues in tests
9b56ce0 [William Benton] Added array-element capture test
97e9d91 [William Benton] Split closure-serializability failure tests
12ef6e3 [William Benton] Skip proactive closure capture for runJob
8ee3ee7 [William Benton] Predictable closure environment capture
12c63a7 [William Benton] Added tests for variable capture in closures
d6e8dd6 [William Benton] Don't check serializability of DStream transforms.
4ecf841 [William Benton] Make proactive serializability checking optional.
d8df3db [William Benton] Adds proactive closure-serializablilty checking
21b4b06 [William Benton] Test cases for SPARK-897.
d5947b3 [William Benton] Ensure assertions in Graph.apply are asserted.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ca3b2bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ca3b2bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ca3b2bc

Branch: refs/heads/branch-1.0
Commit: 8ca3b2bc90a63b23a03f339e390174cd7a672b40
Parents: 0adc932
Author: William Benton <wi...@redhat.com>
Authored: Wed Apr 9 18:56:27 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Wed Apr 9 18:56:27 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 16 ++--
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  6 +-
 .../org/apache/spark/util/ClosureCleaner.scala  | 21 ++++-
 .../scala/org/apache/spark/FailureSuite.scala   | 17 +++-
 .../ProactiveClosureSerializationSuite.scala    | 94 ++++++++++++++++++++
 .../apache/spark/util/ClosureCleanerSuite.scala | 68 ++++++++++++++
 .../org/apache/spark/graphx/GraphSuite.scala    |  2 +-
 .../spark/streaming/dstream/DStream.scala       |  8 +-
 8 files changed, 218 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7630523..545807f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1002,7 +1002,9 @@ class SparkContext(config: SparkConf) extends Logging {
       require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
     }
     val callSite = getCallSite
-    val cleanedFunc = clean(func)
+    // There's no need to check this function for serializability,
+    // since it will be run right away.
+    val cleanedFunc = clean(func, false)
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
@@ -1135,14 +1137,18 @@ class SparkContext(config: SparkConf) extends Logging {
   def cancelAllJobs() {
     dagScheduler.cancelAllJobs()
   }
-
+  
   /**
    * Clean a closure to make it ready to serialized and send to tasks
    * (removes unreferenced variables in $outer's, updates REPL variables)
+   *
+   * @param f closure to be cleaned and optionally serialized
+   * @param captureNow whether or not to serialize this closure and capture any free
+   * variables immediately; defaults to true.  If this is set and f is not serializable,
+   * it will raise an exception.
    */
-  private[spark] def clean[F <: AnyRef](f: F): F = {
-    ClosureCleaner.clean(f)
-    f
+  private[spark] def clean[F <: AnyRef : ClassTag](f: F, captureNow: Boolean = true): F = {
+    ClosureCleaner.clean(f, captureNow)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3437b2c..e363ea7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -660,14 +660,16 @@ abstract class RDD[T: ClassTag](
    * Applies a function f to all elements of this RDD.
    */
   def foreach(f: T => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
   }
 
   /**
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: Iterator[T] => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => f(iter))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index cdbbc65..e474b1a 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -22,10 +22,14 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import scala.collection.mutable.Map
 import scala.collection.mutable.Set
 
+import scala.reflect.ClassTag
+
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
 
 import org.apache.spark.Logging
+import org.apache.spark.SparkEnv
+import org.apache.spark.SparkException
 
 private[spark] object ClosureCleaner extends Logging {
   // Get an ASM class reader for a given class from the JAR that loaded it
@@ -101,7 +105,7 @@ private[spark] object ClosureCleaner extends Logging {
     }
   }
   
-  def clean(func: AnyRef) {
+  def clean[F <: AnyRef : ClassTag](func: F, captureNow: Boolean = true): F = {
     // TODO: cache outerClasses / innerClasses / accessedFields
     val outerClasses = getOuterClasses(func)
     val innerClasses = getInnerClasses(func)
@@ -150,6 +154,21 @@ private[spark] object ClosureCleaner extends Logging {
       field.setAccessible(true)
       field.set(func, outer)
     }
+    
+    if (captureNow) {
+      cloneViaSerializing(func)
+    } else {
+      func
+    }
+  }
+
+  private def cloneViaSerializing[T: ClassTag](func: T): T = {
+    try {
+      val serializer = SparkEnv.get.closureSerializer.newInstance()
+      serializer.deserialize[T](serializer.serialize[T](func))
+    } catch {
+      case ex: Exception => throw new SparkException("Task not serializable: " + ex.toString)
+    }
   }
   
   private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 12dbebc..4f93004 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -107,7 +107,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
-  test("failure because task closure is not serializable") {
+  test("failure because closure in final-stage task is not serializable") {
     sc = new SparkContext("local[1,1]", "test")
     val a = new NonSerializable
 
@@ -118,6 +118,13 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     assert(thrown.getClass === classOf[SparkException])
     assert(thrown.getMessage.contains("NotSerializableException"))
 
+    FailureSuiteState.clear()
+  }
+
+  test("failure because closure in early-stage task is not serializable") {
+    sc = new SparkContext("local[1,1]", "test")
+    val a = new NonSerializable
+
     // Non-serializable closure in an earlier stage
     val thrown1 = intercept[SparkException] {
       sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count()
@@ -125,6 +132,13 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     assert(thrown1.getClass === classOf[SparkException])
     assert(thrown1.getMessage.contains("NotSerializableException"))
 
+    FailureSuiteState.clear()
+  }
+
+  test("failure because closure in foreach task is not serializable") {
+    sc = new SparkContext("local[1,1]", "test")
+    val a = new NonSerializable
+
     // Non-serializable closure in foreach function
     val thrown2 = intercept[SparkException] {
       sc.parallelize(1 to 10, 2).foreach(x => println(a))
@@ -135,5 +149,6 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+
   // TODO: Need to add tests with shuffle fetch failures.
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
new file mode 100644
index 0000000..7666226
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer;
+
+import java.io.NotSerializableException
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
+import org.apache.spark.SharedSparkContext
+
+/* A trivial (but unserializable) container for trivial functions */
+class UnserializableClass {
+  def op[T](x: T) = x.toString
+  
+  def pred[T](x: T) = x.toString.length % 2 == 0
+}
+
+class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContext {
+
+  def fixture = (sc.parallelize(0 until 1000).map(_.toString), new UnserializableClass)
+
+  test("throws expected serialization exceptions on actions") {
+    val (data, uc) = fixture
+      
+    val ex = intercept[SparkException] {
+      data.map(uc.op(_)).count
+    }
+        
+    assert(ex.getMessage.matches(".*Task not serializable.*"))
+  }
+
+  // There is probably a cleaner way to eliminate boilerplate here, but we're
+  // iterating over a map from transformation names to functions that perform that
+  // transformation on a given RDD, creating one test case for each
+  
+  for (transformation <- 
+      Map("map" -> map _, "flatMap" -> flatMap _, "filter" -> filter _, "mapWith" -> mapWith _,
+          "mapPartitions" -> mapPartitions _, "mapPartitionsWithIndex" -> mapPartitionsWithIndex _,
+          "mapPartitionsWithContext" -> mapPartitionsWithContext _, "filterWith" -> filterWith _)) {
+    val (name, xf) = transformation
+    
+    test(s"$name transformations throw proactive serialization exceptions") {
+      val (data, uc) = fixture
+      
+      val ex = intercept[SparkException] {
+        xf(data, uc)
+      }
+
+      assert(ex.getMessage.matches(".*Task not serializable.*"), s"RDD.$name doesn't proactively throw NotSerializableException")
+    }
+  }
+  
+  def map(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.map(y => uc.op(y))
+
+  def mapWith(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.mapWith(x => x.toString)((x,y) => x + uc.op(y))
+    
+  def flatMap(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.flatMap(y=>Seq(uc.op(y)))
+  
+  def filter(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.filter(y=>uc.pred(y))
+  
+  def filterWith(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.filterWith(x => x.toString)((x,y) => uc.pred(y))
+  
+  def mapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.mapPartitions(_.map(y => uc.op(y)))
+  
+  def mapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.mapPartitionsWithIndex((_, it) => it.map(y => uc.op(y)))
+  
+  def mapPartitionsWithContext(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.mapPartitionsWithContext((_, it) => it.map(y => uc.op(y)))
+  
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 439e564..c635da6 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -50,6 +50,27 @@ class ClosureCleanerSuite extends FunSuite {
     val obj = new TestClassWithNesting(1)
     assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
   }
+  
+  test("capturing free variables in closures at RDD definition") {
+    val obj = new TestCaptureVarClass()
+    val (ones, onesPlusZeroes) = obj.run()
+    
+    assert(ones === onesPlusZeroes)
+  }
+
+  test("capturing free variable fields in closures at RDD definition") {
+    val obj = new TestCaptureFieldClass()
+    val (ones, onesPlusZeroes) = obj.run()
+    
+    assert(ones === onesPlusZeroes)
+  }
+  
+  test("capturing arrays in closures at RDD definition") {
+    val obj = new TestCaptureArrayEltClass()
+    val (observed, expected) = obj.run()
+    
+    assert(observed === expected)
+  }
 }
 
 // A non-serializable class we create in closures to make sure that we aren't
@@ -143,3 +164,50 @@ class TestClassWithNesting(val y: Int) extends Serializable {
     }
   }
 }
+
+class TestCaptureFieldClass extends Serializable {
+  class ZeroBox extends Serializable {
+    var zero = 0
+  }
+
+  def run(): (Int, Int) = {
+    val zb = new ZeroBox
+  
+    withSpark(new SparkContext("local", "test")) {sc =>
+      val ones = sc.parallelize(Array(1, 1, 1, 1, 1))
+      val onesPlusZeroes = ones.map(_ + zb.zero)
+
+      zb.zero = 5
+    
+      (ones.reduce(_ + _), onesPlusZeroes.reduce(_ + _))
+    }
+  }
+}
+
+class TestCaptureArrayEltClass extends Serializable {
+  def run(): (Int, Int) = {
+    withSpark(new SparkContext("local", "test")) {sc =>
+      val rdd = sc.parallelize(1 to 10)
+      val data = Array(1, 2, 3)
+      val expected = data(0)
+      val mapped = rdd.map(x => data(0))
+      data(0) = 4
+      (mapped.first, expected)
+    }
+  }
+}
+
+class TestCaptureVarClass extends Serializable {
+  def run(): (Int, Int) = {
+    var zero = 0
+  
+    withSpark(new SparkContext("local", "test")) {sc =>
+      val ones = sc.parallelize(Array(1, 1, 1, 1, 1))
+      val onesPlusZeroes = ones.map(_ + zero)
+
+      zero = 5
+    
+      (ones.reduce(_ + _), onesPlusZeroes.reduce(_ + _))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 28d34dd..c65e366 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -62,7 +62,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       assert( graph.edges.count() === rawEdges.size )
       // Vertices not explicitly provided but referenced by edges should be created automatically
       assert( graph.vertices.count() === 100)
-      graph.triplets.map { et =>
+      graph.triplets.collect.map { et =>
         assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
         assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index d043200..4759b62 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -539,7 +539,7 @@ abstract class DStream[T: ClassTag] (
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
-    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
+    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
   }
 
   /**
@@ -547,7 +547,7 @@ abstract class DStream[T: ClassTag] (
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
-    val cleanedF = context.sparkContext.clean(transformFunc)
+    val cleanedF = context.sparkContext.clean(transformFunc, false)
     val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 1)
       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
@@ -562,7 +562,7 @@ abstract class DStream[T: ClassTag] (
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
     ): DStream[V] = {
-    val cleanedF = ssc.sparkContext.clean(transformFunc)
+    val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
   }
 
@@ -573,7 +573,7 @@ abstract class DStream[T: ClassTag] (
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
     ): DStream[V] = {
-    val cleanedF = ssc.sparkContext.clean(transformFunc)
+    val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 2)
       val rdd1 = rdds(0).asInstanceOf[RDD[T]]
