Hi Nicolas,

From my experience there are only two ways out:

1) wait for the retention time to pass so the data gets deleted (this is
usually unacceptable)
2) trace the offset of the corrupt message for every affected consumer group
and skip it by overwriting the committed offset with offset+1 (rough sketch
below)
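For 2), something along these lines should work if you are on the 0.8.x
high-level consumer with offsets stored in ZooKeeper. This is an untested
sketch; the connect string, group, topic, partition and offset values are
placeholders you have to replace with your own, and the group has to be shut
down first or it will just commit its own offset back over yours.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: bump a high-level consumer group's committed offset in
// ZooKeeper one past a corrupt message. All values below are placeholders.
public class SkipCorruptOffset {
    public static void main(String[] args) throws Exception {
        String zkConnect = "localhost:2181";   // your ZooKeeper connect string
        String group = "my-consumer-group";    // hypothetical consumer group id
        String topic = "my-topic";             // hypothetical topic
        int partition = 0;                     // partition holding the corrupt message
        long corruptOffset = 123456L;          // offset reported in the CRC error

        // With the 0.8.x high-level consumer and ZooKeeper offset storage, the
        // committed offset is stored as a plain decimal string at this znode.
        String path = "/consumers/" + group + "/offsets/" + topic + "/" + partition;

        final CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(zkConnect, 30000, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        try {
            connected.await(10, TimeUnit.SECONDS);

            byte[] current = zk.getData(path, false, null);
            System.out.println("current committed offset: " + new String(current, "UTF-8"));

            // Stop the consumer group first, then move the offset past the bad message.
            byte[] newOffset = String.valueOf(corruptOffset + 1).getBytes("UTF-8");
            zk.setData(path, newOffset, -1);   // -1 = ignore the znode version
            System.out.println("committed offset set to " + (corruptOffset + 1));
        } finally {
            zk.close();
        }
    }
}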
The problem is that when the high-level consumer iterator encounters a corrupt
message, it goes into a failed state and closes. There is no way to skip the
message or recover from it without skipping offsets. You might try to use
SimpleConsumer though (second sketch at the bottom of this mail). Maybe
someone knows other ways to deal with this problem, but we haven't found any.

BR,
Adam

2015-07-21 9:38 GMT+02:00 Nicolas Phung <[email protected]>:

> Hello,
>
> I'm using Confluent Kafka (0.8.2.0-cp). When I try to process messages
> from my Kafka topic with Spark Streaming, I get the following error:
>
> kafka.message.InvalidMessageException: Message is corrupt (stored crc =
> 3561357254, computed crc = 171652633)
>         at kafka.message.Message.ensureValid(Message.scala:166)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
> java.lang.IllegalStateException: Iterator is in failed state
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> From my understanding, there are some corrupt messages in my topic. I'm
> using the new Producer API to send messages compressed with Snappy. I found
> an old topic talking about this but with no further steps to resolve the
> issue. Do you have any information regarding this?
>
> Is it possible in Kafka to somehow reread the topic and drop the corrupt
> messages?
>
> Regards,
> Nicolas PHUNG
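If you want to go down the SimpleConsumer road, below is a rough, untested
sketch against the 0.8.x Java API that checks each message's CRC and skips
invalid ones instead of throwing. Broker, topic, partition and offset are
placeholders, and with Snappy-compressed batches a corrupt wrapper message
may still fail during decompression, so treat this as a starting point only.

import java.nio.ByteBuffer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

// Sketch only: read a partition directly with SimpleConsumer, check each
// message's CRC and skip corrupt ones instead of letting an iterator fail.
public class SkipCorruptWithSimpleConsumer {
    public static void main(String[] args) {
        String broker = "localhost";          // assumption: this broker leads the partition
        int port = 9092;
        String topic = "my-topic";            // hypothetical topic
        int partition = 0;
        String clientId = "corrupt-skipper";
        long readOffset = 123450L;            // start a little before the corrupt offset

        SimpleConsumer consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, clientId);
        try {
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientId)
                    .addFetch(topic, partition, readOffset, 100000)
                    .build();
            FetchResponse response = consumer.fetch(req);

            for (MessageAndOffset mo : response.messageSet(topic, partition)) {
                if (!mo.message().isValid()) {
                    // Stored CRC does not match the computed one: log and move on.
                    System.err.println("Skipping corrupt message at offset " + mo.offset());
                } else {
                    ByteBuffer payload = mo.message().payload();
                    byte[] bytes = new byte[payload.limit()];
                    payload.get(bytes);
                    System.out.println("offset " + mo.offset() + ": " + bytes.length + " bytes");
                }
                readOffset = mo.nextOffset(); // the next fetch would start from here
            }
        } finally {
            consumer.close();
        }
    }
}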
