Hi all,

I am using the Spark Streaming API (I'm on version 2.10 for Spark and
Streaming), and I am running into a function serialization issue that does
not occur when using Spark in batch (non-streaming) mode.

If I wrote code like this:

def run(): Unit = {
    val newStream = stream.map(x => { x + " foo " })
    // ...
}

everything works fine. But if I try it like this:

def transform(x: String): String = { x + " foo " }

def run(): Unit = {
    val newStream = stream.map(transform)
    // ...
}

...the program fails because it is unable to serialize the closure (to my
understanding, when a method is passed to a function that expects a closure,
it should be automatically converted to one).
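
For clarity, the conversion I mean is eta-expansion; in plain Scala (and, as
far as I can tell, in non-streaming Spark jobs), passing a method where a
function is expected works fine for me. A minimal non-Spark sketch of what I
mean:

def transform(x: String): String = x + " foo "

// I'd expect the compiler to expand the method reference into a function,
// so these two calls should mean the same thing:
val viaMethod = List("a", "b").map(transform)          // eta-expansion
val viaLambda = List("a", "b").map(x => transform(x))  // expansion written out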

However, it works fine if I declare the closure inside run() and use that,
like so:

val transform = (x: String) => { x + " foo " }

If the same val is declared outside of run(), however, it also crashes.
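
To make the comparison concrete, here is a stripped-down sketch of the shape
of the code (the class name, the transform names, and the socket source are
placeholders, not my real app):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

class MyClass {
  private val conf = new SparkConf().setAppName("repro")
  private val ssc  = new StreamingContext(conf, Seconds(1))
  private val stream: DStream[String] =
    ssc.socketTextStream("localhost", 9999)  // placeholder source

  // Declared at class level: using either of these in run() crashes
  // with "Task not serializable".
  def transformMethod(x: String): String = x + " foo "
  val transformVal = (x: String) => x + " foo "

  def run(): Unit = {
    // Works: closure written inline
    val ok1 = stream.map(x => x + " foo ")

    // Works: closure declared locally inside run()
    val localTransform = (x: String) => x + " foo "
    val ok2 = stream.map(localTransform)
    ok2.print()  // placeholder output operation

    // Fails: method or val declared on the class
    val bad1 = stream.map(transformMethod)
    val bad2 = stream.map(transformVal)

    ssc.start()
    ssc.awaitTermination()
  }
}

With this layout, the inline and local-closure versions are fine, but either
of the class-level variants fails with the exception below.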

Below is an example stack trace of the error I'm running into. This is a
hassle to debug, and I'm hoping I don't have to work around it by using a
local closure/function every time. Thanks in advance for any help.

org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
        at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:266)
        at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
        at com.my.cool.app.MyClass.run(MyClass.scala:90)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized.
Serialization stack:

        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 20 more


