Hm, I think you are triggering a bug in the Java API where closures
may not be properly cleaned. I think @rxin has reproduced this,
deferring to him.

- Patrick

On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg <[email protected]> wrote:
> code is below. in the code rdd.count() works, but rdd2.count() fails.
>
> public class AnalyticsEngine  implements Serializable {
>
>     private static AnalyticsEngine engine=null;
>     private JavaSparkContext sc;
>
>     final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
>     private Properties prop;
>
>     String db_host;
>
>     private AnalyticsEngine()
>     {
>         System.setProperty("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer");
>         System.setProperty("spark.kryo.registrator",
> "edu.mit.bsense.MyRegistrator");
>         sc = new JavaSparkContext("local[4]","TestSpark");
>         Properties prop = new Properties();
>         try {
>             prop.load(new FileInputStream("config.properties"));
>
>
>             db_host = prop.getProperty("database_host1");
>             logger.info("Database host: {}", db_host);
>         }  catch (FileNotFoundException ex)
>                 {
>                     logger.info("Could not read config.properties: " +
> ex.toString());
>
>                 } catch (IOException ex)
>                 {
>                     logger.info("Could not read config.properties: " +
> ex.toString());
>
>                 }
>
>
>
>         public void getData(void)
>         {
>         Configuration conf = new Configuration();
>
>         String conf_url = "mongodb://" + db_host + "/test.data1"; //this is
> the data partition
>         conf.set("mongo.input.uri", conf_url);
>
>
>         conf.set("mongo.input.query",
> "{\"streamId\":\""+"13"+"\"},{\"data\":1}");
>         conf.set("mongo.input.split_size","64");
>
>         JavaPairRDD<Object,BSONObject> rdd = sc.newAPIHadoopRDD(conf,
> MongoInputFormat.class, Object.class, BSONObject.class);
>
>         rdd.cache();
>
>         logger.info("Count of rdd: {}", rdd.count());
>
> logger.info("==========================================================================");
>
>
>
>         JavaDoubleRDD rdd2 =  rdd.flatMap( new
> DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
>
>         @Override
>         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>           BSONObject doc = e._2();
>           BasicDBList vals = (BasicDBList)doc.get("data");
>
>           List<Double> results = new ArrayList<Double>();
>           for (int i=0; i< vals.size();i++ )
> results.add((Double)((BasicDBList)vals.get(i)).get(0));
>
>           return results;
>
>         }
>         });
>
>         logger.info("Take: {}", rdd2.take(100));
>         logger.info("Count: {}", rdd2.count());
>
>
>     }
>
>     }
>
>
> On 11/3/13 8:19 PM, Patrick Wendell wrote:
>>
>> Thanks that would help. This would be consistent with there being a
>> reference to the SparkContext itself inside of the closure. Just want
>> to make sure that's not the case.
>>
>> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <[email protected]>
>> wrote:
>>>
>>> Im running in local[4] mode - so there are no slave machines. Full stack
>>> trace:
>>>
>>>
>>> (run-main) org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>> org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>      at
>>>
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>      at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>> [debug]     Thread run-main exited.
>>> [debug] Interrupting remaining threads (should be all daemons).
>>> [debug] Sandboxed run complete..
>>> java.lang.RuntimeException: Nonzero exit code: 1
>>>      at scala.sys.package$.error(package.scala:27)
>>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>>      at scala.Option.foreach(Option.scala:236)
>>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>>>      at sbt.Defaults$.toError(Defaults.scala:34)
>>>      at
>>>
>>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>>>      at
>>>
>>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>>>      at
>>> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>>>      at
>>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>>      at
>>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>>>      at sbt.Execute.work(Execute.scala:244)
>>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>>      at
>>>
>>> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>>>      at
>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>      at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>>>      at
>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>      at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>>      at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>>      at java.lang.Thread.run(Thread.java:695)
>>>
>>> when I add implements Serializable to my class, I get the following stack
>>> trace:
>>>
>>> error] (run-main) org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException:
>>> org.apache.spark.api.java.JavaSparkContext
>>> org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException:
>>> org.apache.spark.api.java.JavaSparkContext
>>>
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>      at
>>>
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>      at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>> [debug]     Thread run-main exited.
>>> [debug] Interrupting remaining threads (should be all daemons).
>>> [debug] Sandboxed run complete..
>>> java.lang.RuntimeException: Nonzero exit code: 1
>>>      at scala.sys.package$.error(package.scala:27)
>>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>>      at scala.Option.foreach(Option.scala:236)
>>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>>>      at sbt.Defaults$.toError(Defaults.scala:34)
>>>      at
>>>
>>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>>>      at
>>>
>>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>>>      at
>>> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>>>      at
>>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>>      at
>>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>>>      at sbt.Execute.work(Execute.scala:244)
>>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>>      at
>>>
>>> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>>>      at
>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>      at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>>>      at
>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>      at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>>      at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>>      at java.lang.Thread.run(Thread.java:695)
>>>
>>> I can post my code if that helps
>>>
>>>
>>>
>>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>>>>
>>>> If you look in the UI, are there failures on any of the slaves that
>>>> you can give a  stack trace for? That would narrow down where the
>>>> serialization error is happening.
>>>>
>>>> Unfortunately this code path doesn't print a full stack trace which
>>>> makes it harder to debug where the serialization error comes from.
>>>>
>>>> Could you post all of your code?
>>>>
>>>> Also, just wondering, what happens if you just go ahead and add
>>>> "extends Serializable" to AnalyticsEngine class? It's possible this is
>>>> happening during closure serialization, which will use the closure
>>>> serializer (which is by default Java).
>>>>
>>>> - Patrick
>>>>
>>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <[email protected]>
>>>> wrote:
>>>>>
>>>>> yes, I tried that as well (it is currently registered with Kryo)-
>>>>> although
>>>>> it doesnt make sense to me (and doesnt solve the problem). I also made
>>>>> sure
>>>>> my registration was running:
>>>>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
>>>>> registrator: edu.mit.bsense.MyRegistrator
>>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
>>>>> org.apache.spark.serializer.KryoSerializer  - Running user registrator:
>>>>> edu.mit.bsense.MyRegistrator
>>>>>
>>>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
>>>>> instantiates the RDDs and runs the map() and count().
>>>>> Can you explain why it needs to be serialized?
>>>>>
>>>>> Also, when running count() on my original RDD (pre map) I get the right
>>>>> answer - this means the classes of data in the RDD are serializable.
>>>>> It's only when I run map, and then count() on a new RDD do I get this
>>>>> exception. My map does not introduce any new classes it - just iterates
>>>>> over
>>>>> the existing data.
>>>>>
>>>>> Any ideas?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>>>>>>
>>>>>> edu.mit.bsense.AnalyticsEngine
>>>>>>
>>>>>> Look at the exception. Basically, you'll need to register every class
>>>>>> type that is recursively used by BSONObject.
>>>>>>
>>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <[email protected]>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Patrick,
>>>>>>>
>>>>>>> I am in fact using Kryo and im registering  BSONObject.class (which
>>>>>>> is
>>>>>>> class
>>>>>>> holding the data) in my KryoRegistrator.
>>>>>>> Im not sure what other classes I should be registering.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Yadid
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>>>>>>>
>>>>>>>> The problem is you are referencing a class that does not "extend
>>>>>>>> serializable" in the data that you shuffle. Spark needs to send all
>>>>>>>> shuffle data over the network, so it needs to know how to serialize
>>>>>>>> them.
>>>>>>>>
>>>>>>>> One option is to use Kryo for network serialization as described
>>>>>>>> here
>>>>>>>> - you'll have to register all the class that get serialized though.
>>>>>>>>
>>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>>>>>>
>>>>>>>> Another option is to write a wrapper class that "extends
>>>>>>>> externalizable" and write the serialization yourself.
>>>>>>>>
>>>>>>>> - Patrick
>>>>>>>>
>>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg
>>>>>>>> <[email protected]>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> My original RDD contains arrays of doubles. when appying a count()
>>>>>>>>> operator
>>>>>>>>> to the original RDD I get the result as expected.
>>>>>>>>> However when I run a map on the original RDD in order to generate a
>>>>>>>>> new
>>>>>>>>> RDD
>>>>>>>>> with only the first element of each array, and try to apply count()
>>>>>>>>> to
>>>>>>>>> the
>>>>>>>>> new generated RDD I get the following exception:
>>>>>>>>>
>>>>>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  -
>>>>>>>>> Failed
>>>>>>>>> to
>>>>>>>>> run count at AnalyticsEngine.java:133
>>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>>>> org.apache.spark.SparkException: Job failed:
>>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>>>>>>>         at
>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> If a run a take() operation on the new RDD I receive the results as
>>>>>>>>> expected. here is my code:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new
>>>>>>>>> FlatMapFunction<Tuple2<Object,
>>>>>>>>> BSONObject>, Double>() {
>>>>>>>>>             @Override
>>>>>>>>>             public Iterable<Double> call(Tuple2<Object, BSONObject>
>>>>>>>>> e)
>>>>>>>>> {
>>>>>>>>>               BSONObject doc = e._2();
>>>>>>>>>               List<List<Double>> vals =
>>>>>>>>> (List<List<Double>>)doc.get("data");
>>>>>>>>>               List<Double> results = new ArrayList<Double>();
>>>>>>>>>               for (int i=0; i< vals.size();i++ )
>>>>>>>>>                   results.add((Double)vals.get(i).get(0));
>>>>>>>>>               return results;
>>>>>>>>>
>>>>>>>>>             }
>>>>>>>>>             });
>>>>>>>>>
>>>>>>>>>             logger.info("Take: {}", rdd2.take(100));
>>>>>>>>>             logger.info("Count: {}", rdd2.count());
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Any ideas on what I am doing wrong ?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Yadid
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Yadid Ayzenberg
>>>>>>>>> Graduate Student and Research Assistant
>>>>>>>>> Affective Computing
>>>>>>>>> Phone: 617-866-7226
>>>>>>>>> Room: E14-274G
>>>>>>>>> MIT Media Lab
>>>>>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>> --
>>>>>>> Yadid Ayzenberg
>>>>>>> Graduate Student and Research Assistant
>>>>>>> Affective Computing
>>>>>>> Phone: 617-866-7226
>>>>>>> Room: E14-274G
>>>>>>> MIT Media Lab
>>>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>>>
>>>>>>>
>>>>>>>
>>>>> --
>>>>> Yadid Ayzenberg
>>>>> Graduate Student and Research Assistant
>>>>> Affective Computing
>>>>> Phone: 617-866-7226
>>>>> Room: E14-274G
>>>>> MIT Media Lab
>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Yadid Ayzenberg
>>> Graduate Student and Research Assistant
>>> Affective Computing
>>> Phone: 617-866-7226
>>> Room: E14-274G
>>> MIT Media Lab
>>> 75 Amherst st, Cambridge, MA, 02139
>>>
>>>
>>>
>
>
> --
> Yadid Ayzenberg
> Graduate Student and Research Assistant
> Affective Computing
> Phone: 617-866-7226
> Room: E14-274G
> MIT Media Lab
> 75 Amherst st, Cambridge, MA, 02139
>
>
>

Reply via email to