Is it possible to port WrappedArraySerializer.scala to your app?
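A note on the ClassCastException, as far as I can tell: the class named in spark.serializer is instantiated by reflection and has to be a subclass of org.apache.spark.serializer.Serializer. WrappedArraySerializer is a Kryo-level serializer (a com.esotericsoftware.kryo.Serializer), so that cast fails. The usual wiring would keep spark.serializer as org.apache.spark.serializer.KryoSerializer (which, as far as I know, already uses Chill internally) and register any Kryo-level serializer from the class named in spark.kryo.registrator, via kryo.register(clazz, serializer). The self-contained sketch below only mimics that load-and-cast step with hypothetical stand-in types (AppSerializer, GoodSerializer, KryoLevelSerializer); it is not Spark's actual code:

```scala
// Hypothetical stand-ins: AppSerializer plays the role of
// org.apache.spark.serializer.Serializer; KryoLevelSerializer plays the role
// of a Kryo-level serializer such as Chill's WrappedArraySerializer.
object ReflectiveLoadDemo {
  trait AppSerializer
  class GoodSerializer extends AppSerializer
  class KryoLevelSerializer // deliberately does NOT extend AppSerializer

  // Instantiate a class by name and cast it to the expected base type,
  // roughly what happens to the value of a "serializer class" setting.
  def load(className: String): AppSerializer =
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[AppSerializer] // throws ClassCastException for an unrelated type

  def main(args: Array[String]): Unit = {
    val ok = load(classOf[GoodSerializer].getName)
    println("loaded " + ok.getClass.getName)

    try load(classOf[KryoLevelSerializer].getName)
    catch {
      case e: ClassCastException => println("ClassCastException: " + e.getMessage)
    }
  }
}
```

The only point of the sketch is that the configured class must already be the expected type; a Kryo-level serializer has to be handed to Kryo (from a registrator), not to spark.serializer.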

Pardon me for not knowing how to integrate Chill with Spark.
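On snippet 2 itself, one plausible explanation (my reading, not verified against Spark 1.1): the partitioner travels inside the task, which Spark serializes with plain Java serialization rather than Kryo. SingleElementPartitioner only reads bc.value inside a def, so the partitioner holds just the Broadcast handle. ArrayPartitioner copies bc.value into a val, so the partitioner itself carries the non-Serializable ImmutableBytesWritable array. If that is the cause, declaring arr as a def (or a @transient lazy val) in ArrayPartitioner may be enough, and the Kryo registration of Array[ImmutableBytesWritable] would not matter for this path. The sketch below reproduces the difference with plain Java serialization and hypothetical stand-ins (Payload for ImmutableBytesWritable, Handle for a Broadcast, EagerHolder and LazyHolder for the two partitioners):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object BroadcastFieldDemo {
  // Hypothetical stand-in for ImmutableBytesWritable: not java.io.Serializable.
  class Payload(val bytes: Array[Byte])

  // Hypothetical stand-in for a Broadcast handle: the handle is Serializable,
  // but the wrapped value is @transient, so it is not written with the handle.
  class Handle[T](initial: T) extends Serializable {
    @transient private val v: T = initial
    def value: T = v
  }

  // Like ArrayPartitioner: copies the value into a field at construction time,
  // so the array becomes part of this object's serialized state.
  class EagerHolder(h: Handle[Array[Payload]]) extends Serializable {
    val arr: Array[Payload] = h.value
    def first: Int = arr(0).bytes.length
  }

  // Like SingleElementPartitioner: keeps only the handle and reads the value
  // on each call, so only the handle is serialized.
  class LazyHolder(h: Handle[Array[Payload]]) extends Serializable {
    def arr: Array[Payload] = h.value
    def first: Int = arr(0).bytes.length
  }

  // True if plain Java serialization of `o` succeeds.
  def javaSerializable(o: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(o)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val handle = new Handle(Array(new Payload(Array[Byte](1)), new Payload(Array[Byte](2))))
    println("EagerHolder serializable: " + javaSerializable(new EagerHolder(handle))) // false
    println("LazyHolder serializable: " + javaSerializable(new LazyHolder(handle)))   // true
  }
}
```

Applied to the code in this thread, the corresponding change would be `def arr = bc.value` instead of `val arr = bc.value` in ArrayPartitioner; this is untested against Spark 1.1.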

Cheers

On Mon, Feb 16, 2015 at 12:31 AM, Tao Xiao <xiaotao.cs....@gmail.com> wrote:

> Thanks Ted
>
> After searching for a whole day, I still don't know how to make Spark use
> Twitter Chill serialization; there is very little documentation on
> integrating Twitter Chill into Spark for serialization. I tried the
> following, but it threw "java.lang.ClassCastException:
> com.twitter.chill.WrappedArraySerializer cannot be cast to
> org.apache.spark.serializer.Serializer":
>
>         val conf = new SparkConf()
>                    .setAppName("Test Serialization")
>                    .set("spark.serializer", "com.twitter.chill.WrappedArraySerializer")
>
>
> Well, what is the correct way to configure Spark to use the Twitter Chill
> serialization framework?
>
> 2015-02-15 22:23 GMT+08:00 Ted Yu <yuzhih...@gmail.com>:
>
>> I was looking at https://github.com/twitter/chill
>>
>> It seems this would achieve what you want:
>> chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala
>>
>> Cheers
>>
>> On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao <xiaotao.cs....@gmail.com> wrote:
>>
>>> I'm using Spark 1.1.0 and found that *ImmutableBytesWritable* can be
>>> serialized by Kryo but *Array[ImmutableBytesWritable]* can't be
>>> serialized, even though I registered both of them with Kryo.
>>>
>>> The code is as follows:
>>>
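>>>         import com.esotericsoftware.kryo.Kryo
>>>         import org.apache.hadoop.hbase.KeyValue
>>>         import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>>         import org.apache.hadoop.hbase.util.Bytes
>>>         import org.apache.spark.{Partitioner, SparkConf, SparkContext}
>>>         import org.apache.spark.SparkContext._   // pair-RDD implicits (aggregateByKey) in Spark 1.1
>>>         import org.apache.spark.broadcast.Broadcast
>>>         import org.apache.spark.serializer.KryoRegistrator
>>>
>>>         val conf = new SparkConf()
>>>                 .setAppName("Hello Spark")
>>>                 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>                 .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>>>
>>>         val sc = new SparkContext(conf)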
>>>
>>>         val rdd = sc.parallelize(List(
>>>                     (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
>>>                     (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
>>>                     (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
>>>                     (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)
>>>
>>>         // snippet 1: a single ImmutableBytesWritable object can be serialized in a broadcast
>>>         val partitioner = new SingleElementPartitioner(
>>>                 sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
>>>         val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
>>>                 (xs: List[KeyValue], y: KeyValue) => y :: xs,
>>>                 (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
>>>         println("\n\n\nret.count = " + ret.count + ",  partition size = " + ret.partitions.size)
>>>
>>>         // snippet 2: an array of ImmutableBytesWritable cannot be serialized in a broadcast
>>>         val arr = Array(
>>>                 new ImmutableBytesWritable(Bytes.toBytes(1)),
>>>                 new ImmutableBytesWritable(Bytes.toBytes(2)),
>>>                 new ImmutableBytesWritable(Bytes.toBytes(3)))
>>>         val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>>>         val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
>>>                 (xs: List[KeyValue], y: KeyValue) => y :: xs,
>>>                 (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
>>>         println("\n\n\nret1.count = " + ret1.count)
>>>
>>>         sc.stop
>>>
>>>
>>>         // the following are the Kryo registrator and the partitioners
>>>         class MyKryoRegistrator extends KryoRegistrator {
>>>             override def registerClasses(kryo: Kryo): Unit = {
>>>                 kryo.register(classOf[ImmutableBytesWritable])          // register ImmutableBytesWritable
>>>                 kryo.register(classOf[Array[ImmutableBytesWritable]])   // register Array[ImmutableBytesWritable]
>>>             }
>>>         }
>>>
>>>         class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
>>>             override def numPartitions: Int = 5
>>>             def v = Bytes.toInt(bc.value.get)
>>>             override def getPartition(key: Any): Int = v - 1
>>>         }
>>>
>>>
>>>         class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
>>>             val arr = bc.value
>>>             override def numPartitions: Int = arr.length
>>>             override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
>>>         }
>>>
>>>
>>>
>>> In the code above, snippet 1 works as expected, but snippet 2 throws
>>> "Task not serializable: java.io.NotSerializableException:
>>> org.apache.hadoop.hbase.io.ImmutableBytesWritable".
>>>
>>>
>>> So do I have to implement a Kryo serializer for Array[T] if it is used
>>> in a broadcast?
>>>
>>> Thanks
>>>
>>
>
