No problem - thanks for helping us diagnose this!

On Tue, Nov 5, 2013 at 5:04 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> Ah, I see. Thanks very much for your assistance, Patrick and Reynold.
> As a workaround for now, I marked the SC field as transient and it's
> working fine.
>
> Yadid
>
>
>
> On 11/3/13 9:05 PM, Reynold Xin wrote:
>
> Yeah, so every inner class actually contains a field referencing the outer
> class. In your case, the anonymous DoubleFlatMapFunction class has a
> this$0 field referencing the outer class AnalyticsEngine, which is why
> Spark is trying to serialize AnalyticsEngine.
>
> In the Scala API, closures (which are really just implemented as anonymous
> classes) have a field called "$outer", and Spark uses a "closure cleaner"
> that goes into the anonymous class to remove the $outer field if it is not
> used in the closure itself. In Java, the compiler generates a field called
> "this$0" instead, so the closure cleaner doesn't find it and can't "clean"
> the closure properly.
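>
> To illustrate (a simplified sketch with made-up names, not your actual
> code): any anonymous inner class declared in an instance context gets a
> synthetic reference to its enclosing instance, whether or not the body
> uses it.
>
>     public class Outer {                 // think: AnalyticsEngine
>         Runnable makeTask() {
>             // compiled as Outer$1 with a hidden field "final Outer this$0"
>             return new Runnable() {
>                 @Override
>                 public void run() {
>                     // even if Outer is never referenced here, this$0 still
>                     // points at the enclosing Outer instance
>                 }
>             };
>         }
>     }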
>
> I will work on a fix for the closure cleaner so it handles this case as
> well. In the meantime, you can work around it by either defining your
> anonymous class as a static nested class, or marking the JavaSparkContext
> field as transient.
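>
> To make that concrete, something along these lines should work (just a
> rough sketch against your code below; "FirstElementExtractor" is an
> illustrative name, and you may need "implements" instead of "extends"
> depending on how DoubleFlatMapFunction is defined in your Spark version):
>
>     // Workaround 1: mark the context transient so Java serialization skips it
>     // if the enclosing AnalyticsEngine instance ever gets pulled into a closure.
>     private transient JavaSparkContext sc;
>
>     // Workaround 2: use a static nested class instead of an anonymous inner
>     // class, so no hidden this$0 reference to AnalyticsEngine is captured.
>     static class FirstElementExtractor
>             extends DoubleFlatMapFunction<Tuple2<Object, BSONObject>> {
>         @Override
>         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>             BasicDBList vals = (BasicDBList) e._2().get("data");
>             List<Double> results = new ArrayList<Double>();
>             for (int i = 0; i < vals.size(); i++) {
>                 results.add((Double) ((BasicDBList) vals.get(i)).get(0));
>             }
>             return results;
>         }
>     }
>
>     // ...and then:
>     // JavaDoubleRDD rdd2 = rdd.flatMap(new FirstElementExtractor());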
>
>
>
> On Sun, Nov 3, 2013 at 7:32 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>>
>> Hm, I think you are triggering a bug in the Java API where closures
>> may not be properly cleaned. I think @rxin has reproduced this, so I'll
>> defer to him.
>>
>> - Patrick
>>
>> On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> wrote:
>> > Code is below. In the code, rdd.count() works, but rdd2.count() fails.
>> >
>> > public class AnalyticsEngine implements Serializable {
>> >
>> >     private static AnalyticsEngine engine = null;
>> >     private JavaSparkContext sc;
>> >
>> >     final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
>> >     private Properties prop;
>> >
>> >     String db_host;
>> >
>> >     private AnalyticsEngine()
>> >     {
>> >         // use Kryo for data serialization and register custom classes
>> >         System.setProperty("spark.serializer",
>> >                 "org.apache.spark.serializer.KryoSerializer");
>> >         System.setProperty("spark.kryo.registrator",
>> >                 "edu.mit.bsense.MyRegistrator");
>> >         sc = new JavaSparkContext("local[4]", "TestSpark");
>> >
>> >         Properties prop = new Properties();
>> >         try {
>> >             prop.load(new FileInputStream("config.properties"));
>> >             db_host = prop.getProperty("database_host1");
>> >             logger.info("Database host: {}", db_host);
>> >         } catch (FileNotFoundException ex) {
>> >             logger.info("Could not read config.properties: " + ex.toString());
>> >         } catch (IOException ex) {
>> >             logger.info("Could not read config.properties: " + ex.toString());
>> >         }
>> >     }
>> >
>> >     public void getData()
>> >     {
>> >         Configuration conf = new Configuration();
>> >
>> >         String conf_url = "mongodb://" + db_host + "/test.data1"; // this is the data partition
>> >         conf.set("mongo.input.uri", conf_url);
>> >         conf.set("mongo.input.query", "{\"streamId\":\"" + "13" + "\"},{\"data\":1}");
>> >         conf.set("mongo.input.split_size", "64");
>> >
>> >         JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf,
>> >                 MongoInputFormat.class, Object.class, BSONObject.class);
>> >
>> >         rdd.cache();
>> >
>> >         logger.info("Count of rdd: {}", rdd.count());
>> >         logger.info("==========================================================================");
>> >
>> >         // extract the first element of each "data" array
>> >         JavaDoubleRDD rdd2 = rdd.flatMap(new DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
>> >             @Override
>> >             public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>> >                 BSONObject doc = e._2();
>> >                 BasicDBList vals = (BasicDBList) doc.get("data");
>> >                 List<Double> results = new ArrayList<Double>();
>> >                 for (int i = 0; i < vals.size(); i++)
>> >                     results.add((Double) ((BasicDBList) vals.get(i)).get(0));
>> >                 return results;
>> >             }
>> >         });
>> >
>> >         logger.info("Take: {}", rdd2.take(100));
>> >         logger.info("Count: {}", rdd2.count());
>> >     }
>> > }
>> >
>> >
>> > On 11/3/13 8:19 PM, Patrick Wendell wrote:
>> >>
>> >> Thanks, that would help. This would be consistent with there being a
>> >> reference to the SparkContext itself inside of the closure. Just want
>> >> to make sure that's not the case.
>> >>
>> >> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> >> wrote:
>> >>>
>> >>> I'm running in local[4] mode, so there are no slave machines. Full
>> >>> stack trace:
>> >>>
>> >>>
>> >>> (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> >>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> >>>      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> >>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>>      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> >>>      at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> >>>      at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> >>> [debug]     Thread run-main exited.
>> >>> [debug] Interrupting remaining threads (should be all daemons).
>> >>> [debug] Sandboxed run complete..
>> >>> java.lang.RuntimeException: Nonzero exit code: 1
>> >>>      at scala.sys.package$.error(package.scala:27)
>> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>> >>>      at scala.Option.foreach(Option.scala:236)
>> >>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>> >>>      at sbt.Defaults$.toError(Defaults.scala:34)
>> >>>      at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>> >>>      at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>> >>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>> >>>      at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>> >>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>> >>>      at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>> >>>      at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>> >>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>> >>>      at sbt.Execute.work(Execute.scala:244)
>> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>> >>>      at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>> >>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>> >>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>> >>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> >>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> >>>      at java.lang.Thread.run(Thread.java:695)
>> >>>
>> >>> When I add implements Serializable to my class, I get the following
>> >>> stack trace:
>> >>>
>> >>> [error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
>> >>> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
>> >>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> >>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> >>>      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> >>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>>      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> >>>      at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> >>>      at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> >>> [debug]     Thread run-main exited.
>> >>> [debug] Interrupting remaining threads (should be all daemons).
>> >>> [debug] Sandboxed run complete..
>> >>> java.lang.RuntimeException: Nonzero exit code: 1
>> >>>      at scala.sys.package$.error(package.scala:27)
>> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>> >>>      at scala.Option.foreach(Option.scala:236)
>> >>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>> >>>      at sbt.Defaults$.toError(Defaults.scala:34)
>> >>>      at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>> >>>      at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>> >>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>> >>>      at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>> >>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>> >>>      at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>> >>>      at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>> >>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>> >>>      at sbt.Execute.work(Execute.scala:244)
>> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>> >>>      at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>> >>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>> >>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>> >>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> >>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> >>>      at java.lang.Thread.run(Thread.java:695)
>> >>>
>> >>> I can post my code if that helps.
>> >>>
>> >>>
>> >>>
>> >>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>> >>>>
>> >>>> If you look in the UI, are there failures on any of the slaves that
>> >>>> you can give a stack trace for? That would narrow down where the
>> >>>> serialization error is happening.
>> >>>>
>> >>>> Unfortunately this code path doesn't print a full stack trace which
>> >>>> makes it harder to debug where the serialization error comes from.
>> >>>>
>> >>>> Could you post all of your code?
>> >>>>
>> >>>> Also, just wondering, what happens if you just go ahead and add
>> >>>> "extends Serializable" to the AnalyticsEngine class? It's possible this
>> >>>> is happening during closure serialization, which will use the closure
>> >>>> serializer (which is Java by default).
>> >>>>
>> >>>> - Patrick
>> >>>>
>> >>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> >>>> wrote:
>> >>>>>
>> >>>>> Yes, I tried that as well (it is currently registered with Kryo),
>> >>>>> although it doesn't make sense to me (and doesn't solve the problem).
>> >>>>> I also made sure my registration was running:
>> >>>>>
>> >>>>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user registrator: edu.mit.bsense.MyRegistrator
>> >>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG org.apache.spark.serializer.KryoSerializer  - Running user registrator: edu.mit.bsense.MyRegistrator
>> >>>>>
>> >>>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC; it
>> >>>>> instantiates the RDDs and runs map() and count().
>> >>>>> Can you explain why it needs to be serialized?
>> >>>>>
>> >>>>> Also, when running count() on my original RDD (pre-map) I get the
>> >>>>> right answer - this means the classes of the data in the RDD are
>> >>>>> serializable. It's only when I run map() and then count() on the new
>> >>>>> RDD that I get this exception. My map does not introduce any new
>> >>>>> classes; it just iterates over the existing data.
>> >>>>>
>> >>>>> Any ideas?
>> >>>>>
>> >>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>> >>>>>>
>> >>>>>> edu.mit.bsense.AnalyticsEngine
>> >>>>>>
>> >>>>>> Look at the exception. Basically, you'll need to register every
>> >>>>>> class
>> >>>>>> type that is recursively used by BSONObject.
>> >>>>>>
>> >>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg
>> >>>>>> <ya...@media.mit.edu>
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>> Hi Patrick,
>> >>>>>>>
>> >>>>>>> I am in fact using Kryo and I'm registering BSONObject.class (which
>> >>>>>>> is the class holding the data) in my KryoRegistrator.
>> >>>>>>> I'm not sure what other classes I should be registering.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> Yadid
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>> >>>>>>>>
>> >>>>>>>> The problem is that you are referencing a class that does not
>> >>>>>>>> "extend Serializable" in the data that you shuffle. Spark needs to
>> >>>>>>>> send all shuffle data over the network, so it needs to know how to
>> >>>>>>>> serialize it.
>> >>>>>>>>
>> >>>>>>>> One option is to use Kryo for network serialization as described
>> >>>>>>>> here - you'll have to register all the classes that get serialized,
>> >>>>>>>> though.
>> >>>>>>>>
>> >>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
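>> >>>>>>>>
>> >>>>>>>> For example, a registrator might look roughly like this (just a
>> >>>>>>>> sketch - which concrete BSON classes you need to register depends
>> >>>>>>>> on what actually ends up in your records):
>> >>>>>>>>
>> >>>>>>>>     import com.esotericsoftware.kryo.Kryo;
>> >>>>>>>>     import com.mongodb.BasicDBList;
>> >>>>>>>>     import org.apache.spark.serializer.KryoRegistrator;
>> >>>>>>>>     import org.bson.BasicBSONObject;
>> >>>>>>>>
>> >>>>>>>>     public class MyRegistrator implements KryoRegistrator {
>> >>>>>>>>         @Override
>> >>>>>>>>         public void registerClasses(Kryo kryo) {
>> >>>>>>>>             // register the concrete classes that appear in the RDD's records
>> >>>>>>>>             kryo.register(BasicBSONObject.class);
>> >>>>>>>>             kryo.register(BasicDBList.class);
>> >>>>>>>>         }
>> >>>>>>>>     }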
>> >>>>>>>>
>> >>>>>>>> Another option is to write a wrapper class that implements
>> >>>>>>>> Externalizable and write the serialization yourself.
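>> >>>>>>>>
>> >>>>>>>> Something along these lines (again just a sketch with made-up names,
>> >>>>>>>> assuming you only need the list of doubles out of each record):
>> >>>>>>>>
>> >>>>>>>>     import java.io.Externalizable;
>> >>>>>>>>     import java.io.IOException;
>> >>>>>>>>     import java.io.ObjectInput;
>> >>>>>>>>     import java.io.ObjectOutput;
>> >>>>>>>>     import java.util.ArrayList;
>> >>>>>>>>     import java.util.List;
>> >>>>>>>>
>> >>>>>>>>     public class DoubleListWrapper implements Externalizable {
>> >>>>>>>>         private List<Double> values = new ArrayList<Double>();
>> >>>>>>>>
>> >>>>>>>>         public DoubleListWrapper() {}  // no-arg constructor required by Externalizable
>> >>>>>>>>
>> >>>>>>>>         public DoubleListWrapper(List<Double> values) { this.values = values; }
>> >>>>>>>>
>> >>>>>>>>         public List<Double> getValues() { return values; }
>> >>>>>>>>
>> >>>>>>>>         // write a simple length-prefixed list of doubles
>> >>>>>>>>         public void writeExternal(ObjectOutput out) throws IOException {
>> >>>>>>>>             out.writeInt(values.size());
>> >>>>>>>>             for (Double v : values) out.writeDouble(v);
>> >>>>>>>>         }
>> >>>>>>>>
>> >>>>>>>>         public void readExternal(ObjectInput in) throws IOException {
>> >>>>>>>>             int n = in.readInt();
>> >>>>>>>>             values = new ArrayList<Double>(n);
>> >>>>>>>>             for (int i = 0; i < n; i++) values.add(in.readDouble());
>> >>>>>>>>         }
>> >>>>>>>>     }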
>> >>>>>>>>
>> >>>>>>>> - Patrick
>> >>>>>>>>
>> >>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg
>> >>>>>>>> <ya...@media.mit.edu>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi All,
>> >>>>>>>>>
>> >>>>>>>>> My original RDD contains arrays of doubles. When applying a count()
>> >>>>>>>>> operator to the original RDD I get the result as expected.
>> >>>>>>>>> However, when I run a map on the original RDD in order to generate a
>> >>>>>>>>> new RDD with only the first element of each array, and then try to
>> >>>>>>>>> apply count() to the newly generated RDD, I get the following
>> >>>>>>>>> exception:
>> >>>>>>>>>
>> >>>>>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler - Failed to run count at AnalyticsEngine.java:133
>> >>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>>>>>>>> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> >>>>>>>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> >>>>>>>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> If I run a take() operation on the new RDD I receive the results
>> >>>>>>>>> as expected. Here is my code:
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> JavaRDD<Double> rdd2 = rdd.flatMap(
>> >>>>>>>>>         new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
>> >>>>>>>>>     @Override
>> >>>>>>>>>     public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>> >>>>>>>>>         BSONObject doc = e._2();
>> >>>>>>>>>         List<List<Double>> vals = (List<List<Double>>) doc.get("data");
>> >>>>>>>>>         List<Double> results = new ArrayList<Double>();
>> >>>>>>>>>         for (int i = 0; i < vals.size(); i++)
>> >>>>>>>>>             results.add((Double) vals.get(i).get(0));
>> >>>>>>>>>         return results;
>> >>>>>>>>>     }
>> >>>>>>>>> });
>> >>>>>>>>>
>> >>>>>>>>>             logger.info("Take: {}", rdd2.take(100));
>> >>>>>>>>>             logger.info("Count: {}", rdd2.count());
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Any ideas on what I am doing wrong?
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>>
>> >>>>>>>>> Yadid
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> --
>> >>>>>>>>> Yadid Ayzenberg
>> >>>>>>>>> Graduate Student and Research Assistant
>> >>>>>>>>> Affective Computing
>> >>>>>>>>> Phone: 617-866-7226
>> >>>>>>>>> Room: E14-274G
>> >>>>>>>>> MIT Media Lab
>> >>>>>>>>> 75 Amherst st, Cambridge, MA, 02139
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>> --
>> >>>>>>> Yadid Ayzenberg
>> >>>>>>> Graduate Student and Research Assistant
>> >>>>>>> Affective Computing
>> >>>>>>> Phone: 617-866-7226
>> >>>>>>> Room: E14-274G
>> >>>>>>> MIT Media Lab
>> >>>>>>> 75 Amherst st, Cambridge, MA, 02139
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>> --
>> >>>>> Yadid Ayzenberg
>> >>>>> Graduate Student and Research Assistant
>> >>>>> Affective Computing
>> >>>>> Phone: 617-866-7226
>> >>>>> Room: E14-274G
>> >>>>> MIT Media Lab
>> >>>>> 75 Amherst st, Cambridge, MA, 02139
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>
>> >>> --
>> >>> Yadid Ayzenberg
>> >>> Graduate Student and Research Assistant
>> >>> Affective Computing
>> >>> Phone: 617-866-7226
>> >>> Room: E14-274G
>> >>> MIT Media Lab
>> >>> 75 Amherst st, Cambridge, MA, 02139
>> >>>
>> >>>
>> >>>
>> >
>> >
>> > --
>> > Yadid Ayzenberg
>> > Graduate Student and Research Assistant
>> > Affective Computing
>> > Phone: 617-866-7226
>> > Room: E14-274G
>> > MIT Media Lab
>> > 75 Amherst st, Cambridge, MA, 02139
>> >
>> >
>> >
>
>
>
>
> --
> Yadid Ayzenberg
> Graduate Student and Research Assistant
> Affective Computing
> Phone: 617-866-7226
> Room: E14-274G
> MIT Media Lab
> 75 Amherst st, Cambridge, MA, 02139
>
>
>
