Hi Mayur, is that closure cleaning a JVM issue or a Spark issue?  I'm used
to thinking of the closure cleaner as something Spark built.  Do you have
somewhere I can read more about this?


On Tue, Jun 3, 2014 at 12:47 PM, Mayur Rustagi <mayur.rust...@gmail.com>
wrote:

> So are you using Java 7 or 8?
> Java 7 doesn't clean closures properly, so you need to define a static class
> as a function and then call that in your operations. Otherwise it'll try to
> send the whole enclosing class along with the function.
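
For concreteness, a minimal sketch of that pattern, assuming Spark 1.0's Java
API where org.apache.spark.api.java.function.Function is an interface that
already extends Serializable; the class name PrintTopFive and the generic
types are just placeholders, not anything from the original code:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function;

    // A static nested class (or a separate top-level class) used as the
    // function: Spark serializes only this small object, instead of dragging
    // in the enclosing class the way an anonymous inner class would.
    static class PrintTopFive implements Function<JavaPairRDD<String, Long>, Void> {
        @Override
        public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
            // ... operate on the rdd here ...
            return null;
        }
    }

    // and in the streaming code:
    countPerWindow.foreachRDD(new PrintTopFive());
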
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Tue, Jun 3, 2014 at 7:19 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> Sorry if I'm dense, but is OptimisingSort your class? It's saying you
>> have included something from it in a function that is shipped off to
>> remote workers, but something in it is not java.io.Serializable.
>> OptimisingSort$6$1 needs to be Serializable.
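
For concreteness, assuming OptimisingSort$6$1 is the anonymous Comparator
passed to rdd.top in the code below, a minimal sketch of one fix is to pull
it out into a static nested class that also implements java.io.Serializable;
ValueComparator is a placeholder name, and the ordering logic is whatever you
actually need:

    import java.io.Serializable;
    import java.util.Comparator;
    import java.util.List;
    import scala.Tuple2;

    // A comparator Spark can ship to executors: it is Serializable and,
    // being static, carries no hidden reference to the enclosing class.
    static class ValueComparator implements Comparator<Tuple2<String, Long>>, Serializable {
        @Override
        public int compare(Tuple2<String, Long> a, Tuple2<String, Long> b) {
            return a._2.compareTo(b._2); // plug in whatever ordering you need
        }
    }

    // then, inside the foreachRDD function:
    List<Tuple2<String, Long>> top = rdd.top(5, new ValueComparator());
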
>>
>> On Tue, Jun 3, 2014 at 2:23 PM, nilmish <nilmish....@gmail.com> wrote:
>> > I am using the following code segment:
>> >
>> > countPerWindow.foreachRDD(new Function<JavaPairRDD<String, Long>, Void>()
>> > {
>> >     @Override
>> >     public Void call(JavaPairRDD<String, Long> rdd) throws Exception
>> >     {
>> >         Comparator<Tuple2<String, Long>> comp =
>> >             new Comparator<Tuple2<String, Long>>()
>> >         {
>> >             public int compare(Tuple2<String, Long> tupleA,
>> >                                Tuple2<String, Long> tupleB)
>> >             {
>> >                 return 1 - tupleA._2.compareTo(tupleB._2);
>> >             }
>> >         };
>> >
>> >         List<scala.Tuple2<String, Long>> top = rdd.top(5, comp); // creating error
>> >
>> >         System.out.println("Top 5 are : ");
>> >         for (int i = 0; i < top.size(); ++i)
>> >         {
>> >             System.out.println(top.get(i)._2 + " " + top.get(i)._1);
>> >         }
>> >         return null;
>> >     }
>> > });
>> > }
>> >
>> >
>> >
>> >
>> > I am getting the following error related to serialisation:
>> >
>> > org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException
>> >
>> > Detailed Error:
>> >
>> > INFO  org.apache.spark.scheduler.DAGScheduler - Failed to run top at OptimisingSort.java:173
>> > 2014-06-03 13:10:57,180 [spark-akka.actor.default-dispatcher-14] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1401801057000 ms.2
>> > org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: OptimisingSort$6$1
>> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>> >         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
>> > How can I remove this error?
>> >
>> >
>> >
>> >
>> >
>>
>
>
