Hm, I think you are triggering a bug in the Java API where closures may not be properly cleaned. I think @rxin has reproduced this, deferring to him.
- Patrick

On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg <[email protected]> wrote:
> Code is below. In the code rdd.count() works, but rdd2.count() fails.
>
> public class AnalyticsEngine implements Serializable {
>
>     private static AnalyticsEngine engine = null;
>     private JavaSparkContext sc;
>
>     final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
>     private Properties prop;
>
>     String db_host;
>
>     private AnalyticsEngine()
>     {
>         System.setProperty("spark.serializer",
>                 "org.apache.spark.serializer.KryoSerializer");
>         System.setProperty("spark.kryo.registrator",
>                 "edu.mit.bsense.MyRegistrator");
>         sc = new JavaSparkContext("local[4]", "TestSpark");
>         Properties prop = new Properties();
>         try {
>             prop.load(new FileInputStream("config.properties"));
>
>             db_host = prop.getProperty("database_host1");
>             logger.info("Database host: {}", db_host);
>         } catch (FileNotFoundException ex) {
>             logger.info("Could not read config.properties: " + ex.toString());
>         } catch (IOException ex) {
>             logger.info("Could not read config.properties: " + ex.toString());
>         }
>     }
>
>     public void getData()
>     {
>         Configuration conf = new Configuration();
>
>         String conf_url = "mongodb://" + db_host + "/test.data1"; // this is the data partition
>         conf.set("mongo.input.uri", conf_url);
>
>         conf.set("mongo.input.query", "{\"streamId\":\""+"13"+"\"},{\"data\":1}");
>         conf.set("mongo.input.split_size", "64");
>
>         JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf,
>                 MongoInputFormat.class, Object.class, BSONObject.class);
>
>         rdd.cache();
>
>         logger.info("Count of rdd: {}", rdd.count());
>         logger.info("==========================================================================");
>
>         JavaDoubleRDD rdd2 = rdd.flatMap(new DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
>             @Override
>             public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>                 BSONObject doc = e._2();
>                 BasicDBList vals = (BasicDBList) doc.get("data");
>                 List<Double> results = new ArrayList<Double>();
>                 for (int i = 0; i < vals.size(); i++)
>                     results.add((Double) ((BasicDBList) vals.get(i)).get(0));
>                 return results;
>             }
>         });
>
>         logger.info("Take: {}", rdd2.take(100));
>         logger.info("Count: {}", rdd2.count());
>     }
> }
>
> On 11/3/13 8:19 PM, Patrick Wendell wrote:
>>
>> Thanks, that would help. This would be consistent with there being a
>> reference to the SparkContext itself inside of the closure. Just want
>> to make sure that's not the case.
>>
>> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <[email protected]> wrote:
>>>
>>> I'm running in local[4] mode - so there are no slave machines. Full stack trace:
>>>
>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>> org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>> [debug] Thread run-main exited.
>>> [debug] Interrupting remaining threads (should be all daemons).
>>> [debug] Sandboxed run complete.
>>> java.lang.RuntimeException: Nonzero exit code: 1
>>>     at scala.sys.package$.error(package.scala:27)
>>>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>>     at scala.Option.foreach(Option.scala:236)
>>>     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>>>     at sbt.Defaults$.toError(Defaults.scala:34)
>>>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>>>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>>>     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>>>     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>>>     at sbt.std.Transform$$anon$4.work(System.scala:64)
>>>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>>     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>>>     at sbt.Execute.work(Execute.scala:244)
>>>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>>     at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>>>     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>>     at java.lang.Thread.run(Thread.java:695)
>>>
>>> When I add implements Serializable to my class, I get the following stack trace:
>>>
>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
>>> org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
>>>     [... identical DAGScheduler frames as in the trace above ...]
>>> [debug] Thread run-main exited.
>>> [debug] Interrupting remaining threads (should be all daemons).
>>> [debug] Sandboxed run complete.
>>> java.lang.RuntimeException: Nonzero exit code: 1
>>>     [... identical sbt and java.util.concurrent frames as above ...]
>>>     at java.lang.Thread.run(Thread.java:695)
>>>
>>> I can post my code if that helps.
>>>
>>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>>>>
>>>> If you look in the UI, are there failures on any of the slaves that
>>>> you can give a stack trace for? That would narrow down where the
>>>> serialization error is happening.
>>>>
>>>> Unfortunately this code path doesn't print a full stack trace, which
>>>> makes it harder to debug where the serialization error comes from.
>>>>
>>>> Could you post all of your code?
>>>>
>>>> Also, just wondering, what happens if you just go ahead and add
>>>> "extends Serializable" to the AnalyticsEngine class? It's possible this is
>>>> happening during closure serialization, which will use the closure
>>>> serializer (which is by default Java).
>>>>
>>>> - Patrick
>>>>
>>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <[email protected]> wrote:
>>>>>
>>>>> Yes, I tried that as well (it is currently registered with Kryo) - although
>>>>> it doesn't make sense to me (and doesn't solve the problem). I also made
>>>>> sure my registration was running:
>>>>> DEBUG org.apache.spark.serializer.KryoSerializer - Running user
>>>>> registrator: edu.mit.bsense.MyRegistrator
>>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
>>>>> org.apache.spark.serializer.KryoSerializer - Running user registrator:
>>>>> edu.mit.bsense.MyRegistrator
>>>>>
>>>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC, which
>>>>> instantiates the RDDs and runs the map() and count().
>>>>> Can you explain why it needs to be serialized?
>>>>>
>>>>> Also, when running count() on my original RDD (pre-map) I get the right
>>>>> answer - this means the classes of the data in the RDD are serializable.
>>>>> It's only when I run map(), and then count() on the new RDD, that I get this
>>>>> exception. My map does not introduce any new classes - it just iterates
>>>>> over the existing data.
>>>>>
>>>>> Any ideas?
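The failure mode Patrick describes above (closure serialization dragging in the enclosing class) can be reproduced outside Spark with plain JDK serialization. The sketch below uses hypothetical names (ClosureCapture standing in for AnalyticsEngine, Fn for a Spark function interface): an anonymous inner class that touches an instance field carries a hidden this$0 reference to its outer object, so serializing the function fails with exactly the NotSerializableException seen in the traces, while a static nested class serializes fine.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCapture {

    public interface Fn extends Serializable {
        double call(double x);
    }

    // Instance state, standing in for AnalyticsEngine's fields (sc, logger, ...).
    private final double factor = 2.0;

    // Anonymous inner class: because it reads `factor`, javac gives it a hidden
    // this$0 field pointing at the enclosing ClosureCapture object. Serializing
    // the function therefore tries to serialize the whole outer object, which
    // fails because ClosureCapture is not Serializable.
    public Fn anonymous() {
        return new Fn() {
            public double call(double x) { return x * factor; }
        };
    }

    // Static nested class: no enclosing-instance reference, so it serializes
    // on its own regardless of what the outer class looks like.
    public static class SafeFn implements Fn {
        private final double factor;
        public SafeFn(double factor) { this.factor = factor; }
        public double call(double x) { return x * factor; }
    }

    // Round-trips an object through Java serialization; false on failure.
    public static boolean serializes(Object o) {
        try {
            ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream());
            oos.writeObject(o);
            oos.close();
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new ClosureCapture().anonymous())); // false
        System.out.println(serializes(new SafeFn(2.0)));                  // true
    }
}
```

This also shows why "implements Serializable" on the outer class only moves the problem: serialization then proceeds into the outer object's fields and trips on the next non-serializable one (here, the JavaSparkContext).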
>>>>>
>>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>>>>>>
>>>>>> edu.mit.bsense.AnalyticsEngine
>>>>>>
>>>>>> Look at the exception. Basically, you'll need to register every class
>>>>>> type that is recursively used by BSONObject.
>>>>>>
>>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi Patrick,
>>>>>>>
>>>>>>> I am in fact using Kryo, and I'm registering BSONObject.class (which is
>>>>>>> the class holding the data) in my KryoRegistrator.
>>>>>>> I'm not sure what other classes I should be registering.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Yadid
>>>>>>>
>>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>>>>>>>
>>>>>>>> The problem is you are referencing a class that does not "extend
>>>>>>>> Serializable" in the data that you shuffle. Spark needs to send all
>>>>>>>> shuffle data over the network, so it needs to know how to serialize it.
>>>>>>>>
>>>>>>>> One option is to use Kryo for network serialization as described here
>>>>>>>> - you'll have to register all the classes that get serialized, though:
>>>>>>>>
>>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>>>>>>
>>>>>>>> Another option is to write a wrapper class that "extends
>>>>>>>> Externalizable" and write the serialization yourself.
>>>>>>>>
>>>>>>>> - Patrick
>>>>>>>>
>>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> My original RDD contains arrays of doubles. When applying a count()
>>>>>>>>> operator to the original RDD I get the result as expected.
>>>>>>>>> However, when I run a map on the original RDD in order to generate a
>>>>>>>>> new RDD with only the first element of each array, and try to apply
>>>>>>>>> count() to the newly generated RDD, I get the following exception:
>>>>>>>>>
>>>>>>>>> 19829 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed
>>>>>>>>> to run count at AnalyticsEngine.java:133
>>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>>>> org.apache.spark.SparkException: Job failed:
>>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>>>>     [... same DAGScheduler stack frames as in the trace above ...]
>>>>>>>>>
>>>>>>>>> If I run a take() operation on the new RDD I receive the results as
>>>>>>>>> expected. Here is my code:
>>>>>>>>>
>>>>>>>>> JavaRDD<Double> rdd2 = rdd.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
>>>>>>>>>     @Override
>>>>>>>>>     public Iterable<Double> call(Tuple2<Object, BSONObject> e)
>>>>>>>>>     {
>>>>>>>>>         BSONObject doc = e._2();
>>>>>>>>>         List<List<Double>> vals = (List<List<Double>>) doc.get("data");
>>>>>>>>>         List<Double> results = new ArrayList<Double>();
>>>>>>>>>         for (int i = 0; i < vals.size(); i++)
>>>>>>>>>             results.add((Double) vals.get(i).get(0));
>>>>>>>>>         return results;
>>>>>>>>>     }
>>>>>>>>> });
>>>>>>>>>
>>>>>>>>> logger.info("Take: {}", rdd2.take(100));
>>>>>>>>> logger.info("Count: {}", rdd2.count());
>>>>>>>>>
>>>>>>>>> Any ideas on what I am doing wrong?
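Given the diagnosis reached later in the thread (the anonymous FlatMapFunction drags the enclosing AnalyticsEngine into the closure), the usual workaround is to move the function into a static nested or top-level class so it captures nothing. Below is a Spark-free sketch of that shape with hypothetical names; a plain Map stands in for the BSONObject lookup so the example is self-contained.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FirstElementExtractor {

    // Static nested class: no hidden reference to any enclosing instance, so
    // Java serialization only sees this (stateless) object. The real version
    // would implement Spark's FlatMapFunction and read a BSONObject; a Map
    // plays that role here.
    public static class ExtractFirst implements Serializable {
        public Iterable<Double> call(Map<String, List<List<Double>>> doc) {
            List<List<Double>> vals = doc.get("data");
            List<Double> results = new ArrayList<Double>();
            for (int i = 0; i < vals.size(); i++)
                results.add(vals.get(i).get(0));   // first element of each array
            return results;
        }
    }

    public static void main(String[] args) {
        Map<String, List<List<Double>>> doc = new HashMap<String, List<List<Double>>>();
        doc.put("data", Arrays.asList(Arrays.asList(1.0, 2.0), Arrays.asList(3.0, 4.0)));
        System.out.println(new ExtractFirst().call(doc)); // [1.0, 3.0]
    }
}
```

In the RDD code above, `rdd.flatMap(new ExtractFirst())` (with the real Spark function type) would then serialize only the small function object, not the AnalyticsEngine or its JavaSparkContext.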
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Yadid
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Yadid Ayzenberg
>>>>>>>>> Graduate Student and Research Assistant
>>>>>>>>> Affective Computing
>>>>>>>>> Phone: 617-866-7226
>>>>>>>>> Room: E14-274G
>>>>>>>>> MIT Media Lab
>>>>>>>>> 75 Amherst St, Cambridge, MA, 02139
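Patrick's other suggestion earlier in the thread, a wrapper that "extends Externalizable" with hand-written serialization, can be sketched with plain JDK types. DoubleArrayWrapper is a hypothetical name, and a double[] stands in for the BasicDBList payload so the example runs standalone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.Arrays;

public class DoubleArrayWrapper implements Externalizable {

    private double[] data;

    public DoubleArrayWrapper() { }                        // required public no-arg constructor
    public DoubleArrayWrapper(double[] data) { this.data = data; }
    public double[] get() { return data; }

    // Hand-rolled wire format: the length, then each value.
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(data.length);
        for (double d : data) out.writeDouble(d);
    }

    public void readExternal(ObjectInput in) throws IOException {
        data = new double[in.readInt()];
        for (int i = 0; i < data.length; i++) data[i] = in.readDouble();
    }

    public static void main(String[] args) throws Exception {
        // Round-trip through Java serialization to show the wrapper works.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(buf);
        oos.writeObject(new DoubleArrayWrapper(new double[] {1.5, 2.5}));
        oos.close();
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()));
        DoubleArrayWrapper back = (DoubleArrayWrapper) ois.readObject();
        System.out.println(Arrays.toString(back.get())); // [1.5, 2.5]
    }
}
```

The trade-off versus Kryo registration is control: Externalizable skips reflection over the wrapped type entirely, at the cost of maintaining the read/write logic by hand.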
