I think Mayur meant that Spark doesn't necessarily clean the closure
under Java 7 -- is that true though? I didn't know of an issue there.

Some anonymous class in your (?) OptimisingSort class is getting
serialized, which may be fine and intentional, but it is not
serializable. You haven't posted that class, but look for things like
anonymous or inner classes that don't implement java.io.Serializable.
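
To illustrate, here is a rough sketch rather than your actual code. The
names SerializableComparatorSketch, ByCount and canSerialize are made up
for this example, and Map.Entry stands in for scala.Tuple2 so that it
runs without Spark on the classpath. It compares an anonymous Comparator
with a static nested one that implements Serializable, using plain Java
serialization as the check:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;
import java.util.Map;

public class SerializableComparatorSketch {

    // Hypothetical name. A static nested class holds no reference to an
    // enclosing instance, and it declares Serializable explicitly.
    static class ByCount implements Comparator<Map.Entry<String, Long>>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(Map.Entry<String, Long> a, Map.Entry<String, Long> b) {
            return a.getValue().compareTo(b.getValue());
        }
    }

    // Returns true if Java serialization accepts the object; a
    // NotSerializableException is the same failure reported in the trace.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // An anonymous Comparator that does not implement Serializable,
        // similar in shape to the one in the posted snippet.
        Comparator<Map.Entry<String, Long>> anonymous = new Comparator<Map.Entry<String, Long>>() {
            @Override
            public int compare(Map.Entry<String, Long> a, Map.Entry<String, Long> b) {
                return a.getValue().compareTo(b.getValue());
            }
        };

        System.out.println("anonymous: " + canSerialize(anonymous));
        System.out.println("ByCount:   " + canSerialize(new ByCount()));
    }
}
```

If the same idea applies to your job, the comparator passed to
rdd.top(5, comp) would be a class like ByCount written over
Tuple2<String, Long> instead of an anonymous Comparator. That is an
assumption on my part, since I can't see the rest of OptimisingSort.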

On Wed, Jun 4, 2014 at 12:25 AM, Andrew Ash <and...@andrewash.com> wrote:
> Hi Mayur, is that closure cleaning a JVM issue or a Spark issue?  I'm used
> to thinking of closure cleaner as something Spark built.  Do you have
> somewhere I can read more about this?
>
>
> On Tue, Jun 3, 2014 at 12:47 PM, Mayur Rustagi <mayur.rust...@gmail.com>
> wrote:
>>
>> So are you using Java 7 or 8?
>> 7 doesn't clean closures properly, so you need to define a static class as
>> a function and then call that in your operations. Otherwise it'll try to
>> send the whole class along with the function.
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi
>>
>>
>>
>> On Tue, Jun 3, 2014 at 7:19 PM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>> Sorry if I'm dense, but is OptimisingSort your class? It's saying you
>>> have included something from it in a function that is shipped off to
>>> remote workers, but something in it is not java.io.Serializable.
>>> OptimisingSort$6$1 needs to be Serializable.
>>>
>>> On Tue, Jun 3, 2014 at 2:23 PM, nilmish <nilmish....@gmail.com> wrote:
>>> > I am using the following code segment :
>>> >
>>> > countPerWindow.foreachRDD(new Function<JavaPairRDD<String, Long>, Void>() {
>>> >     @Override
>>> >     public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
>>> >
>>> >         Comparator<Tuple2<String, Long>> comp = new Comparator<Tuple2<String, Long>>() {
>>> >             public int compare(Tuple2<String, Long> tupleA,
>>> >                                Tuple2<String, Long> tupleB) {
>>> >                 return 1 - tupleA._2.compareTo(tupleB._2);
>>> >             }
>>> >         };
>>> >
>>> >         List<scala.Tuple2<String, Long>> top = rdd.top(5, comp); // creating error
>>> >
>>> >         System.out.println("Top 5 are : ");
>>> >         for (int i = 0; i < top.size(); ++i) {
>>> >             System.out.println(top.get(i)._2 + " " + top.get(i)._1);
>>> >         }
>>> >         return null;
>>> >     }
>>> > });
>>> >     }
>>> >
>>> >
>>> >
>>> >
>>> > I am getting the following error related to serialisation  :
>>> >
>>> > org.apache.spark.SparkException: Job aborted: Task not serializable:
>>> > java.io.NotSerializableException
>>> >
>>> > Detailed Error :
>>> >
>>> >  INFO  org.apache.spark.scheduler.DAGScheduler - Failed to run top at OptimisingSort.java:173
>>> > 2014-06-03 13:10:57,180 [spark-akka.actor.default-dispatcher-14] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1401801057000 ms.2
>>> > org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: OptimisingSort$6$1
>>> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>> >         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> >         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>>> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>>> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
>>> >
>>> > How can I remove this error ?
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> > Nabble.com.
>>
>>
>
