Thanks, that would help. The second trace (NotSerializableException on JavaSparkContext) would be consistent with there being a reference to the SparkContext itself inside of the closure. Just want to make sure that's not the case.
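To illustrate what I mean by a reference ending up inside the closure, here is a minimal sketch that uses plain Java serialization rather than Spark, so it runs on its own. Engine, SerializableFunction, and FirstElement are hypothetical stand-ins I made up for this example (they are not your code and not Spark's API); Engine plays the role of a class like AnalyticsEngine that holds a non-serializable context.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ClosureCaptureDemo {

    /** Hypothetical stand-in for a function type that must be serialized before it is shipped. */
    interface SerializableFunction extends Serializable {
        double call(double[] row);
    }

    /** Hypothetical stand-in for the driver class; deliberately NOT Serializable. */
    static class Engine {
        int column = 0;

        // The anonymous class reads Engine.this.column, so it carries a hidden
        // reference to the enclosing Engine. Serializing the function therefore
        // tries to serialize the Engine as well.
        SerializableFunction firstElementAnonymous() {
            return new SerializableFunction() {
                @Override
                public double call(double[] row) {
                    return row[column];
                }
            };
        }
    }

    /** Static nested class: holds only the value it needs, no outer instance. */
    static class FirstElement implements SerializableFunction {
        private final int column;

        FirstElement(int column) {
            this.column = column;
        }

        @Override
        public double call(double[] row) {
            return row[column];
        }
    }

    /** Returns true if Java serialization accepts the whole object graph. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous inner class: "
                + canSerialize(new Engine().firstElementAnonymous()));
        System.out.println("static nested class:   "
                + canSerialize(new FirstElement(0)));
    }
}
```

The anonymous version should fail to serialize because it pulls in the enclosing Engine, while the static nested version should go through. If the same thing is happening in your job, moving the function into a static nested (or top-level) class that copies only the values it needs would be one way to keep the outer object, and anything it holds, out of the closure.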

On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <[email protected]> wrote:
> I'm running in local[4] mode, so there are no slave machines. Full stack
> trace:
>
> (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> [debug] Thread run-main exited.
> [debug] Interrupting remaining threads (should be all daemons).
> [debug] Sandboxed run complete..
> java.lang.RuntimeException: Nonzero exit code: 1
>     at scala.sys.package$.error(package.scala:27)
>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     at scala.Option.foreach(Option.scala:236)
>     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>     at sbt.Defaults$.toError(Defaults.scala:34)
>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>     at sbt.std.Transform$$anon$4.work(System.scala:64)
>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>     at sbt.Execute.work(Execute.scala:244)
>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     at java.lang.Thread.run(Thread.java:695)
>
> When I add "implements Serializable" to my class, I get the following
> stack trace:
>
> [error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> [debug] Thread run-main exited.
> [debug] Interrupting remaining threads (should be all daemons).
> [debug] Sandboxed run complete..
> java.lang.RuntimeException: Nonzero exit code: 1
>     at scala.sys.package$.error(package.scala:27)
>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     at scala.Option.foreach(Option.scala:236)
>     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>     at sbt.Defaults$.toError(Defaults.scala:34)
>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>     at sbt.std.Transform$$anon$4.work(System.scala:64)
>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>     at sbt.Execute.work(Execute.scala:244)
>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     at java.lang.Thread.run(Thread.java:695)
>
> I can post my code if that helps.
>
> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>>
>> If you look in the UI, are there failures on any of the slaves that
>> you can give a stack trace for? That would narrow down where the
>> serialization error is happening.
>>
>> Unfortunately this code path doesn't print a full stack trace, which
>> makes it harder to debug where the serialization error comes from.
>>
>> Could you post all of your code?
>>
>> Also, just wondering, what happens if you just go ahead and add
>> "extends Serializable" to AnalyticsEngine class? It's possible this is
>> happening during closure serialization, which will use the closure
>> serializer (which is by default Java).
>>
>> - Patrick
>>
>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <[email protected]> wrote:
>>>
>>> Yes, I tried that as well (it is currently registered with Kryo),
>>> although it doesn't make sense to me (and doesn't solve the problem).
>>> I also made sure my registration was running:
>>>
>>> DEBUG org.apache.spark.serializer.KryoSerializer - Running user registrator: edu.mit.bsense.MyRegistrator
>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG org.apache.spark.serializer.KryoSerializer - Running user registrator: edu.mit.bsense.MyRegistrator
>>>
>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
>>> instantiates the RDDs and runs the map() and count().
>>> Can you explain why it needs to be serialized?
>>>
>>> Also, when running count() on my original RDD (pre map) I get the
>>> right answer, which means the classes of data in the RDD are
>>> serializable. It's only when I run map, and then count() on a new RDD,
>>> that I get this exception. My map does not introduce any new classes;
>>> it just iterates over the existing data.
>>>
>>> Any ideas?
>>>
>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>>>>
>>>> edu.mit.bsense.AnalyticsEngine
>>>>
>>>> Look at the exception. Basically, you'll need to register every class
>>>> type that is recursively used by BSONObject.
>>>>
>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <[email protected]> wrote:
>>>>>
>>>>> Hi Patrick,
>>>>>
>>>>> I am in fact using Kryo and I'm registering BSONObject.class (which
>>>>> is the class holding the data) in my KryoRegistrator.
>>>>> I'm not sure what other classes I should be registering.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Yadid
>>>>>
>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>>>>>
>>>>>> The problem is you are referencing a class that does not "extend
>>>>>> serializable" in the data that you shuffle. Spark needs to send all
>>>>>> shuffle data over the network, so it needs to know how to serialize
>>>>>> them.
>>>>>>
>>>>>> One option is to use Kryo for network serialization as described
>>>>>> here - you'll have to register all the classes that get serialized
>>>>>> though.
>>>>>>
>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>>>>
>>>>>> Another option is to write a wrapper class that "extends
>>>>>> externalizable" and write the serialization yourself.
>>>>>>
>>>>>> - Patrick
>>>>>>
>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> My original RDD contains arrays of doubles. When applying a count()
>>>>>>> operator to the original RDD I get the result as expected.
>>>>>>> However, when I run a map on the original RDD in order to generate
>>>>>>> a new RDD with only the first element of each array, and try to
>>>>>>> apply count() to the new generated RDD, I get the following
>>>>>>> exception:
>>>>>>>
>>>>>>> 19829 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed to run count at AnalyticsEngine.java:133
>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>>>
>>>>>>> If I run a take() operation on the new RDD I receive the results as
>>>>>>> expected. Here is my code:
>>>>>>>
>>>>>>> JavaRDD<Double> rdd2 = rdd.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
>>>>>>>     @Override
>>>>>>>     public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>>>>>>         BSONObject doc = e._2();
>>>>>>>         List<List<Double>> vals = (List<List<Double>>) doc.get("data");
>>>>>>>         List<Double> results = new ArrayList<Double>();
>>>>>>>         for (int i = 0; i < vals.size(); i++)
>>>>>>>             results.add((Double) vals.get(i).get(0));
>>>>>>>         return results;
>>>>>>>     }
>>>>>>> });
>>>>>>>
>>>>>>> logger.info("Take: {}", rdd2.take(100));
>>>>>>> logger.info("Count: {}", rdd2.count());
>>>>>>>
>>>>>>> Any ideas on what I am doing wrong?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Yadid
>>>>>>>
>>>>>>> --
>>>>>>> Yadid Ayzenberg
>>>>>>> Graduate Student and Research Assistant
>>>>>>> Affective Computing
>>>>>>> Phone: 617-866-7226
>>>>>>> Room: E14-274G
>>>>>>> MIT Media Lab
>>>>>>> 75 Amherst st, Cambridge, MA, 02139
