I am using the following code segment:

countPerWindow.foreachRDD(new Function<JavaPairRDD<String, Long>, Void>()
{
    @Override
    public Void call(JavaPairRDD<String, Long> rdd) throws Exception
    {
        Comparator<Tuple2<String, Long>> comp = new Comparator<Tuple2<String, Long>>()
        {
            public int compare(Tuple2<String, Long> tupleA,
                               Tuple2<String, Long> tupleB)
            {
                // sort in descending order of count
                return tupleB._2.compareTo(tupleA._2);
            }
        };

        List<scala.Tuple2<String, Long>> top = rdd.top(5, comp); // line causing the error

        System.out.println("Top 5 are : ");
        for (int i = 0; i < top.size(); ++i)
        {
            System.out.println(top.get(i)._2 + " " + top.get(i)._1);
        }
        return null;
    }
});
    }




I am getting the following serialisation-related error:

org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException

Detailed error:

INFO  org.apache.spark.scheduler.DAGScheduler - Failed to run top at OptimisingSort.java:173
2014-06-03 13:10:57,180 [spark-akka.actor.default-dispatcher-14] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1401801057000 ms.2
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: OptimisingSort$6$1
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)

How can I remove this error?
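For context on what the exception points at: `OptimisingSort$6$1` is an anonymous inner class, and an anonymous inner class holds a reference to its enclosing instance, so Spark cannot serialize it to ship to executors unless the whole chain is serializable. A common remedy is to define the comparator as a named static class that implements `java.io.Serializable` and captures nothing. The sketch below is a minimal, Spark-free illustration (it uses `Map.Entry` in place of `scala.Tuple2`, and the class name `CountDescending` is made up here) so it runs with only the JDK:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class SerializableComparatorDemo {

    // A named comparator with no reference to any enclosing instance.
    // Implementing Serializable is what allows Spark to ship it inside
    // the closure passed to rdd.top(5, comp).
    static class CountDescending
            implements Comparator<Map.Entry<String, Long>>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(Map.Entry<String, Long> a, Map.Entry<String, Long> b) {
            return b.getValue().compareTo(a.getValue()); // descending by count
        }
    }

    public static void main(String[] args) throws Exception {
        // Prove the comparator really serializes (this is effectively what
        // the task-shipping step that raised NotSerializableException needs).
        new ObjectOutputStream(new ByteArrayOutputStream())
                .writeObject(new CountDescending());

        List<Map.Entry<String, Long>> counts = Arrays.asList(
                new SimpleImmutableEntry<>("a", 3L),
                new SimpleImmutableEntry<>("b", 7L),
                new SimpleImmutableEntry<>("c", 5L));
        counts.sort(new CountDescending());
        System.out.println(counts); // highest count first
    }
}
```

An instance of such a class can then be passed as the second argument to `top(5, comp)` in place of the anonymous comparator, assuming nothing else non-serializable is captured by the `foreachRDD` function.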





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
