This isn't a production setup. We kept retention low intentionally.
My original question was: why did I get the exception on restart instead of
the consumer falling back to auto.offset.reset?
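
For reference, this is roughly how the consumer is configured in my test (a
sketch; the broker address and group id below are placeholders, while
auto.offset.reset and enable.auto.commit match the ConsumerConfig log quoted
further down):

    // Consumer params for the test run (sketch; broker and group id are placeholders).
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer],
      "value.deserializer" -> classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer],
      "group.id" -> "my-test-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )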




On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org> wrote:

> If you leave enable.auto.commit set to true, it will commit offsets to
> kafka, but you will get undefined delivery semantics.
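>
> A sketch of the usual alternative (commit to Kafka yourself after the output
> operation, roughly along the lines of the 0-10 integration guide; "stream" here
> is the DStream returned by createDirectStream, and HasOffsetRanges /
> CanCommitOffsets come from org.apache.spark.streaming.kafka010):
>
>     stream.foreachRDD { rdd =>
>       // Grab the offset ranges for this batch before any transformation.
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       // ... write the results out first ...
>       // Then commit this batch's offsets back to Kafka.
>       stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>     }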
>
> If you just want to restart from a fresh state, the easiest thing to
> do is use a new consumer group name.
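>
> For example (just a sketch; the new group name is arbitrary):
>
>     val freshParams = kafkaParams + ("group.id" -> "my-app-fresh")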
>
> But if that keeps happening, you should look into why your retention
> is not sufficient.
>
> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote:
> > You are right. I got confused as it's all part of the same log when running from the IDE.
> > I was looking for a good guide to read to understand this integration.
> >
> > I'm not managing offsets on my own. I've not enabled checkpointing for my tests.
> > I assumed offsets will be stored in Kafka by default.
> >
> >     KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
> >         ssc, PreferConsistent,
> >         SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams))
> >
> >    * @param offsets: offsets to begin at on initial startup.  If no offset is given
> >    * for a TopicPartition, the committed offset (if applicable) or kafka param
> >    * auto.offset.reset will be used.
> >
> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> > enable.auto.commit = true
> > auto.offset.reset = latest
> >
> > Srikanth
> >
> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> Seems like you're confused about the purpose of that line of code; it
> >> applies to executors, not the driver. The driver is responsible for
> >> determining offsets.
> >>
> >> Where are you storing offsets, in Kafka, checkpoints, or your own store?
> >> Auto offset reset won't be used if there are stored offsets.
> >>
> >>
> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> Upon restarting my Spark Streaming app, it fails with this error:
> >>>
> >>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent
> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of
> >>> range with no configured reset policy for partitions: {mt-event-2=1710706}
> >>>
> >>> It is correct that the last read offset was deleted by Kafka due to
> >>> retention period expiry.
> >>> I've set auto.offset.reset in my app, but it is getting overridden here:
> >>>
> >>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
> >>>
> >>> How do I force it to restart in this case (fully aware of the potential
> >>> data loss)?
> >>>
> >>> Srikanth
> >
> >
>
