Instead of using a custom decoder, you should simply do the following. This is code
executed on the workers and allows the computation to continue. foreachRDD
and collect are output operations and force the data to be collected on the
driver (assuming you don’t want that…)

val events = kafkaDStream.map { case (devId, byteArray) =>
  KafkaGenericEvent.parseFrom(byteArray) }
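
For context, here is a rough sketch of how that map could sit in a complete job. It assumes the Spark 1.x direct stream API from the spark-streaming-kafka (Kafka 0.8) artifact. The broker address, topic name, batch interval, object name and the parseEvent helper are all made up for illustration; parseEvent only stands in for KafkaGenericEvent.parseFrom, which is not shown in this thread.

```scala
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DecodeOnWorkers {
  // Hypothetical stand-in for KafkaGenericEvent.parseFrom(byteArray).
  def parseEvent(bytes: Array[Byte]): String = new String(bytes, "UTF-8")

  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit.
    val conf = new SparkConf().setAppName("decode-on-workers")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Assumed broker address and topic name.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("events")

    // DefaultDecoder hands the message value through as raw bytes,
    // so no custom value decoder is needed.
    val kafkaDStream = KafkaUtils.createDirectStream[
      String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)

    // The parsing happens inside map, so it runs on the executors.
    val events = kafkaDStream.map { case (devId, byteArray) =>
      (devId, parseEvent(byteArray))
    }

    // print() is an output operation; it shows a few records per batch on the driver.
    events.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The point of this layout is that the stream carries plain Array[Byte] values and the protobuf-style parsing is an ordinary transformation, so nothing has to be collected on the driver before decoding.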

From: srungarapu vamsi
Date: Thursday, September 17, 2015 at 4:03 PM
To: user
Subject: Spark Streaming kafka directStream value decoder issue

I am using KafkaUtils.createDirectStream to read the data from the Kafka bus.
On the producer end, I am generating the data in the following way:
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serializa