If you leave enable.auto.commit set to true, the consumer will commit offsets to Kafka on its own timer, but you will get undefined delivery semantics, because offsets can be committed before the batch that read them has actually been processed.
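For at-least-once behavior, the usual alternative with the 0-10 integration is to disable auto commit and commit the offset ranges yourself after each batch is processed. A minimal sketch follows; the broker address, group id, topic pattern, and batch interval are placeholders, not values from this thread:

import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val ssc = new StreamingContext(new SparkConf().setAppName("offsets-demo"), Seconds(5))
val pattern = java.util.regex.Pattern.compile("mt-event-.*")   // placeholder topic pattern

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",               // placeholder broker list
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "my-streaming-app",                      // placeholder group name
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)     // no automatic commits
)

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the rdd ...
  // Commit only after processing succeeds, so a failed batch isn't marked as done.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

This still gives you at-least-once, not exactly-once, so the output side should be idempotent.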
If you just want to restart from a fresh state, the easiest thing to do is use a new consumer group name (a minimal kafkaParams sketch for that is at the end of this message, below the quoted thread). But if that keeps happening, you should look into why your retention is not sufficient.

On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote:
> You are right. I got confused as it's all part of the same log when running from
> the IDE.
> I was looking for a good guide to read to understand this integration.
>
> I'm not managing offsets on my own. I've not enabled checkpointing for my tests.
> I assumed offsets will be stored in Kafka by default.
>
> KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
>   ssc, PreferConsistent,
>   SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams))
>
>   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
>   *   TopicPartition, the committed offset (if applicable) or kafka param
>   *   auto.offset.reset will be used.
>
> 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
>   enable.auto.commit = true
>   auto.offset.reset = latest
>
> Srikanth
>
> On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Seems like you're confused about the purpose of that line of code; it
>> applies to executors, not the driver. The driver is responsible for
>> determining offsets.
>>
>> Where are you storing offsets: in Kafka, checkpoints, or your own store?
>> Auto offset reset won't be used if there are stored offsets.
>>
>> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Upon restarting, my Spark Streaming app fails with this error:
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent
>>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
>>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of
>>> range with no configured reset policy for partitions: {mt-event-2=1710706}
>>>
>>> It is correct that the last read offset was deleted by Kafka due to
>>> retention period expiry.
>>> I've set auto.offset.reset in my app but it is getting reset here:
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
>>>
>>> How to force it to restart in this case (fully aware of potential data
>>> loss)?
>>>
>>> Srikanth
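As mentioned at the top, a minimal sketch of the fresh-restart option; the group name, broker address, and reset policy below are placeholders, not values from this thread. Because a brand-new group.id has no committed offsets, auto.offset.reset decides the starting position instead of the expired stored offsets:

import org.apache.kafka.common.serialization.ByteArrayDeserializer

// New group.id => no committed offsets for this group, so auto.offset.reset applies.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",                // placeholder broker list
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "my-streaming-app-v2",                    // new name = fresh start
  "auto.offset.reset" -> "earliest",                      // or "latest"; either way you accept data loss once retention has expired
  "enable.auto.commit" -> (false: java.lang.Boolean)
)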