Hi All,
My original RDD contains arrays of doubles. When applying a count()
operator to the original RDD, I get the result as expected.
However, when I run a map (flatMap in the code below) on the original RDD
in order to generate a new RDD with only the first element of each array,
and try to apply count() to the newly generated RDD, I get the following
exception:
19829 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed to run count at AnalyticsEngine.java:133
[error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
If I run a take() operation on the new RDD, I receive the results as
expected. Here is my code:
JavaRDD<Double> rdd2 = rdd.flatMap(
    new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
        @Override
        public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
            BSONObject doc = e._2();
            List<List<Double>> vals = (List<List<Double>>) doc.get("data");
            List<Double> results = new ArrayList<Double>();
            // Keep only the first element of each inner array.
            for (int i = 0; i < vals.size(); i++) {
                results.add((Double) vals.get(i).get(0));
            }
            return results;
        }
    });

logger.info("Take: {}", rdd2.take(100));
logger.info("Count: {}", rdd2.count());
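
For context, here is a minimal sketch of one mechanism that can produce a NotSerializableException naming the enclosing class, as in the trace above. It uses plain Java serialization only, with no Spark involved. CaptureDemo, Task, StandaloneTask and canSerialize are hypothetical names invented for this illustration; CaptureDemo merely stands in for a non-serializable class such as AnalyticsEngine. Whether this is what actually happens in the code above is an assumption, not something the trace alone confirms.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a driver class; note it is NOT Serializable.
public class CaptureDemo {
    private final String label;

    public CaptureDemo(String label) {
        this.label = label;
    }

    // Hypothetical stand-in for a serializable function interface.
    public interface Task extends Serializable {
        String run(String input);
    }

    // Anonymous inner class created in an instance method. It reads the outer
    // field `label`, so it keeps a reference to the enclosing CaptureDemo, and
    // serializing the task also tries to serialize that CaptureDemo.
    public Task anonymousTask() {
        return new Task() {
            @Override
            public String run(String input) {
                return label + ":" + input;
            }
        };
    }

    // Static nested class: it has no enclosing instance and carries only the
    // data passed to its constructor.
    public static class StandaloneTask implements Task {
        private final String label;

        public StandaloneTask(String label) {
            this.label = label;
        }

        @Override
        public String run(String input) {
            return label + ":" + input;
        }
    }

    // Returns true if plain Java serialization of `o` succeeds, false if it
    // fails with NotSerializableException.
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        CaptureDemo demo = new CaptureDemo("engine");
        System.out.println("anonymous inner class: " + canSerialize(demo.anonymousTask()));
        System.out.println("static nested class:   " + canSerialize(new StandaloneTask("engine")));
    }
}
```

In this sketch the anonymous task cannot be serialized because it drags the enclosing object along, while the static nested variant can. If the same thing applies to the FlatMapFunction above, moving it into a static nested or top-level class would be one thing to try.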
Any ideas on what I am doing wrong?
Thanks,
Yadid
--
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139