Is it possible to port WrappedArraySerializer.scala to your app? Pardon me for not knowing how to integrate Chill with Spark.

Cheers

On Mon, Feb 16, 2015 at 12:31 AM, Tao Xiao <xiaotao.cs....@gmail.com> wrote:

> Thanks Ted
>
> After searching for a whole day, I still don't know how to let spark use
> twitter chill serialization - there are very few documents about how to
> integrate twitter chill into Spark for serialization. I tried the
> following, but an exception of "java.lang.ClassCastException:
> com.twitter.chill.WrappedArraySerializer cannot be cast to
> org.apache.spark.serializer.Serializer" was thrown:
>
>     val conf = new SparkConf()
>       .setAppName("Test Serialization")
>       .set("spark.serializer", "com.twitter.chill.WrappedArraySerializer")
>
> Well, what is the correct way of configuring Spark to use the twitter
> chill serialization framework ?
>
> 2015-02-15 22:23 GMT+08:00 Ted Yu <yuzhih...@gmail.com>:
>
>> I was looking at https://github.com/twitter/chill
>>
>> It seems this would achieve what you want:
>> chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala
>>
>> Cheers
>>
>> On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao <xiaotao.cs....@gmail.com>
>> wrote:
>>
>>> I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
>>> serialized by Kryo but *Array[ImmutableBytesWritable] *can't be
>>> serialized even when I registered both of them in Kryo.
>>>
>>> The code is as follows:
>>>
>>>     val conf = new SparkConf()
>>>       .setAppName("Hello Spark")
>>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>       .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>>>
>>>     val sc = new SparkContext(conf)
>>>
>>>     val rdd = sc.parallelize(List(
>>>       (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
>>>       (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
>>>       (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
>>>       (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)
>>>
>>>     // snippet 1: a single object of *ImmutableBytesWritable* can be serialized in broadcast
>>>     val partitioner = new SingleElementPartitioner(
>>>       sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
>>>     val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
>>>       (xs:List[KeyValue], y:KeyValue) => y::xs,
>>>       (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
>>>     println("\n\n\ret.count = " + ret.count + ", partition size = " + ret.partitions.size)
>>>
>>>     // snippet 2: an array of *ImmutableBytesWritable* can not be serialized in broadcast
>>>     val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)),
>>>       new ImmutableBytesWritable(Bytes.toBytes(2)),
>>>       new ImmutableBytesWritable(Bytes.toBytes(3)))
>>>     val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>>>     val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
>>>       (xs:List[KeyValue], y:KeyValue) => y::xs,
>>>       (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
>>>     println("\n\n\nrdd2.count = " + ret1.count)
>>>
>>>     sc.stop
>>>
>>>
>>>     // the following are kryo registrator and partitioners
>>>     class MyKryoRegistrator extends KryoRegistrator {
>>>       override def registerClasses(kryo: Kryo): Unit = {
>>>         kryo.register(classOf[ImmutableBytesWritable])         // register ImmutableBytesWritable
>>>         kryo.register(classOf[Array[ImmutableBytesWritable]])  // register Array[ImmutableBytesWritable]
>>>       }
>>>     }
>>>
>>>     class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
>>>       override def numPartitions: Int = 5
>>>       def v = Bytes.toInt(bc.value.get)
>>>       override def getPartition(key: Any): Int = v - 1
>>>     }
>>>
>>>
>>>     class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
>>>       val arr = bc.value
>>>       override def numPartitions: Int = arr.length
>>>       override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
>>>     }
>>>
>>>
>>> In the code above, snippet 1 can work as expected. But snippet 2 throws
>>> "Task not serializable: java.io.NotSerializableException:
>>> org.apache.hadoop.hbase.io.ImmutableBytesWritable" .
>>>
>>>
>>> So do I have to implement a Kryo serializer for Array[T] if it is used
>>> in broadcast ?
>>>
>>> Thanks
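
One difference between the two quoted partitioners may matter more than the Kryo registration: SingleElementPartitioner reads the broadcast through a `def`, while ArrayPartitioner copies it into a `val arr` field. If the partitioner itself is shipped with plain Java serialization (which the "Task not serializable" message suggests, though I have not verified it against Spark 1.1.0), that field would drag the non-Serializable ImmutableBytesWritable objects along. The sketch below reproduces only that mechanism without Spark. `Payload`, `Handle`, `EagerPartitioner`, `LazyPartitioner` and `SerializationDemo` are hypothetical stand-ins, not Spark or HBase classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for ImmutableBytesWritable: NOT java.io.Serializable.
class Payload(val bytes: Array[Byte])

// Hypothetical stand-in for Broadcast[T]: the handle is Serializable,
// but the wrapped value is transient and so is skipped by Java serialization.
class Handle[T](v: T) extends Serializable {
  @transient private val cached: T = v
  def value: T = cached
}

// Mirrors ArrayPartitioner: `val` stores the array itself as a field.
class EagerPartitioner(h: Handle[Array[Payload]]) extends Serializable {
  val arr: Array[Payload] = h.value
  def numPartitions: Int = arr.length
}

// Mirrors SingleElementPartitioner: `def` keeps only the handle as a field.
class LazyPartitioner(h: Handle[Array[Payload]]) extends Serializable {
  def arr: Array[Payload] = h.value
  def numPartitions: Int = arr.length
}

object SerializationDemo {
  // Returns true if plain Java serialization of `obj` succeeds.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val handle = new Handle(Array(new Payload(Array[Byte](1)), new Payload(Array[Byte](2))))
    println("val field:    " + javaSerializable(new EagerPartitioner(handle)))
    println("def accessor: " + javaSerializable(new LazyPartitioner(handle)))
  }
}
```

In this sketch the `val` variant fails to serialize and the `def` variant succeeds. If the same thing is happening in the quoted code, changing `val arr = bc.value` to `def arr = bc.value` in ArrayPartitioner might be enough, with no custom Kryo serializer for Array[T]; that is an assumption to test, not a confirmed fix.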