KafkaRDD.scala, handleFetchErr
(a paraphrased sketch of that method, and one way it might be patched, is at the bottom of this thread)

On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:
> Hi Cody,
>
> How would we go about Option 2 (see below)? Which portion of the code in
> Spark Kafka Direct should we look at to handle this issue for our
> requirements?
>
> 2. Catch that exception and somehow force things to "reset" for that
> partition. And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> If you're consistently getting offset out of range exceptions, it's
>> probably because messages are getting deleted before you've processed them.
>>
>> The only real way to deal with this is to give Kafka more retention,
>> consume faster, or both.
>>
>> If you're just looking for a quick "fix" for an infrequent issue, option
>> 4 is probably easiest. I wouldn't do that automatically / silently,
>> because you're losing data.
>>
>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Our Streaming job fails with the errors below. They are all related to
>>> Kafka losing offsets and OffsetOutOfRangeException.
>>>
>>> What are the options we have other than fixing Kafka? We would like to do
>>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>>> Direct?
>>>
>>> 1. Need a way to skip some offsets if they are not available after the
>>> max retries are reached; in that case there might be data loss.
>>>
>>> 2. Catch that exception and somehow force things to "reset" for that
>>> partition. And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> 3. Track the offsets separately and restart the job by providing the
>>> offsets (a minimal sketch of this approach follows after this list).
>>>
>>> 4. Or, a straightforward approach would be to monitor the log for this
>>> error, and if it occurs more than X times, kill the job, remove the
>>> checkpoint directory, and restart.
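For option 3, the direct stream can be started from offsets you track yourself, via the fromOffsets variant of KafkaUtils.createDirectStream, and the offsets each batch actually consumed can be captured through HasOffsetRanges. A minimal sketch against the Spark 1.x Kafka 0.8 integration; loadOffsets/saveOffsets are hypothetical stand-ins for whatever external store (ZooKeeper, a database) you keep offsets in:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Hypothetical offset store, not part of Spark: swap in ZooKeeper, HBase, a DB, etc.
def loadOffsets(): Map[TopicAndPartition, Long] = ???
def saveOffsets(offsets: Map[TopicAndPartition, Long]): Unit = ???

def createStream(ssc: StreamingContext, kafkaParams: Map[String, String]) = {
  val messageHandler =
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

  // Start from explicitly provided offsets instead of a checkpoint
  val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, loadOffsets(), messageHandler)

  stream.foreachRDD { rdd =>
    // Record the ranges this batch actually covered, so a restart can
    // resume from here without relying on the checkpoint directory.
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    saveOffsets(ranges.map(r =>
      TopicAndPartition(r.topic, r.partition) -> r.untilOffset).toMap)
  }
  stream
}

This also gives you a place to do the "reset" in option 2 outside of Spark: on restart, clamp each saved offset against the earliest offset Kafka still retains, e.g. saved.map { case (tp, o) => tp -> math.max(o, earliestAvailable(tp)) }, where earliestAvailable is a hypothetical lookup against Kafka's offset API. That makes the data loss an explicit, logged decision rather than a task failure.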
>>>
>>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([test_stream,5]))
>>>
>>> java.lang.ClassNotFoundException: kafka.common.NotLeaderForPartitionException
>>>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>> java.util.concurrent.RejectedExecutionException: Task
>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0 [Terminated,
>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12112]
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10
>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage
>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>
>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>> java.lang.InterruptedException
>>> Caused by: java.lang.InterruptedException
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in
>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage 33.0
>>> (TID 283, 172.16.97.103): UnknownReason
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
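Regarding the pointer at the top of the thread: handleFetchErr lives in the 0.8 direct stream's KafkaRDD iterator. Roughly, paraphrased from the Spark 1.x source (not verbatim; check your version):

// Paraphrased sketch of the fetch-error handling in
// org.apache.spark.streaming.kafka.KafkaRDD (Spark 1.x), not a verbatim copy.
private def handleFetchErr(resp: FetchResponse): Unit = {
  if (resp.hasError) {
    val err = resp.errorCode(part.topic, part.partition)
    if (err == ErrorMapping.LeaderNotAvailableCode ||
        err == ErrorMapping.NotLeaderForPartitionCode) {
      // Lost leader: back off, then rethrow and let normal task retry reconnect
      Thread.sleep(kc.config.refreshLeaderBackoffMs)
    }
    throw ErrorMapping.exceptionFor(err)
  }
}

A possible (unofficial) patch for option 2 would special-case OffsetOutOfRangeCode here and skip forward instead of throwing. A sketch only, assuming access to the iterator's consumer and its mutable requestOffset:

// Sketch only: jump to the earliest offset Kafka still retains rather than
// failing the task. The skipped messages are knowingly lost, so log loudly.
if (err == ErrorMapping.OffsetOutOfRangeCode) {
  val tap = TopicAndPartition(part.topic, part.partition)
  val earliest =
    consumer.earliestOrLatestOffset(tap, OffsetRequest.EarliestTime, -1)
  log.warn(s"Offset out of range for $tap; resetting to $earliest, data lost")
  requestOffset = math.max(requestOffset, earliest)
} else {
  throw ErrorMapping.exceptionFor(err)
}

Note this only moves the starting point within the batch's already-scheduled OffsetRange; the untilOffset stays as calculated, which is why the "backlog" part of option 2 is the hard part.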