Hi,

If your message is a String, change the encoder and decoder to StringEncoder and StringDecoder.

If your message is a byte[], you can use DefaultEncoder and DefaultDecoder.



Also, don't forget to add the import statements matching your encoder and decoder:

import kafka.serializer.StringEncoder;
import kafka.serializer.StringDecoder;
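
For example, a minimal consumer-side sketch, assuming the Spark 1.x receiver-based Kafka API and reusing the ssc, kafkaParams and topicpMap names from the code quoted below:

import kafka.serializer.StringDecoder

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Keys and values are Strings, so both decoder type parameters must be
// StringDecoder; DefaultDecoder yields Array[Byte], and the later cast
// to String is what throws the ClassCastException.
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)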


Regards
Jishnu Prathap

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Friday, February 06, 2015 6:41 AM
To: Eduardo Costa Alfaia; Sean Owen
Cc: user@spark.apache.org
Subject: RE: Error KafkaStream



Hi,



I think you should change the `DefaultDecoder` in your type parameters to 
`StringDecoder`, since it seems you want to decode the message into a String. 
`DefaultDecoder` returns Array[Byte], not String, so the class cast here 
fails.
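
For instance, the failing cast is equivalent to this little sketch (illustrative values only):

val bytes: Any = "hello".getBytes("UTF-8")  // what DefaultDecoder hands back: Array[Byte]
val s = bytes.asInstanceOf[String]          // java.lang.ClassCastException: [B cannot be cast to java.lang.String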



Thanks

Jerry



-----Original Message-----
From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it]
Sent: Friday, February 6, 2015 12:04 AM
To: Sean Owen
Cc: user@spark.apache.org
Subject: Re: Error KafkaStream



I don’t think so, Sean.



> On Feb 5, 2015, at 16:57, Sean Owen <so...@cloudera.com> wrote:
>
> Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same
> issue?
>
> On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia
> <e.costaalf...@unibs.it> wrote:

>> Hi Guys,
>> I’m getting this error in KafkaWordCount:
>>
>> TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234):
>> java.lang.ClassCastException: [B cannot be cast to java.lang.String
>>        at
>> org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7
>>
>> Any idea what this could be?
>>
>> Below is the piece of code:
>>
>> val kafkaStream = {
>>        val kafkaParams = Map[String, String](
>>                "zookeeper.connect" -> "achab3:2181",
>>                "group.id" -> "mygroup",
>>                "zookeeper.connect.timeout.ms" -> "10000",
>>                "kafka.fetch.message.max.bytes" -> "4000000",
>>                "auto.offset.reset" -> "largest")
>>
>>        val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
>> //     val lines = KafkaUtils.createStream[String, String, DefaultDecoder,
>> //       DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel =
>> //       StorageLevel.MEMORY_ONLY_SER).map(_._2)
>>        val KafkaDStreams = (1 to numStreams).map { _ =>
>>          KafkaUtils.createStream[String, String, DefaultDecoder,
>>            DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel =
>>            StorageLevel.MEMORY_ONLY_SER).map(_._2)
>>        }
>>        val unifiedStream = ssc.union(KafkaDStreams)
>>        unifiedStream.repartition(sparkProcessingParallelism)
>> }
>>
>> Thanks Guys








