@Saisai Shao, thanks for the pointer. It turned out to be a serialization issue. I was using ScalaBuff to generate my "KafkaGenericEvent" class, but when I went through the generated class code, I found that it is not serializable. Now I am generating my classes using ScalaPB (https://github.com/trueaccord/ScalaPB) and my problem is solved.
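
For anyone who hits the same problem later, the consumer side now looks roughly like this (a sketch only: the broker list, topic name, and batch interval are illustrative, and KafkaGenericEvent stands for the ScalaPB-generated class):

    import kafka.serializer.{DefaultDecoder, StringDecoder}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("kafka-protobuf-consumer")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Illustrative settings; substitute your own brokers and topic.
    val kafkaConf = Map("metadata.broker.list" -> "localhost:9092")

    val kafkaDStream = KafkaUtils
      .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
        ssc, kafkaConf, Set("generic-events"))

    // parseFrom runs on the executors; the decoded events survive the
    // collect() back to the driver because ScalaPB generates serializable
    // case classes (this was exactly what the ScalaBuff classes lacked).
    kafkaDStream
      .map { case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray)) }
      .foreachRDD(rdd => rdd.collect().foreach { case (_, event) => println(event) })

    ssc.start()
    ssc.awaitTermination()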
Thanks

On Thu, Sep 17, 2015 at 10:43 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Is your "KafkaGenericEvent" serializable? Since you call rdd.collect() to
> fetch the data to the local driver, this KafkaGenericEvent needs to be
> serialized and deserialized through the Java or Kryo serializer (depending
> on your configuration); not sure if that is why you always get a default
> object.
>
> Also, would you provide the implementation of `parseFrom`, so we can
> better understand the details of how you do the deserialization?
>
> Thanks
> Saisai
>
> On Thu, Sep 17, 2015 at 9:49 AM, srungarapu vamsi
> <srungarapu1...@gmail.com> wrote:
>
>> If I understand correctly, I guess you are suggesting me to do this:
>>
>> val kafkaDStream =
>>   KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](
>>     ssc, kafkaConf, Set(topics))
>>
>> kafkaDStream.map {
>>   case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
>> }.foreachRDD(rdd => rdd.collect().map {
>>   case (devId, genericEvent) => println(genericEvent)
>> })
>>
>> I read from Kafka as a byte array => applied a transformation on the byte
>> array to get the custom class => printed the custom class for debugging
>> purposes.
>>
>> But this is not helping me, i.e. I am getting an empty object with
>> default values when I print "genericEvent".
>>
>> Please correct me if I did not get what you are suggesting me to try.
>>
>> On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> I guess what I'm asking is why not start with a byte array like in the
>>> example that works (using the DefaultDecoder), then map over it and do
>>> the decoding manually like I'm suggesting below.
>>>
>>> Have you tried this approach? We have the same workflow (kafka =>
>>> protobuf => custom class) and it works.
>>> If you expect invalid messages, you can use flatMap instead and wrap
>>> .parseFrom in a Try { ... }.toOption.
>>>
>>> Sent from my iPhone
>>>
>>> On 17 Sep 2015, at 18:23, srungarapu vamsi <srungarapu1...@gmail.com>
>>> wrote:
>>>
>>> @Adrian,
>>> I am doing collect for debugging purposes, but I have to use foreachRDD
>>> so that I can operate on top of this RDD and eventually save to the DB.
>>>
>>> But my actual problem here is to properly convert Array[Byte] to my
>>> custom object.
>>>
>>> On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>>
>>>> Why are you calling foreachRDD / collect in the first place?
>>>>
>>>> Instead of using a custom decoder, you should simply do the following –
>>>> this is code executed on the workers and allows the computation to
>>>> continue. foreachRDD and collect are output operations and force the
>>>> data to be collected on the driver (assuming you don't want that…):
>>>>
>>>> val events = kafkaDStream.map { case (devId, byteArray) =>
>>>>   KafkaGenericEvent.parseFrom(byteArray) }
>>>>
>>>> From: srungarapu vamsi
>>>> Date: Thursday, September 17, 2015 at 4:03 PM
>>>> To: user
>>>> Subject: Spark Streaming kafka directStream value decoder issue
>>>>
>>>> I am using KafkaUtils.createDirectStream to read the data from the
>>>> Kafka bus.
>>>>
>>>> On the producer end, I am generating messages in the following way:
>>>>
>>>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>>>> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>>>   "org.apache.kafka.common.serialization.StringSerializer")
>>>> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>>>   "org.apache.kafka.common.serialization.StringSerializer")
>>>> val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>>>>
>>>> // Send some messages
>>>> println("Sending message")
>>>> val kafkaGenericEvent =
>>>>   new KafkaGenericEvent("event-id", EventType.site, "6", 1440500400000L)
>>>> val message =
>>>>   new ProducerRecord[String, KafkaGenericEvent](topic, "myKey", kafkaGenericEvent)
>>>> producer.send(message)
>>>>
>>>> I am connecting to Kafka using the console consumer script and am able
>>>> to see proper data. The KafkaGenericEvent used in the above code is the
>>>> class generated by ScalaBuff from a protobuf file.
>>>>
>>>> On the consumer end:
>>>> If I read the value as a plain byte array and then convert it into a
>>>> KafkaGenericEvent in the following way, I get proper data:
>>>>
>>>> val kafkaDStream =
>>>>   KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](
>>>>     ssc, kafkaConf, Set(topics))
>>>>
>>>> kafkaDStream.foreachRDD(rdd => rdd.collect().map {
>>>>   case (devId, byteArray) => println(KafkaGenericEvent.parseFrom(byteArray))
>>>> })
>>>>
>>>> But if I change the value type to KafkaGenericEvent and use a custom
>>>> decoder like this:
>>>>
>>>> class KafkaGenericEventsDecoder(props: VerifiableProperties = null)
>>>>     extends Decoder[KafkaGenericEvent] {
>>>>   override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent = {
>>>>     KafkaGenericEvent.parseFrom(bytes)
>>>>   }
>>>> }
>>>>
>>>> and in the consumer:
>>>>
>>>> val kafkaDStream =
>>>>   KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](
>>>>     ssc, kafkaConf, Set(topics))
>>>> kafkaDStream.foreachRDD(rdd => rdd.collect().map {
>>>>   case (devId, genericEvent) => println(genericEvent)
>>>> })
>>>>
>>>> then my value object KafkaGenericEvent is not created from the sent
>>>> data; instead I get an empty KafkaGenericEvent object with default
>>>> values.
>>>>
>>>> Even if I read the value as an array of bytes in createDirectStream and
>>>> then apply a transformation in the following way, I am still getting
>>>> incorrect values:
>>>>
>>>> val kafkaDStream =
>>>>   KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](
>>>>     ssc, kafkaConf, Set(topics))
>>>>
>>>> kafkaDStream.map {
>>>>   case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
>>>> }.foreachRDD(rdd => rdd.collect().map {
>>>>   case (devId, genericEvent) => println(genericEvent)
>>>> })
>>>>
>>>> I get the default KafkaGenericEvent object in the println(genericEvent)
>>>> line. Does this mean that I can transform the values only on the driver
>>>> and not on the executors?
>>>>
>>>> I am completely confused here!
>>>> I am using:
>>>> scala-2.10.4
>>>> spark-1.3.1
>>>> kafka_2.10-0.8.2.1
>>>>
>>>> -
>>>> /Vamsi
>>>
>>> --
>>> /Vamsi
>>
>> --
>> /Vamsi
>

--
/Vamsi
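
For completeness, Adrian's flatMap suggestion from earlier in the thread would look roughly like this (a sketch; it assumes parseFrom throws on malformed bytes, so bad messages are silently dropped rather than failing the batch):

    import scala.util.Try

    val events = kafkaDStream.flatMap { case (devId, byteArray) =>
      // Try(...).toOption turns a failed parse into None, which flatMap drops.
      Try(KafkaGenericEvent.parseFrom(byteArray)).toOption.map(event => (devId, event))
    }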