Hi, I want to use Kryo serialization in a project if possible, and as a first step I'm trying to run the FlumeEventCount example with Kryo. If I comment out the `setAll` call, it runs correctly, but as soon as I pass the Kryo parameters it fails with several errors:
```
15/02/11 11:42:16 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 1
15/02/11 11:42:16 ERROR JobScheduler: Error running job streaming job 1423651330000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 8, localhost): ExecutorLostFailure (executor 1 lost)
```

This is my code:

```scala
object flumeKryo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf(true)
      .setMaster("spark://localhost:7077")
      .setAppName("TestKryo")
      // Commenting out this setAll(...) call makes the job run correctly.
      .setAll(
        Map(
          "spark.serializer" -> "com.gmunoz.flumekryo.WrapperSerializer",
          "spark.kryo.registrator" -> "com.gmunoz.flumekryo.MyRegistrator",
          "spark.task.maxFailures" -> "1",
          "spark.rdd.compress" -> "true",
          "spark.storage.memoryFraction" -> "1",
          "spark.core.connection.ack.wait.timeout" -> "600",
          "spark.akka.frameSize" -> "50"
        )
      )
    val sc = new SparkContext(sparkConf)
    sc.addJar("/home/gmunoz/workspace/flumekryo/target/flumekryo-1.0-SNAPSHOT.jar")
    val ssc = new StreamingContext(sc, Seconds(2))

    // Create a flume stream
    val stream = FlumeUtils.createPollingStream(ssc, "localhost", 11000, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    Console.err.println("################# MyRegistrator called")
    kryo.register(classOf[SparkFlumeEvent])
  }
}

class WrapperSerializer(conf: SparkConf) extends KryoSerializer(conf) {
  override def newKryo(): Kryo = {
    println("## Called newKryo!")
    super.newKryo()
  }
}
```

What am I doing wrong? Thanks!