Hi Cody,

Thanks for your answer. I'm on Spark 1.3.0. I don't quite understand how
to use the messageHandler parameter of the createDirectStream method.
Is this the overload you are referring to?

def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, messageHandler)
}

So I must supply the fromOffsets parameter too, but how do I tell this
method to read from the beginning of my topic?
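
My best guess is to build the map by hand with offset 0 for every partition. Here is a minimal sketch, assuming a hypothetical topic "my-topic" with 4 partitions, and assuming retention has not yet deleted the oldest log segments (otherwise offset 0 no longer exists on the broker). The helper name fromBeginning is mine, and the case class is only a stand-in so the snippet runs without Kafka on the classpath:

```scala
// Stand-in with the same shape as kafka.common.TopicAndPartition; in the real
// job, import kafka.common.TopicAndPartition instead of defining this.
final case class TopicAndPartition(topic: String, partition: Int)

// Hypothetical helper: start every partition of `topic` at offset 0.
// The partition count must be known up front; it is not discovered here.
def fromBeginning(topic: String, numPartitions: Int): Map[TopicAndPartition, Long] =
  (0 until numPartitions).map(p => TopicAndPartition(topic, p) -> 0L).toMap

// Assumed topic name and partition count, for illustration only.
val fromOffsets: Map[TopicAndPartition, Long] = fromBeginning("my-topic", 4)
```

The resulting map has the type the fromOffsets parameter expects. As far as I can see, the shorter createDirectStream overload that takes a Set of topic names honors "auto.offset.reset" -> "smallest" in kafkaParams, but it does not accept a messageHandler.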

If I want to filter on a field of my R class (e.g. R.date), can I put that
filter in the messageHandler function too?
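
If I read your suggestion correctly, the handler could look something like the sketch below. It wraps the decoding in a Try, logs the offset of any message that fails, and returns an Option so that bad or filtered-out messages become None. Everything named here is an assumption for illustration: MessageLike is a stand-in for kafka.message.MessageAndMetadata (so the snippet runs without Kafka on the classpath), and the R fields, the "date|body" payload format, parse and safeHandler are hypothetical.

```scala
import scala.util.{Failure, Success, Try}

// Stand-in for kafka.message.MessageAndMetadata[K, V], reduced to the members
// used here. message() is where the IllegalArgumentException in the stack
// trace further down this thread was thrown.
trait MessageLike[V] {
  def topic: String
  def partition: Int
  def offset: Long
  def message(): V
}

// Hypothetical record type with a date field (ISO "yyyy-MM-dd" string).
final case class R(date: String, body: String)

// Hypothetical parser for an assumed "date|body" payload format.
def parse(raw: String): R = {
  val Array(date, body) = raw.split("\\|", 2)
  R(date, body)
}

// Handler that does not propagate non-fatal errors: a message that cannot be
// decoded or parsed is logged with its offset and mapped to None. Records
// older than minDate are also mapped to None (ISO dates compare as strings).
def safeHandler(minDate: String)(mmd: MessageLike[String]): Option[R] =
  Try(parse(mmd.message())) match {
    case Success(r) => Some(r).filter(_.date >= minDate)
    case Failure(e) =>
      System.err.println(
        s"Skipping ${mmd.topic}-${mmd.partition} offset ${mmd.offset}: $e")
      None
  }
```

In the real job the parameter type would be MessageAndMetadata[String, String], the R type parameter of createDirectStream would become Option[R], and the None values would have to be dropped afterwards, for example with flatMap(_.toList) on the resulting stream.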

Regards,
Nicolas P.

On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah, in the function you supply for the messageHandler parameter to
> createDirectStream, catch the exception and do whatever makes sense for
> your application.
>
> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <nicolas.ph...@gmail.com>
> wrote:
>
>> Hello,
>>
>> Using the old Spark Streaming Kafka API, I got the following around the
>> same offset:
>>
>> kafka.message.InvalidMessageException: Message is corrupt (stored crc =
>> 3561357254, computed crc = 171652633)
>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>         at
>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>         at
>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>         at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>         at
>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>         at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
>> java.lang.IllegalStateException: Iterator is in failed state
>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>         at
>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>         at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> I found some old threads about possibly corrupt Kafka messages produced
>> by the new producer API with Snappy compression enabled. My question is:
>> is it possible to skip/ignore those offsets when doing a full reprocessing
>> with KafkaUtils.createStream or KafkaUtils.createDirectStream?
>>
>> Regards,
>> Nicolas PHUNG
>>
>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> I'd try logging the offsets for each message, see where problems start,
>>> then try using the console consumer starting at those offsets and see if
>>> you can reproduce the problem.
>>>
>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <nicolas.ph...@gmail.com>
>>> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> Thanks for your help. It seems there's something wrong with some
>>>> messages in my Kafka topics, then. I don't understand how I can get
>>>> bigger or incomplete messages, since I use the default configuration,
>>>> which only accepts messages up to 1 MB in my Kafka topic. If you have any
>>>> other information or suggestions, please tell me.
>>>>
>>>> Regards,
>>>> Nicolas PHUNG
>>>>
>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Not exactly the same issue, but possibly related:
>>>>>
>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>
>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Well, working backwards down the stack trace...
>>>>>>
>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>
>>>>>> That exception gets thrown if the limit is negative or greater than the 
>>>>>> buffer's capacity
>>>>>>
>>>>>>
>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>
>>>>>> If size had been negative, it would have just returned null, so we
>>>>>> know the exception got thrown because the size was greater than the
>>>>>> buffer's capacity
>>>>>>
>>>>>>
>>>>>> I haven't seen that before... maybe a corrupted message of some kind?
>>>>>>
>>>>>> If that problem is reproducible, try providing an explicit argument
>>>>>> for messageHandler, with a function that logs the message offset.
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>> nicolas.ph...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> When I'm reprocessing the data from Kafka (about 40 GB) with the new
>>>>>>> Spark Streaming Kafka method createDirectStream, everything is fine
>>>>>>> until a driver failure happens (driver is killed, connection lost...).
>>>>>>> When the driver comes back up, it resumes processing from the
>>>>>>> checkpoint in HDFS, except that I then get this:
>>>>>>>
>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 
>>>>>>> times; aborting job
>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 
>>>>>>> 1437032118000 ms.0
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 
>>>>>>> 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in 
>>>>>>> stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>         at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>         at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>         at kafka.message.Message.payload(Message.scala:218)
>>>>>>>         at 
>>>>>>> kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>         at 
>>>>>>> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>         at 
>>>>>>> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>         at 
>>>>>>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>         at 
>>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>         at 
>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>         at 
>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>         at 
>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>         at 
>>>>>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>         at 
>>>>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>         at 
>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>         at 
>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>         at 
>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>         at 
>>>>>>> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>         at 
>>>>>>> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>         at 
>>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>         at 
>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>         at 
>>>>>>> org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>         at 
>>>>>>> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>         at 
>>>>>>> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>         at 
>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>         at 
>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>         at 
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>         at 
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> This happens only when I'm doing a full reprocessing of the data from
>>>>>>> Kafka. If there's no load when I kill the driver and then restart it,
>>>>>>> it resumes from the checkpoint as expected without missing data. Has
>>>>>>> anyone encountered something similar? How did you solve it?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Nicolas PHUNG
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
