It's really a question of whether you need access to the
MessageAndMetadata, or just the key/value from the message.

If you just need the key/value, a plain dstream map is fine.

In your case, since you need to be able to control a possible failure when
deserializing the message from the MessageAndMetadata, I'd just go ahead
and do the work in the messageHandler.
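
For example, here is a minimal sketch along those lines (MyEvent, deserialize,
enrich and the date filter are placeholders for your own code; ssc, kafkaParams
and fromOffsets are assumed to be in scope):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

case class MyEvent(date: Long, payload: String)      // stand-in for your enriched class
def deserialize(bytes: Array[Byte]): MyEvent = ???   // your own Avro/JSON/... decoding
def enrich(event: MyEvent): MyEvent = event          // your own enrichment

// Do the risky work (deserialization + enrichment) inside the messageHandler,
// returning None for anything that can't be decoded.
val handler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => {
  try {
    Some(enrich(deserialize(mmd.message)))
  } catch {
    case e: Exception => None   // optionally log mmd.topic / mmd.partition / mmd.offset here
  }
}

val stream = KafkaUtils.createDirectStream[
  Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Option[MyEvent]](
  ssc, kafkaParams, fromOffsets, handler)

// Drop the corrupt records, then apply any further filtering (e.g. on a date field).
val events = stream.flatMap(_.toList).filter(_.date > 0L)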

On Fri, Jul 24, 2015 at 2:46 AM, Nicolas Phung <nicolas.ph...@gmail.com>
wrote:

> Hello,
>
> I managed to read all my data back by skipping the offsets that contain a
> corrupt message. I have one more question regarding best practices for the
> messageHandler method vs dstream.foreachRDD.map vs dstream.map.foreachRDD.
> I'm using a function that reads the serialized message from Kafka and
> converts it into my domain object with some enrichments, sometimes followed
> by a filter. Where is the best place to put this logic: the messageHandler
> method (convert each message within the handler), dstream.foreachRDD.map
> (map the RDD), or dstream.map.foreachRDD (map the DStream)?
>
> Thank you for your help, Cody.
> Regards,
> Nicolas PHUNG
>
> On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Yeah, I'm referring to that api.
>>
>> If you want to filter messages in addition to catching that exception,
>> have your messageHandler return an Option, so the type R ends up being
>> Option[WhateverYourClassIs], then filter out the Nones before doing the
>> rest of your processing.
>>
>> If you aren't already recording offsets somewhere, and need to find the
>> offsets at the beginning of the topic, you can take a look at this
>>
>>
>> https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143
>>
>> as an example of querying offsets from Kafka.
>>
>> That code is private, but you can either use it as an example, or remove
>> the private[spark] and recompile just the spark-streaming-kafka package.
>> That artifact is included in your job assembly, so you won't have to
>> redeploy Spark if you go that route.
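>>
>> As a rough sketch (assuming KafkaCluster's getPartitions /
>> getEarliestLeaderOffsets methods and the LeaderOffset.offset field;
>> "your-topic" is a placeholder and kafkaParams is assumed to be in scope),
>> building the fromOffsets map for the beginning of a topic could look like:
>>
>> import kafka.common.TopicAndPartition
>> import org.apache.spark.streaming.kafka.KafkaCluster
>>
>> val kc = new KafkaCluster(kafkaParams)
>>
>> // Both calls return Either[Err, ...]; .right.get keeps the sketch short,
>> // but you'll want proper error handling here.
>> val partitions: Set[TopicAndPartition] =
>>   kc.getPartitions(Set("your-topic")).right.get
>> val fromOffsets: Map[TopicAndPartition, Long] =
>>   kc.getEarliestLeaderOffsets(partitions).right.get.mapValues(_.offset)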
>>
>>
>> On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung <nicolas.ph...@gmail.com>
>> wrote:
>>
>>> Hi Cody,
>>>
>>> Thanks for your answer. I'm on Spark 1.3.0. I don't quite understand how
>>> to use the messageHandler parameter/function in the createDirectStream
>>> method. You are referring to this, aren't you?
>>>
>>> def createDirectStream[
>>>     K: ClassTag, V: ClassTag,
>>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
>>>     R: ClassTag](
>>>     ssc: StreamingContext,
>>>     kafkaParams: Map[String, String],
>>>     fromOffsets: Map[TopicAndPartition, Long],
>>>     messageHandler: MessageAndMetadata[K, V] => R
>>> ): InputDStream[R] = {
>>>   new DirectKafkaInputDStream[K, V, KD, VD, R](
>>>     ssc, kafkaParams, fromOffsets, messageHandler)
>>> }
>>>
>>> So I must supply the fromOffsets parameter too, but how do I tell this
>>> method to read from the beginning of my topic?
>>>
>>> If I have a filter on my R class (e.g. on an R.date field), can I put
>>> that filter in the messageHandler function too?
>>>
>>> Regards,
>>> Nicolas P.
>>>
>>> On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Yeah, in the function you supply for the messageHandler parameter to
>>>> createDirectStream, catch the exception and do whatever makes sense for
>>>> your application.
>>>>
>>>> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <
>>>> nicolas.ph...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Using the old Spark Streaming Kafka API, I got the following around
>>>>> the same offset:
>>>>>
>>>>> kafka.message.InvalidMessageException: Message is corrupt (stored crc
>>>>> = 3561357254, computed crc = 171652633)
>>>>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>>>>         at
>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>>>>         at
>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>>>         at
>>>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>>>>         at
>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>         at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>>>>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
>>>>> java.lang.IllegalStateException: Iterator is in failed state
>>>>>         at
>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>         at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> I found an old thread about corrupt Kafka messages possibly produced by
>>>>> the new producer API with Snappy compression enabled. My question is: is
>>>>> it possible to skip/ignore those offsets when doing a full reprocessing
>>>>> with KafkaUtils.createStream or KafkaUtils.createDirectStream?
>>>>>
>>>>> Regards,
>>>>> Nicolas PHUNG
>>>>>
>>>>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> I'd try logging the offsets for each message, see where problems
>>>>>> start, then try using the console consumer starting at those offsets and
>>>>>> see if you can reproduce the problem.
>>>>>>
>>>>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <
>>>>>> nicolas.ph...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>>
>>>>>>> Thanks for your help. It seems there's something wrong with some
>>>>>>> messages within my Kafka topics, then. I don't understand how I can
>>>>>>> get a bigger or incomplete message, since I use the default
>>>>>>> configuration that accepts only 1 MB messages in my Kafka topic. If
>>>>>>> you have any other information or suggestions, please tell me.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Nicolas PHUNG
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Not exactly the same issue, but possibly related:
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <
>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Well, working backwards down the stack trace...
>>>>>>>>>
>>>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>
>>>>>>>>> That exception gets thrown if the limit is negative or greater than 
>>>>>>>>> the buffer's capacity
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>
>>>>>>>>> If size had been negative, it would have just returned null, so we
>>>>>>>>> know the exception got thrown because the size was greater than the
>>>>>>>>> buffer's capacity
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I haven't seen that before... maybe a corrupted message of some
>>>>>>>>> kind?
>>>>>>>>>
>>>>>>>>> If that problem is reproducible, try providing an explicit
>>>>>>>>> argument for messageHandler, with a function that logs the message 
>>>>>>>>> offset.
>>>>>>>>>
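>>>>>>>>> A rough sketch of such a handler (byte-array keys/values assumed;
>>>>>>>>> println is just for illustration, use your own logger):
>>>>>>>>>
>>>>>>>>> import kafka.message.MessageAndMetadata
>>>>>>>>>
>>>>>>>>> val logOffsets = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => {
>>>>>>>>>   // Log where each record came from before handing back the raw key/value,
>>>>>>>>>   // so the last line printed points at the offending offset.
>>>>>>>>>   println(s"topic=${mmd.topic} partition=${mmd.partition} offset=${mmd.offset}")
>>>>>>>>>   (mmd.key, mmd.message)
>>>>>>>>> }
>>>>>>>>>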
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>>>>> nicolas.ph...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> When I'm reprocessing the data from Kafka (about 40 GB) with the new 
>>>>>>>>>> Spark Streaming Kafka method createDirectStream, everything is fine 
>>>>>>>>>> until a driver error happens (driver killed, connection lost, ...). 
>>>>>>>>>> When the driver comes back up, it resumes processing from the 
>>>>>>>>>> checkpoint in HDFS. Except I get this:
>>>>>>>>>>
>>>>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 
>>>>>>>>>> times; aborting job
>>>>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming 
>>>>>>>>>> job 1437032118000 ms.0
>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: 
>>>>>>>>>> Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 
>>>>>>>>>> 4.3 in stage 4.0 (TID 16, slave05.local): 
>>>>>>>>>> java.lang.IllegalArgumentException
>>>>>>>>>>      at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>>      at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>>      at kafka.message.Message.payload(Message.scala:218)
>>>>>>>>>>      at 
>>>>>>>>>> kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>>      at 
>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>      at 
>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>      at 
>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>>>>      at 
>>>>>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>      at 
>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>      at 
>>>>>>>>>> org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>>>>      at 
>>>>>>>>>> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>      at 
>>>>>>>>>> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>>>>      at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>>>>      at 
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>>      at 
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>>      at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>
>>>>>>>>>> This happens only when I'm doing a full data reprocessing from
>>>>>>>>>> Kafka. If there's no load, when you kill the driver and then restart,
>>>>>>>>>> it resumes from the checkpoint as expected without missing data. Has
>>>>>>>>>> someone encountered something similar? How did you solve it?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Nicolas PHUNG
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
