It's really a question of whether you need access to the MessageAndMetadata, or just the key / value from the message.
If you just need the key/value, dstream map is fine. In your case, since you need to be able to control a possible failure when deserializing the message from the MessageAndMetadata, I'd just go ahead and do the work in the messageHandler.

On Fri, Jul 24, 2015 at 2:46 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:

> Hello,
>
> I managed to read all my data back by skipping the offsets that contain a
> corrupt message. I have one more question regarding best practices for the
> messageHandler method vs dstream.foreachRDD.map vs dstream.map.foreachRDD.
> I'm using a function to read the serialized message from Kafka and convert
> it into my appropriate object with some enrichments, and sometimes I add a
> filter after that. Where's the best spot to put this logic: inside the
> messageHandler method (convert each message within this handler), in
> dstream.foreachRDD.map (map rdd), or in dstream.map.foreachRDD (map dstream)?
>
> Thank you for your help, Cody.
>
> Regards,
> Nicolas PHUNG
>
> On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Yeah, I'm referring to that api.
>>
>> If you want to filter messages in addition to catching that exception,
>> have your messageHandler return an option, so the type R would end up being
>> Option[WhateverYourClassIs], then filter out None before doing the rest of
>> your processing.
>>
>> If you aren't already recording offsets somewhere, and need to find the
>> offsets at the beginning of the topic, you can take a look at this
>>
>> https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143
>>
>> as an example of querying offsets from Kafka.
>>
>> That code is private, but you can either use it as an example, or remove
>> the private[spark] and recompile just the spark-streaming-kafka package.
>> That artifact is included in your job assembly, so you won't have to
>> redeploy spark if you go that route.
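A minimal sketch of the Option-returning handler described in this thread, written in plain Scala so that it runs without Spark or Kafka on the classpath. `RawMessage`, `Event`, `decode` and `HandlerSketch` are hypothetical names introduced only for illustration (stand-ins for MessageAndMetadata, your own class, and your own deserializer); the Try/Option pattern is the only point being made:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for kafka.message.MessageAndMetadata, so the sketch
// needs no Kafka dependency. `payload` plays the role of message().
final case class RawMessage(topic: String, partition: Int, offset: Long, payload: Array[Byte])

// Hypothetical application type (the "WhateverYourClassIs" from the thread).
final case class Event(id: Long, body: String)

object HandlerSketch {
  // Hypothetical decoder: expects "<id>:<body>" in UTF-8, throws on anything else.
  def decode(bytes: Array[Byte]): Event = {
    val text = new String(bytes, "UTF-8")
    val sep = text.indexOf(':')
    require(sep > 0, "missing id separator")
    Event(text.substring(0, sep).toLong, text.substring(sep + 1))
  }

  // The handler: R becomes Option[Event]. A failure is logged together with
  // its offset and mapped to None instead of failing the whole task.
  def handler(m: RawMessage): Option[Event] =
    Try(decode(m.payload)) match {
      case Success(event) => Some(event)
      case Failure(ex) =>
        System.err.println(s"skipping ${m.topic}-${m.partition}@${m.offset}: $ex")
        None
    }

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      RawMessage("events", 0, 1L, "1:ok".getBytes("UTF-8")),
      RawMessage("events", 0, 2L, "garbage".getBytes("UTF-8"))
    )
    // Drop the None values before the rest of the processing.
    val events = batch.flatMap(m => handler(m))
    println(events)
  }
}
```

In a real job the handler would presumably take a MessageAndMetadata[K, V] and be passed as the messageHandler argument to createDirectStream; the None values on the resulting stream could then be dropped with something like filter(_.isDefined).map(_.get) before any further processing.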
>>
>> On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>>
>>> Hi Cody,
>>>
>>> Thanks for your answer. I'm on Spark 1.3.0. I don't quite understand
>>> how to use the messageHandler parameter/function in the createDirectStream
>>> method. You are referring to this, aren't you?
>>>
>>>   def createDirectStream[
>>>       K: ClassTag,
>>>       V: ClassTag,
>>>       KD <: Decoder[K]: ClassTag,
>>>       VD <: Decoder[V]: ClassTag,
>>>       R: ClassTag] (
>>>       ssc: StreamingContext,
>>>       kafkaParams: Map[String, String],
>>>       fromOffsets: Map[TopicAndPartition, Long],
>>>       messageHandler: MessageAndMetadata[K, V] => R
>>>   ): InputDStream[R] = {
>>>     new DirectKafkaInputDStream[K, V, KD, VD, R](
>>>       ssc, kafkaParams, fromOffsets, messageHandler)
>>>   }
>>>
>>> So I must supply the fromOffsets parameter too, but how do I tell this
>>> method to read from the beginning of my topic?
>>>
>>> If I have a filter (e.g. on an R.date field) on my R class, can I put a
>>> filter in the messageHandler function too?
>>>
>>> Regards,
>>> Nicolas P.
>>>
>>> On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Yeah, in the function you supply for the messageHandler parameter to
>>>> createDirectStream, catch the exception and do whatever makes sense for
>>>> your application.
>>>>
>>>> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Using the old Spark Streaming Kafka API, I got the following around
>>>>> the same offset:
>>>>>
>>>>> kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
>>>>>     at kafka.message.Message.ensureValid(Message.scala:166)
>>>>>     at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>>>>     at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>>>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>>>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>>>>     at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>>>>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
>>>>> java.lang.IllegalStateException: Iterator is in failed state
>>>>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>>>>     at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> I found an old topic about possibly corrupt Kafka messages produced
>>>>> by the new producer API with Snappy compression on. My question is: is
>>>>> it possible to skip/ignore those offsets when doing a full processing
>>>>> with KafkaUtils.createStream or KafkaUtils.createDirectStream?
>>>>>
>>>>> Regards,
>>>>> Nicolas PHUNG
>>>>>
>>>>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> I'd try logging the offsets for each message, see where problems
>>>>>> start, then try using the console consumer starting at those offsets and
>>>>>> see if you can reproduce the problem.
>>>>>>
>>>>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>>
>>>>>>> Thanks for your help. It seems there's something wrong with some
>>>>>>> messages within my Kafka topics then. I don't understand how I can get
>>>>>>> bigger or incomplete messages, since I use the default configuration to
>>>>>>> accept only 1 MB messages in my Kafka topic. If you have any other
>>>>>>> information or suggestions, please tell me.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Nicolas PHUNG
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>
>>>>>>>> Not exactly the same issue, but possibly related:
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Well, working backwards down the stack trace...
>>>>>>>>>
>>>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>
>>>>>>>>> That exception gets thrown if the limit is negative or greater than
>>>>>>>>> the buffer's capacity.
>>>>>>>>>
>>>>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>
>>>>>>>>> If size had been negative, it would have just returned null, so we
>>>>>>>>> know the exception got thrown because the size was greater than the
>>>>>>>>> buffer's capacity.
>>>>>>>>>
>>>>>>>>> I haven't seen that before... maybe a corrupted message of some
>>>>>>>>> kind?
>>>>>>>>>
>>>>>>>>> If that problem is reproducible, try providing an explicit
>>>>>>>>> argument for messageHandler, with a function that logs the message
>>>>>>>>> offset.
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> When I'm reprocessing the data from Kafka (about 40 GB) with the new
>>>>>>>>>> Spark Streaming Kafka method createDirectStream, everything is fine
>>>>>>>>>> until a driver error happens (driver is killed, connection lost...).
>>>>>>>>>> When the driver comes back up, it resumes the processing from the
>>>>>>>>>> checkpoint in HDFS. Except, I got this:
>>>>>>>>>>
>>>>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>>>>     at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>>     at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>>     at kafka.message.Message.payload(Message.scala:218)
>>>>>>>>>>     at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>>>>     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>>>>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>     at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>>>>     at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>>>>     at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>>>>     at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>     at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>>>>     at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>     at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>
>>>>>>>>>> This happens only when I'm doing a full reprocessing of the data
>>>>>>>>>> from Kafka. If there's no load, when you kill the driver and then
>>>>>>>>>> restart it, it resumes from the checkpoint as expected without
>>>>>>>>>> missing data. Has anyone encountered something similar? How did you
>>>>>>>>>> solve it?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Nicolas PHUNG
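
The Buffer.limit behaviour that the stack-trace reading in this thread relies on can be reproduced with a few lines of plain Scala. The sketch below imitates a size-prefixed slice; `BufferLimitSketch` and its `sliceDelimited` are a simplified, hypothetical re-creation for illustration only, not Kafka's actual code:

```scala
import java.nio.ByteBuffer
import scala.util.Try

object BufferLimitSketch {
  // Reads a 4-byte size prefix at `start`, then returns a slice limited to
  // that many bytes. Hypothetical helper, loosely modelled on the frame
  // named in the stack trace.
  def sliceDelimited(buf: ByteBuffer, start: Int): ByteBuffer = {
    val size = buf.getInt(start)
    if (size < 0) {
      null // a negative size never reaches limit()
    } else {
      val view = buf.duplicate()
      view.position(start + 4)
      val slice = view.slice()
      // java.nio.Buffer.limit throws IllegalArgumentException when the new
      // limit is negative or larger than the buffer's capacity.
      slice.limit(size)
      slice.rewind()
      slice
    }
  }

  def main(args: Array[String]): Unit = {
    val corrupt = ByteBuffer.allocate(8)
    corrupt.putInt(0, 100) // prefix claims 100 bytes, only 4 follow it
    println(Try(sliceDelimited(corrupt, 0)))
  }
}
```

A size prefix larger than the bytes that actually follow it is therefore enough to produce the IllegalArgumentException seen in the trace, which is consistent with the corrupted-message hypothesis but does not by itself prove it.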