Hi,

I apologize, I spoke too soon.
Those transient member variables may not be the issue.

To clarify my test case: I am creating LinkedHashMaps, each with two
elements, in a map expression on an RDD.
Note that the LinkedHashMaps are being created on the worker JVMs (not the
driver JVM) and THEN collected to the driver JVM.
I am NOT creating LinkedHashMaps on the driver and then parallelizing them
(sending them to worker JVMs).

As Renato said, Spark requires us to register classes that aren't yet
covered by Chill.
As far as I know there are three ways to register them, all through API
calls on SparkConf (see the sketch after this list):

1. sparkConf().registerKryoClasses(Array(classOf[...], classOf[...]))
* This is the method of registering classes as described in the Tuning page:
http://spark.apache.org/docs/latest/tuning.html#data-serialization

2. sparkConf().set("spark.kryo.classesToRegister", "cName1, cName2")

3. sparkConf().set("spark.kryo.registrator", "registrator1, registrator2")
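
For concreteness, here is a rough sketch combining all three on a SparkConf
(the class to register and the registrator name are just the ones from my
test; you would normally pick whichever method applies):

import org.apache.spark.SparkConf
import scala.collection.mutable

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// 1. Register the classes directly through the API.
conf.registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]]))

// 2. Register the classes by their fully qualified names.
conf.set("spark.kryo.classesToRegister", "scala.collection.mutable.LinkedHashMap")

// 3. Point Spark at a custom KryoRegistrator.
conf.set("spark.kryo.registrator", "org.dia.core.MutableLinkedHashMapRegistrator")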

With the first two methods, which register the classes with Kryo,
what I get back are empty mutable.LinkedHashMaps after calling collect on
the RDD.
To the best of my understanding this should not happen (none of the other
collection classes I have used have this problem).

For the third method I created a registrator for mutable.LinkedHashMap,
which can be found here:
https://gist.github.com/rahulpalamuttam/9f3bfa39a160efa80844d3a7a7bd87cd
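
The idea, roughly, is to register a custom serializer that writes the
entries out explicitly and rebuilds the map (and its internal linked list)
on read. A simplified sketch of that approach (class names here are just
illustrative; the gist has the actual code):

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.serializer.KryoRegistrator
import scala.collection.mutable

// Writes the entry count followed by each key/value pair, and rebuilds the
// map entry by entry on read, so insertion order is preserved.
class LinkedHashMapSerializer extends Serializer[mutable.LinkedHashMap[Any, Any]] {

  override def write(kryo: Kryo, output: Output,
                     map: mutable.LinkedHashMap[Any, Any]): Unit = {
    output.writeInt(map.size, true)
    map.foreach { case (k, v) =>
      kryo.writeClassAndObject(output, k)
      kryo.writeClassAndObject(output, v)
    }
  }

  override def read(kryo: Kryo, input: Input,
                    clazz: Class[mutable.LinkedHashMap[Any, Any]])
      : mutable.LinkedHashMap[Any, Any] = {
    val size = input.readInt(true)
    val map = new mutable.LinkedHashMap[Any, Any]()
    var i = 0
    while (i < size) {
      val k = kryo.readClassAndObject(input)
      val v = kryo.readClassAndObject(input)
      map.put(k, v)
      i += 1
    }
    map
  }
}

class MutableLinkedHashMapRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[mutable.LinkedHashMap[Any, Any]], new LinkedHashMapSerializer())
  }
}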

I set the registrator like so:
sparkConf().set("spark.kryo.registrator",
"org.dia.core.MutableLinkedHashMapRegistrator")
Now, when I do the same test, I get an Array of LinkedHashMaps.
Each LinkedHashMap contains the entries I populated it with in the map task.

Why do the first two methods result in improper serialization of
mutable.LinkedHashMap?
Should I file a JIRA for it?

Much credit should be given to Martin Grotzke from EsotericSoftware/kryo
who helped me tremendously.

Best,

Rahul Palamuttam




On Fri, Aug 26, 2016 at 10:16 AM, Rahul Palamuttam <rahulpala...@gmail.com>
wrote:

> Thanks Renato.
>
> I forgot to reply-all last time. I apologize for the rather confusing
> example.
> All that the snippet code did was:
> 1. Make an RDD of LinkedHashMaps, each with size 2
> 2. On the worker side, get the sizes of the HashMaps (via a map(hash =>
> hash.size))
> 3. On the driver, call collect on the RDD[Int] of hashmap sizes, giving
> you an Array[Int]
> 4. On the driver, call collect on the RDD[LinkedHashMap], giving you an
> Array[LinkedHashMap]
> 5. Compare the size of a hashmap in Array[LinkedHashMap] against any size
> value in Array[Int] (within each array the values are all the same)
> 6. The two sizes differ, because the elements of the LinkedHashMap were
> never copied over
>
> Anyway, I think I've tracked down the issue, and it doesn't seem to be a
> Spark or Kryo issue.
>
> For those whom it concerns: LinkedHashMap has this serialization issue
> because it has transient members for firstEntry and lastEntry.
> Take a look here: https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L62
>
> Those attributes are not going to be serialized.
> Furthermore, the iterator on LinkedHashMap depends on the firstEntry
> variable.
> Since that member is not serialized, it is null after deserialization.
> The iterator requires the firstEntry variable to walk the LinkedHashMap:
> https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L94-L100
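>
> Paraphrasing the 2.11.8 source, the declarations and the iterator look
> roughly like this:
>
> @transient protected var firstEntry: Entry = null
> @transient protected var lastEntry: Entry = null
>
> def iterator: Iterator[(A, B)] = new AbstractIterator[(A, B)] {
>   private var cur = firstEntry
>   def hasNext = cur ne null
>   def next =
>     if (hasNext) { val res = (cur.key, cur.value); cur = cur.later; res }
>     else Iterator.empty.next()
> }
>
> A field-by-field serializer (like Kryo's default FieldSerializer) skips
> transient fields, so after deserialization firstEntry is null and the
> iterator has nothing to walk.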
>
> I wonder why these two variables were made transient.
>
> Best,
> Rahul Palamuttam
>
>
> On Thu, Aug 25, 2016 at 11:13 PM, Renato Marroquín Mogrovejo <
> renatoj.marroq...@gmail.com> wrote:
>
>> Hi Rahul,
>>
>> You have probably already figured this one out, but anyway...
>> You need to register the classes that you'll be using with Kryo because
>> it does not support all Serializable types out of the box; it requires you
>> to register, in advance, the classes you’ll use in the program. So when you
>> don't register a class, Kryo doesn't know how to serialize/deserialize it.
>>
>>
>> Best,
>>
>> Renato M.
>>
>> 2016-08-22 17:12 GMT+02:00 Rahul Palamuttam <rahulpala...@gmail.com>:
>>
>>> Hi,
>>>
>>> Just sending this again to see if others have had this issue.
>>>
>>> I recently switched to using kryo serialization and I've been running
>>> into errors
>>> with the mutable.LinkedHashMap class.
>>>
>>> If I don't register the mutable.LinkedHashMap class then I get an
>>> ArrayStoreException seen below.
>>> If I do register the class, then when the LinkedHashMap is collected on
>>> the driver, it does not contain any elements.
>>>
>>> Here is the snippet of code I used:
>>>
>>> val sc = new SparkContext(new SparkConf()
>>>   .setMaster("local[*]")
>>>   .setAppName("Sample")
>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>   .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]])))
>>>
>>> val collect = sc.parallelize(0 to 10)
>>>   .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", "bonjour"), ("good", "bueno")))
>>>
>>> val mapSideSizes = collect.map(p => p.size).collect()(0)
>>> val driverSideSizes = collect.collect()(0).size
>>>
>>> println("The sizes before collect : " + mapSideSizes)
>>> println("The sizes after collect : " + driverSideSizes)
>>>
>>>
>>> ** The following only occurs if I did not register the mutable.LinkedHashMap class **
>>> 16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task result
>>> java.lang.ArrayStoreException: scala.collection.mutable.HashMap
>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>> at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
>>> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> I hope this is a known issue and/or I'm missing something important in
>>> my setup.
>>> Appreciate any help or advice!
>>>
>>> Best,
>>>
>>> Rahul Palamuttam
>>>
>>
>>
>
