If you look in the UI, are there failures on any of the slaves that
you can give a  stack trace for? That would narrow down where the
serialization error is happening.

Unfortunately this code path doesn't print a full stack trace which
makes it harder to debug where the serialization error comes from.

Could you post all of your code?

Also, just wondering, what happens if you just go ahead and add
"extends Serializable" to AnalyticsEngine class? It's possible this is
happening during closure serialization, which will use the closure
serializer (which is by default Java).

- Patrick

On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <[email protected]> wrote:
> yes, I tried that as well (it is currently registered with Kryo)- although
> it doesnt make sense to me (and doesnt solve the problem). I also made sure
> my registration was running:
> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
> registrator: edu.mit.bsense.MyRegistrator
> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
> org.apache.spark.serializer.KryoSerializer  - Running user registrator:
> edu.mit.bsense.MyRegistrator
>
> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
> instantiates the RDDs and runs the map() and count().
> Can you explain why it needs to be serialized?
>
> Also, when running count() on my original RDD (pre map) I get the right
> answer - this means the classes of data in the RDD are serializable.
> It's only when I run map, and then count() on a new RDD do I get this
> exception. My map does not introduce any new classes it - just iterates over
> the existing data.
>
> Any ideas?
>
>
>
>
>
>
>
>
>
> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>>
>> edu.mit.bsense.AnalyticsEngine
>>
>> Look at the exception. Basically, you'll need to register every class
>> type that is recursively used by BSONObject.
>>
>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <[email protected]>
>> wrote:
>>>
>>> Hi Patrick,
>>>
>>> I am in fact using Kryo and im registering  BSONObject.class (which is
>>> class
>>> holding the data) in my KryoRegistrator.
>>> Im not sure what other classes I should be registering.
>>>
>>> Thanks,
>>>
>>> Yadid
>>>
>>>
>>>
>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>>>
>>>> The problem is you are referencing a class that does not "extend
>>>> serializable" in the data that you shuffle. Spark needs to send all
>>>> shuffle data over the network, so it needs to know how to serialize
>>>> them.
>>>>
>>>> One option is to use Kryo for network serialization as described here
>>>> - you'll have to register all the class that get serialized though.
>>>>
>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>>
>>>> Another option is to write a wrapper class that "extends
>>>> externalizable" and write the serialization yourself.
>>>>
>>>> - Patrick
>>>>
>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <[email protected]>
>>>> wrote:
>>>>>
>>>>> Hi All,
>>>>>
>>>>> My original RDD contains arrays of doubles. when appying a count()
>>>>> operator
>>>>> to the original RDD I get the result as expected.
>>>>> However when I run a map on the original RDD in order to generate a new
>>>>> RDD
>>>>> with only the first element of each array, and try to apply count() to
>>>>> the
>>>>> new generated RDD I get the following exception:
>>>>>
>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  -
>>>>> Failed
>>>>> to
>>>>> run count at AnalyticsEngine.java:133
>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>> org.apache.spark.SparkException: Job failed:
>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>>>       at
>>>>>
>>>>>
>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>>>       at
>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>
>>>>>
>>>>> If a run a take() operation on the new RDD I receive the results as
>>>>> expected. here is my code:
>>>>>
>>>>>
>>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new FlatMapFunction<Tuple2<Object,
>>>>> BSONObject>, Double>() {
>>>>>           @Override
>>>>>           public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>>>>             BSONObject doc = e._2();
>>>>>             List<List<Double>> vals =
>>>>> (List<List<Double>>)doc.get("data");
>>>>>             List<Double> results = new ArrayList<Double>();
>>>>>             for (int i=0; i< vals.size();i++ )
>>>>>                 results.add((Double)vals.get(i).get(0));
>>>>>             return results;
>>>>>
>>>>>           }
>>>>>           });
>>>>>
>>>>>           logger.info("Take: {}", rdd2.take(100));
>>>>>           logger.info("Count: {}", rdd2.count());
>>>>>
>>>>>
>>>>> Any ideas on what I am doing wrong ?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Yadid
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Yadid Ayzenberg
>>>>> Graduate Student and Research Assistant
>>>>> Affective Computing
>>>>> Phone: 617-866-7226
>>>>> Room: E14-274G
>>>>> MIT Media Lab
>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Yadid Ayzenberg
>>> Graduate Student and Research Assistant
>>> Affective Computing
>>> Phone: 617-866-7226
>>> Room: E14-274G
>>> MIT Media Lab
>>> 75 Amherst st, Cambridge, MA, 02139
>>>
>>>
>>>
>
>
> --
> Yadid Ayzenberg
> Graduate Student and Research Assistant
> Affective Computing
> Phone: 617-866-7226
> Room: E14-274G
> MIT Media Lab
> 75 Amherst st, Cambridge, MA, 02139
>
>
>

Reply via email to