Thanks, Ted.
After searching for a whole day, I still don't know how to make Spark use
Twitter chill for serialization; there is very little documentation on how to
integrate chill into Spark. I tried the
following, but it threw "java.lang.ClassCastException:
com.twitter.chill.WrappedArraySerializer cannot be cast to
org.apache.spark.serializer.Serializer":
    val conf = new SparkConf()
      .setAppName("Test Serialization")
      .set("spark.serializer", "com.twitter.chill.WrappedArraySerializer")
So what is the correct way to configure Spark to use the Twitter chill
serialization framework?
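A possible reading of the exception, sketched here rather than confirmed: spark.serializer has to name a subclass of org.apache.spark.serializer.Serializer, whereas WrappedArraySerializer is a Kryo serializer for one type, which would explain the failed cast. Spark's own KryoSerializer already builds its Kryo instance with chill's Scala registrations, so keeping KryoSerializer and adding extra classes through a registrator may be all that is needed. The names ChillAwareRegistrator and ChillConfigSketch below are hypothetical, and the snippet assumes Spark and HBase are on the classpath.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator: extra classes are registered with Kryo here,
// instead of passing a chill class as the value of spark.serializer.
class ChillAwareRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ImmutableBytesWritable])
    kryo.register(classOf[Array[ImmutableBytesWritable]])
  }
}

object ChillConfigSketch {
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("Test Serialization")
      // Must be a subclass of org.apache.spark.serializer.Serializer.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Fully qualified name of the registrator defined above.
      .set("spark.kryo.registrator", classOf[ChillAwareRegistrator].getName)
}
```

This is the same shape as the configuration in the quoted message, so by itself it would not be expected to change the behaviour of the broadcast example.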
2015-02-15 22:23 GMT+08:00 Ted Yu <[email protected]>:
> I was looking at https://github.com/twitter/chill
>
> It seems this would achieve what you want:
> chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala
>
> Cheers
>
> On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao <[email protected]>
> wrote:
>
>> I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
>> serialized by Kryo but *Array[ImmutableBytesWritable]* can't be
>> serialized, even though I registered both of them with Kryo.
>>
>> The code is as follows:
>>
>> val conf = new SparkConf()
>> .setAppName("Hello Spark")
>> .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>> .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>>
>> val sc = new SparkContext(conf)
>>
>> val rdd = sc.parallelize(List(
>> (new ImmutableBytesWritable(Bytes.toBytes("AAA")),
>> new KeyValue()),
>> (new ImmutableBytesWritable(Bytes.toBytes("BBB")),
>> new KeyValue()),
>> (new ImmutableBytesWritable(Bytes.toBytes("CCC")),
>> new KeyValue()),
>> (new ImmutableBytesWritable(Bytes.toBytes("DDD")),
>> new KeyValue())), 4)
>>
>> // snippet 1: a single object of *ImmutableBytesWritable* can be serialized in broadcast
>> val partitioner = new SingleElementPartitioner(sc.broadcast(new
>> ImmutableBytesWritable(Bytes.toBytes(3))))
>> val ret = rdd.aggregateByKey(List[KeyValue](),
>> partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>> (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
>> println("\n\n\nret.count = " + ret.count + ", partition size = "
>> + ret.partitions.size)
>>
>> // snippet 2: an array of *ImmutableBytesWritable* cannot be serialized in broadcast
>> val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new
>> ImmutableBytesWritable(Bytes.toBytes(2)), new
>> ImmutableBytesWritable(Bytes.toBytes(3)))
>> val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>> val ret1 = rdd.aggregateByKey(List[KeyValue](),
>> newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>> (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
>> println("\n\n\nret1.count = " + ret1.count)
>>
>> sc.stop
>>
>>
>> // the following are kryo registrator and partitioners
>> class MyKryoRegistrator extends KryoRegistrator {
>> override def registerClasses(kryo: Kryo): Unit = {
>> kryo.register(classOf[ImmutableBytesWritable]) // register ImmutableBytesWritable
>> kryo.register(classOf[Array[ImmutableBytesWritable]]) // register Array[ImmutableBytesWritable]
>> }
>> }
>>
>> class SingleElementPartitioner(bc:
>> Broadcast[ImmutableBytesWritable]) extends Partitioner {
>> override def numPartitions: Int = 5
>> def v = Bytes.toInt(bc.value.get)
>> override def getPartition(key: Any): Int = v - 1
>> }
>>
>>
>> class ArrayPartitioner(bc:
>> Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
>> val arr = bc.value
>> override def numPartitions: Int = arr.length
>> override def getPartition(key: Any): Int =
>> Bytes.toInt(arr(0).get)
>> }
>>
>>
>>
>> In the code above, snippet 1 works as expected, but snippet 2 throws
>> "Task not serializable: java.io.NotSerializableException:
>> org.apache.hadoop.hbase.io.ImmutableBytesWritable".
>>
>>
>> So do I have to implement a Kryo serializer for Array[T] if it is used in
>> a broadcast?
>>
>> Thanks
>>
>>
>>
>>
>>
>
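On the original question quoted above, one plausible explanation, not verified against Spark 1.1.0: Spark ships tasks (and the partitioner attached to a shuffle) with Java serialization regardless of spark.serializer, so the Kryo registrations never come into play there. ArrayPartitioner copies bc.value into a val field, which Java serialization then has to write, while SingleElementPartitioner only reads bc.value inside a def. The sketch below reproduces that difference with plain java.io serialization and no Spark at all; Payload and Handle are hypothetical stand-ins for ImmutableBytesWritable and Broadcast.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a type that does not implement java.io.Serializable.
final class Payload(val n: Int)

// Stand-in for a broadcast handle: the handle is Serializable, but its
// value is marked transient and is therefore not written with it.
final class Handle[T](@transient private val v: T) extends Serializable {
  def value: T = v
}

// Eager: copies the array into a field, so Java serialization must write
// every Payload element.
class EagerPartitioner(h: Handle[Array[Payload]]) extends Serializable {
  val arr: Array[Payload] = h.value
  def numPartitions: Int = arr.length
}

// Lazy: keeps only the handle and reads the array on demand.
class LazyPartitioner(h: Handle[Array[Payload]]) extends Serializable {
  def arr: Array[Payload] = h.value
  def numPartitions: Int = arr.length
}

object JavaSerializationSketch {
  // Returns true if obj can be written with plain Java serialization.
  def javaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val h = new Handle(Array(new Payload(1), new Payload(2), new Payload(3)))
    println(javaSerializable(new EagerPartitioner(h))) // false
    println(javaSerializable(new LazyPartitioner(h)))  // true
  }
}
```

If this reading is right, changing `val arr = bc.value` to `def arr = bc.value` in ArrayPartitioner might be enough, without writing a custom Kryo serializer for Array[T].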