There are other ways to deal with the problem than shutting down the streaming job. You can monitor the lag in your consumer to see if the consumer is falling behind. If the lag is high enough that an OffsetOutOfRange can happen, you can either increase the retention period or increase the consumer rate, or do both.
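As an illustration of that kind of lag check, here is a minimal sketch against a Kafka 0.8-era broker using the low-level SimpleConsumer offset API. The broker host, topic, partition, last-processed offset, and lag threshold are made-up values; in a real job the last-processed offset would come from wherever you persist your offsets.

    import scala.collection.JavaConverters._

    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition
    import kafka.javaapi.consumer.SimpleConsumer

    object LagCheck {
      def main(args: Array[String]): Unit = {
        // Hypothetical values: broker, topic/partition, the offset the job has
        // processed up to (from your own offset store), and an alert threshold.
        val (broker, port)   = ("kafka-broker-1", 9092)
        val tp               = TopicAndPartition("test_stream", 5)
        val lastProcessed    = 123456789L
        val maxAcceptableLag = 1000000L

        val consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, "lag-checker")
        try {
          // Ask the broker for the latest (head-of-log) offset of the partition.
          val requestInfo =
            Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)).asJava
          val request  = new kafka.javaapi.OffsetRequest(
            requestInfo, OffsetRequest.CurrentVersion, "lag-checker")
          val response = consumer.getOffsetsBefore(request)
          val latest   = response.offsets(tp.topic, tp.partition).head

          val lag = latest - lastProcessed
          println(s"Lag for $tp is $lag messages")
          if (lag > maxAcceptableLag) {
            // Alert here (and raise retention or consume faster) before the
            // earliest retained offset overtakes lastProcessed and the job
            // starts hitting OffsetOutOfRangeException.
            println(s"WARNING: lag for $tp exceeds $maxAcceptableLag")
          }
        } finally {
          consumer.close()
        }
      }
    }

If that lag keeps growing toward the size of the retained log, it is time to raise retention or speed up consumption, before the broker starts returning OffsetOutOfRange.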
What I am trying to say is that the streaming job should not fail in any case.

Dibyendu

On Thu, Dec 3, 2015 at 9:40 AM, Cody Koeninger <c...@koeninger.org> wrote:

> I believe that what differentiates reliable systems is that individual
> components should fail fast when their preconditions aren't met, and other
> components should be responsible for monitoring them.
>
> If a user of the direct stream thinks that your approach of restarting and
> ignoring data loss is the right thing to do, they can monitor the job
> (which they should be doing in any case) and restart.
>
> If a user of your library thinks that my approach of failing (so they KNOW
> there was data loss and can adjust their system) is the right thing to do,
> how do they do that?
>
> On Wed, Dec 2, 2015 at 9:49 PM, Dibyendu Bhattacharya
> <dibyendu.bhattach...@gmail.com> wrote:
>
>> Well, even if you do set the correct retention and increase the speed,
>> OffsetOutOfRange can still occur, depending on how your downstream
>> processing behaves. And if that happens, there is no other way to recover
>> the old messages. So the best bet here, from the streaming job's point of
>> view, is to start from the earliest offset rather than bring down the
>> streaming job. In many cases the goal for a streaming job is not to shut
>> down and exit in case of any failure. I believe that is what
>> differentiates an always-running streaming job.
>>
>> Dibyendu
>>
>> On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> No, silently restarting from the earliest offset in the case of offset
>>> out of range exceptions during a streaming job is not the "correct way of
>>> recovery".
>>>
>>> If you do that, your users will be losing data without knowing why.
>>> It's more like a "way of ignoring the problem without actually addressing
>>> it".
>>>
>>> The only really correct way to deal with that situation is to recognize
>>> why it's happening, and either increase your Kafka retention or increase
>>> the speed at which you are consuming.
>>>
>>> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya
>>> <dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> The consumer I mentioned does not silently throw away data. If the
>>>> offset is out of range, it starts from the earliest offset, and that is
>>>> the correct way of recovering from this error.
>>>>
>>>> Dibyendu
>>>>
>>>> On Dec 2, 2015 9:56 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>>>
>>>>> Again, just to be clear, silently throwing away data because your
>>>>> system isn't working right is not the same as "recover from any Kafka
>>>>> leader changes and offset out of ranges issue".
>>>>>
>>>>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya
>>>>> <dibyendu.bhattach...@gmail.com> wrote:
>>>>>
>>>>>> Hi, if you use the receiver-based consumer which is available in
>>>>>> spark-packages (
>>>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer ),
>>>>>> it has built-in failure recovery and can recover from any Kafka
>>>>>> leader change and offset out of range issue.
>>>>>>
>>>>>> Here is the package on GitHub:
>>>>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>>>>
>>>>>> Dibyendu
>>>>>>
>>>>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy
>>>>>> <swethakasire...@gmail.com> wrote:
>>>>>>
>>>>>>> How do we avoid those errors with the receiver-based approach?
>>>>>>> Suppose we are OK with at-least-once processing and use the
>>>>>>> receiver-based approach, which uses ZooKeeper and does not query
>>>>>>> Kafka directly, would these errors (Couldn't find leader offsets for
>>>>>>> Set([test_stream,5])) be avoided?
>>>>>>>
>>>>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> KafkaRDD.scala, handleFetchErr
>>>>>>>>
>>>>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy
>>>>>>>> <swethakasire...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Cody,
>>>>>>>>>
>>>>>>>>> How should we look at option 2 (see below)? Which portion of the
>>>>>>>>> code in Spark Kafka Direct should we look at to handle this issue,
>>>>>>>>> specific to our requirements?
>>>>>>>>>
>>>>>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>>>>>> partition. And how would it handle the offsets already calculated in
>>>>>>>>> the backlog (if there is one)?
>>>>>>>>>
>>>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> If you're consistently getting offset out of range exceptions,
>>>>>>>>>> it's probably because messages are getting deleted before you've
>>>>>>>>>> processed them.
>>>>>>>>>>
>>>>>>>>>> The only real way to deal with this is to give Kafka more
>>>>>>>>>> retention, consume faster, or both.
>>>>>>>>>>
>>>>>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>>>>>>>> option 4 is probably easiest. I wouldn't do that automatically /
>>>>>>>>>> silently, because you're losing data.
>>>>>>>>>>
>>>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> So, our Streaming Job fails with the following errors. If you see
>>>>>>>>>>> the errors below, they are all related to Kafka losing offsets and
>>>>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>>>>
>>>>>>>>>>> What are the options we have other than fixing Kafka? We would
>>>>>>>>>>> like to do something like the following. How can we achieve 1 and
>>>>>>>>>>> 2 with Spark Kafka Direct?
>>>>>>>>>>>
>>>>>>>>>>> 1. Need to see a way to skip some offsets if they are not
>>>>>>>>>>> available after the max retries are reached; in that case there
>>>>>>>>>>> might be data loss.
>>>>>>>>>>>
>>>>>>>>>>> 2. Catch that exception and somehow force things to "reset" for
>>>>>>>>>>> that partition. And how would it handle the offsets already
>>>>>>>>>>> calculated in the backlog (if there is one)?
>>>>>>>>>>>
>>>>>>>>>>> 3. Track the offsets separately, and restart the job by providing
>>>>>>>>>>> the offsets.
>>>>>>>>>>>
>>>>>>>>>>> 4. Or a straightforward approach would be to monitor the log for
>>>>>>>>>>> this error, and if it occurs more than X times, kill the job,
>>>>>>>>>>> remove the checkpoint directory, and restart.
>>>>>>>>>>>
>>>>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>>>>> Set([test_stream,5]))
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>>>>> [Terminated, pool size = 0, active threads = 0, queued tasks = 0,
>>>>>>>>>>> completed tasks = 12112]
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>>>> Task 10 in stage 52.0 failed 4 times, most recent failure: Lost
>>>>>>>>>>> task 10.3 in stage 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>>>
>>>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>>>> java.lang.InterruptedException
>>>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>>>> Task 7 in stage 33.0 failed 4 times, most recent failure: Lost
>>>>>>>>>>> task 7.3 in stage 33.0 (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> View this message in context:
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
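For completeness, here is a minimal sketch of option 3 from the list quoted above (tracking offsets yourself and restarting the direct stream from them), using the Spark 1.x spark-streaming-kafka createDirectStream overload that takes explicit starting offsets. The application name, broker address, topic, partition, stored offset value, and batch interval are placeholders, and the offset store itself is only indicated in comments.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    object RestartFromStoredOffsets {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("direct-stream-restart")
        val ssc  = new StreamingContext(conf, Seconds(10))
        val kafkaParams = Map("metadata.broker.list" -> "kafka-broker-1:9092")

        // Offsets previously recorded by the job (hard-coded here; in practice
        // read from ZooKeeper, a database, etc.). If a stored offset has already
        // aged out of Kafka retention, this is the one place to knowingly adjust
        // it to the earliest available offset instead of letting the job fail.
        val fromOffsets: Map[TopicAndPartition, Long] =
          Map(TopicAndPartition("test_stream", 5) -> 123456789L)

        val messageHandler =
          (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

        val stream = KafkaUtils.createDirectStream[
          String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams, fromOffsets, messageHandler)

        stream.foreachRDD { rdd =>
          // Record the ranges this batch covers along with the output, so a
          // restart can resume from exactly these offsets.
          val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          ranges.foreach { r =>
            println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
            // ... persist r.untilOffset to your offset store here ...
          }
          // ... process rdd ...
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

With this pattern, whether the job fails fast on a missing offset or deliberately falls back to the earliest available one becomes an explicit decision made where the stored offsets are loaded, which is the trade-off being debated in this thread.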