Hi, If your message is string you will have to Change Encoder and Decoder to StringEncoder , StringDecoder.
If your message Is byte[] you can use DefaultEncoder & Decoder. Also Don’t forget to add import statements depending on ur encoder and decoder. import kafka.serializer.StringEncoder; import kafka.serializer. StringDecoder; Regards Jishnu Prathap -----Original Message----- From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Friday, February 06, 2015 6:41 AM To: Eduardo Costa Alfaia; Sean Owen Cc: user@spark.apache.org Subject: RE: Error KafkaStream Hi, I think you should change the `DefaultDecoder` of your type parameter into `StringDecoder`, seems you want to decode the message into String. `DefaultDecoder` is to return Array[Byte], not String, so here class casting will meet error. Thanks Jerry -----Original Message----- From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it] Sent: Friday, February 6, 2015 12:04 AM To: Sean Owen Cc: user@spark.apache.org<mailto:user@spark.apache.org> Subject: Re: Error KafkaStream I don’t think so Sean. > On Feb 5, 2015, at 16:57, Sean Owen > <so...@cloudera.com<mailto:so...@cloudera.com>> wrote: > > Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same > issue? > > On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia > <e.costaalf...@unibs.it<mailto:e.costaalf...@unibs.it>> wrote: >> Hi Guys, >> I’m getting this error in KafkaWordCount; >> >> TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): >> java.lang.ClassCastException: [B cannot be cast to java.lang.String >> at >> org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfu >> n$apply$1.apply(KafkaWordCount.scala:7 >> >> >> Some idea that could be? >> >> >> Bellow the piece of code >> >> >> >> val kafkaStream = { >> val kafkaParams = Map[String, String]( >> "zookeeper.connect" -> "achab3:2181", >> "group.id" -> "mygroup", >> "zookeeper.connect.timeout.ms" -> "10000", >> "kafka.fetch.message.max.bytes" -> "4000000", >> "auto.offset.reset" -> "largest") >> >> val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap >> // val lines = KafkaUtils.createStream[String, String, DefaultDecoder, >> DefaultDecoder](ssc, kafkaParams, topicpMa p, storageLevel = >> StorageLevel.MEMORY_ONLY_SER).map(_._2) >> val KafkaDStreams = (1 to numStreams).map {_ => >> KafkaUtils.createStream[String, String, DefaultDecoder, >> DefaultDecoder](ssc, kafkaParams, topicpMap, storageLe vel = >> StorageLevel.MEMORY_ONLY_SER).map(_._2) >> } >> val unifiedStream = ssc.union(KafkaDStreams) >> unifiedStream.repartition(sparkProcessingParallelism) >> } >> >> Thanks Guys >> >> Informativa sulla Privacy: http://www.unibs.it/node/8155 -- Informativa sulla Privacy: http://www.unibs.it/node/8155 --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>