Hi All,

My original RDD contains arrays of doubles. When I apply count() to the original RDD, I get the expected result. However, when I run a map on the original RDD to generate a new RDD containing only the first element of each array, and then apply count() to the new RDD, I get the following exception:

19829 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed to run count at AnalyticsEngine.java:133
[error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)


If I run a take() operation on the new RDD, I receive the expected results. Here is my code:


JavaRDD<Double> rdd2 = rdd.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
    @Override
    public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
        BSONObject doc = e._2();
        // "data" holds a list of arrays; keep only the first element of each one
        List<List<Double>> vals = (List<List<Double>>) doc.get("data");
        List<Double> results = new ArrayList<Double>();
        for (int i = 0; i < vals.size(); i++) {
            results.add(vals.get(i).get(0));
        }
        return results;
    }
});

logger.info("Take: {}", rdd2.take(100));
logger.info("Count: {}", rdd2.count());
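For comparison, here is a minimal plain-Java sketch (no Spark involved) of the same kind of NotSerializableException. It assumes, since the surrounding code is not shown, that the flatMap call above sits inside an instance method of AnalyticsEngine. An anonymous inner class declared in an instance method can hold a hidden reference to its enclosing object (and always does when it uses that object's members), so Java-serializing the function also tries to serialize the enclosing object. All names here (ClosureCaptureDemo, Engine, SerializableFunction, AddOffset) are hypothetical stand-ins, not Spark's API.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.NotSerializableException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class ClosureCaptureDemo {

        // Hypothetical stand-in for a serializable function interface (not Spark's API).
        interface SerializableFunction extends Serializable {
            double call(double x);
        }

        // Plays the role of AnalyticsEngine: it does NOT implement Serializable.
        static class Engine {
            private double offset; // non-final, so reading it requires a reference to the Engine

            Engine(double offset) { this.offset = offset; }

            // Anonymous class in an instance method that reads `offset`:
            // it holds a reference to the enclosing (non-serializable) Engine.
            SerializableFunction anonymousFunction() {
                return new SerializableFunction() {
                    @Override
                    public double call(double x) { return x + offset; }
                };
            }

            // Static nested class: copies the value it needs, no enclosing-instance reference.
            static class AddOffset implements SerializableFunction {
                private final double offset;
                AddOffset(double offset) { this.offset = offset; }
                @Override
                public double call(double x) { return x + offset; }
            }

            SerializableFunction staticFunction() { return new AddOffset(offset); }
        }

        // Returns true if Java serialization of obj succeeds, false on NotSerializableException.
        static boolean trySerialize(Object obj) {
            try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(obj);
                return true;
            } catch (NotSerializableException e) {
                System.out.println("NotSerializableException: " + e.getMessage());
                return false;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public static void main(String[] args) {
            Engine engine = new Engine(1.0);
            System.out.println("anonymous class serializable: " + trySerialize(engine.anonymousFunction()));
            System.out.println("static nested class serializable: " + trySerialize(engine.staticFunction()));
        }
    }

In this sketch the anonymous version fails because the enclosing Engine is dragged along, while the static nested version serializes on its own; whether the same holds for the Spark job above depends on the enclosing code, which is not shown.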


Any ideas on what I am doing wrong ?

Thanks,

Yadid



--
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139


