Hi Cody,

Thanks for your answer. I'm on Spark 1.3.0. I don't quite understand how to use the messageHandler parameter/function in the createDirectStream method. You are referring to this, aren't you?
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      messageHandler: MessageAndMetadata[K, V] => R
  ): InputDStream[R] = {
    new DirectKafkaInputDStream[K, V, KD, VD, R](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }

So I must supply the fromOffsets parameter too, but how do I tell this method to read from the beginning of my topic? If I have a filter (e.g. on an R.date field) on my R class, can I put that filter in the messageHandler function too?

Regards,
Nicolas P.

On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah, in the function you supply for the messageHandler parameter to
> createDirectStream, catch the exception and do whatever makes sense for
> your application.
>
> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>
>> Hello,
>>
>> Using the old Spark Streaming Kafka API, I got the following around the
>> same offset:
>>
>> kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
>> java.lang.IllegalStateException: Iterator is in failed state
>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> I found an old thread about possibly corrupt Kafka messages produced by
>> the new producer API with Snappy compression on. My question is: is it
>> possible to skip/ignore those offsets when doing a full reprocessing with
>> KafkaUtils.createStream or KafkaUtils.createDirectStream?
>>
>> Regards,
>> Nicolas PHUNG
>>
>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> I'd try logging the offsets for each message, see where problems start,
>>> then try using the console consumer starting at those offsets and see if
>>> you can reproduce the problem.
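A minimal sketch of the exception-catching handler Cody describes above. Everything here is illustrative: it assumes String keys and values, and that ssc, kafkaParams and fromOffsets already exist; safeHandler is a made-up name, not part of the Spark API.

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.util.Try

// Decode inside Try so a corrupt payload becomes None instead of an
// exception that fails the whole task.
val safeHandler = (mmd: MessageAndMetadata[String, String]) =>
  Try(mmd.message()).toOption.map(value => (mmd.offset, value))

val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, Option[(Long, String)]](
  ssc, kafkaParams, fromOffsets, safeHandler)

// Flatten away the Nones, i.e. skip the corrupt offsets. A filter on a
// field of your R class could be applied the same way, either inside the
// handler or on the resulting stream.
val goodRecords = stream.flatMap(opt => opt)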
>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> Thanks for your help. It seems there's something wrong with some
>>>> messages within my Kafka topics, then. I don't understand how I can get
>>>> bigger or incomplete messages, since I use the default configuration
>>>> that accepts only 1 MB messages in my Kafka topic. If you have any other
>>>> information or suggestions, please tell me.
>>>>
>>>> Regards,
>>>> Nicolas PHUNG
>>>>
>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>>> Not exactly the same issue, but possibly related:
>>>>>
>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>
>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> Well, working backwards down the stack trace...
>>>>>>
>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>
>>>>>> That exception gets thrown if the limit is negative or greater than
>>>>>> the buffer's capacity.
>>>>>>
>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>
>>>>>> If the size had been negative, it would have just returned null, so we
>>>>>> know the exception got thrown because the size was greater than the
>>>>>> buffer's capacity.
>>>>>>
>>>>>> I haven't seen that before... maybe a corrupted message of some kind?
>>>>>>
>>>>>> If that problem is reproducible, try providing an explicit argument
>>>>>> for messageHandler, with a function that logs the message offset.
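A sketch of such an offset-logging handler (illustrative: it assumes String keys and values, and the logger name is arbitrary). Passed as the messageHandler argument, it makes the last logged line before a crash point at the corrupt offset, which can then be checked with the console consumer.

import kafka.message.MessageAndMetadata
import org.apache.log4j.Logger

// Log each message's coordinates before decoding the payload, so a
// decoding failure is immediately preceded by the offending offset.
val loggingHandler = (mmd: MessageAndMetadata[String, String]) => {
  Logger.getLogger("offset-audit")
    .info(s"topic=${mmd.topic} partition=${mmd.partition} offset=${mmd.offset}")
  mmd.message()
}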
>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> When I'm reprocessing the data from Kafka (about 40 GB) with the new
>>>>>>> Spark Streaming Kafka method createDirectStream, everything is fine
>>>>>>> until a driver error happens (driver killed, connection lost...).
>>>>>>> When the driver pops up again, it resumes the processing from the
>>>>>>> checkpoint in HDFS. Except, I got this:
>>>>>>>
>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>         at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>         at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>         at kafka.message.Message.payload(Message.scala:218)
>>>>>>>         at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>         at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>         at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>         at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>         at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>         at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>         at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>         at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>         at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>         at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> This is happening only when I'm doing a full data reprocessing from
>>>>>>> Kafka. If there's no load, when you kill the driver and then restart
>>>>>>> it, it resumes from the checkpoint as expected without missing data.
>>>>>>> Has anyone encountered something similar? How did you solve it?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Nicolas PHUNG
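Coming back to the fromOffsets question at the top of the thread (how to read a topic from the beginning), a hedged sketch, with the topic name and partition count as placeholders: map every partition to offset 0L. This assumes retention has not yet deleted the head of the log; otherwise, either fetch the earliest valid offsets first, or use the createDirectStream overload without fromOffsets together with "auto.offset.reset" -> "smallest" in kafkaParams.

import kafka.common.TopicAndPartition

// Start every partition of "my-topic" at offset 0, i.e. the beginning
// of the log (assuming nothing has been removed by retention).
val numPartitions = 8  // placeholder: use your topic's partition count
val fromOffsets: Map[TopicAndPartition, Long] =
  (0 until numPartitions)
    .map(p => TopicAndPartition("my-topic", p) -> 0L)
    .toMap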