I guess what I'm asking is: why not start with a byte array, as in the example that works (using the DefaultDecoder), and then map over it and do the decoding manually, like I'm suggesting below?
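Roughly like this, as a minimal sketch: it reuses ssc, kafkaConf, topics and KafkaGenericEvent from your snippets below, and uses flatMap plus Try so that any message that fails to parse is simply skipped:

    import scala.util.Try
    import kafka.serializer.{DefaultDecoder, StringDecoder}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Read the values as raw bytes; this is the DefaultDecoder path that already works for you.
    val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaConf, Set(topics))

    // Decode on the executors; flatMap + Try drops anything parseFrom cannot handle.
    val events = kafkaDStream.flatMap { case (devId, byteArray) =>
      Try(KafkaGenericEvent.parseFrom(byteArray)).toOption.map(event => (devId, event))
    }

    // events is now a DStream[(String, KafkaGenericEvent)]; operate on it and save to the DB from here.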
Have you tried this approach? We have the same workflow (kafka => protobuf => custom class) and it works. If you expect invalid messages, you can use flatMap instead and wrap .parseFrom in a Try {...}.toOption.

On 17 Sep 2015, at 18:23, srungarapu vamsi <srungarapu1...@gmail.com> wrote:

@Adrian, I am doing the collect for debugging purposes, but I have to use foreachRDD so that I can operate on top of this RDD and eventually save it to a DB. My actual problem here is how to properly convert the Array[Byte] into my custom object.

On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com> wrote:

Why are you calling foreachRDD / collect in the first place? Instead of using a custom decoder, you should simply do the following; this is code executed on the workers and allows the computation to continue. foreachRDD and collect are output operations, and collect forces the data to be gathered on the driver (assuming you don't want that...):

    val events = kafkaDStream.map { case (devId, byteArray) =>
      KafkaGenericEvent.parseFrom(byteArray)
    }

From: srungarapu vamsi
Date: Thursday, September 17, 2015 at 4:03 PM
To: user
Subject: Spark Streaming kafka directStream value decoder issue

I am using KafkaUtils.createDirectStream to read data from the Kafka bus. On the producer end, I am generating messages in the following way:

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, KafkaGenericEvent](props)

    // Send some messages
    println("Sending message")
    val kafkaGenericEvent = new KafkaGenericEvent("event-id", EventType.site, "6", 1440500400000L)
    val message = new ProducerRecord[String, KafkaGenericEvent](topic, "myKey", kafkaGenericEvent)
    producer.send(message)

I am connecting to Kafka with the console consumer script and am able to see proper data. The KafkaGenericEvent used in the above code is the class generated by ScalaBuff from a protobuf file.

On the consumer end, if I read the value as a plain byte array and then convert it into a KafkaGenericEvent in the following way, I get proper data:

    val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaConf, Set(topics))
    kafkaDStream.foreachRDD(rdd => rdd.collect().map {
      case (devId, byteArray) =>
        println(KafkaGenericEvent.parseFrom(byteArray))
    })

But if I change the value type to KafkaGenericEvent and use a custom decoder like this:

    class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends Decoder[KafkaGenericEvent] {
      override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent = {
        KafkaGenericEvent.parseFrom(bytes)
      }
    }

and in the consumer:

    val kafkaDStream = KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](ssc, kafkaConf, Set(topics))
    kafkaDStream.foreachRDD(rdd => rdd.collect().map {
      case (devId, genericEvent) =>
        println(genericEvent)
    })

then my value object is not built from the sent data; instead I get an empty KafkaGenericEvent with all default values.
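(A quick sanity check I could add, sketch only and the println is purely for debugging, would be to log the size of the raw bytes inside the decoder, to see whether anything sensible reaches fromBytes at all:)

    import kafka.serializer.Decoder
    import kafka.utils.VerifiableProperties

    class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends Decoder[KafkaGenericEvent] {
      override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent = {
        // Debugging only: how many bytes did Kafka hand us for this message?
        println(s"fromBytes received ${bytes.length} bytes")
        KafkaGenericEvent.parseFrom(bytes)
      }
    }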
Even if I read the value as an array of bytes in createDirectStream and then apply a transformation in the following way, I am getting incorrect values:

    val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaConf, Set(topics))
    kafkaDStream.map {
      case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
    }.foreachRDD(rdd => rdd.collect().map {
      case (devId, genericEvent) =>
        println(genericEvent)
    })

I get the default KafkaGenericEvent object from the line println(genericEvent). Does this mean that I can transform the values only on the driver and not on the executors? I am completely confused here!

I am using:
scala-2.10.4
spark-1.3.1
kafka_2.10-0.8.2.1

--
/Vamsi