Gordon, Tony, Thought I would chime in real quick as I have tested this a few different ways in the last month (not sure if this will be helpful but thought I’d throw it out there). I actually haven’t noticed issue auto committing with any of those configs using Kafka property auto.offset.reset instead of using those methods. However, I have come across one interesting scenario - even when Checkpointing is disabled BUT if App is started from a Savepoint, auto commit doesn’t seem to work. I am not sure if Tony has the same scenario. I assumed that starting from Savepoint sort of expects Checkpointing to be enabled to commit offsets similar to how it behaves when Checkpointing is enabled. At this point, I am generating a random UID for my Kafka consumer (as I really don’t want to enable checkpointing — not really needed in my use case and want to save on some resources) but I do have some really slow moving states which I’d like save on app shutdown etc.
Thanks, Ashish > On Nov 15, 2017, at 4:22 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote: > > Hi Tony, > > Thanks for the report. At first glance of the description, what you described > doesn’t seem to match the expected behavior. > I’ll spend some time later today to check this out. > > Cheers, > Gordon > > > On 15 November 2017 at 5:08:34 PM, Tony Wei (tony19920...@gmail.com > <mailto:tony19920...@gmail.com>) wrote: > >> Hi Gordon, >> >> When I used FlinkKafkaConsumer010 to consume log from Kafka, I found that if >> I used `setStartFromLatest()` the kafka consumer api didn't auto commit >> offsets back to consumer group, but if I used `setStartFromGroupOffsets()` >> it worked fine. >> >> I am sure that the configuration for Kafka has `auto.commit.interval.ms >> <http://auto.commit.interval.ms/> = 5000` and `enable.auto.commit = true` >> and I didn't enable checkpointing. >> >> All the difference is only the change from `setStartFromGroupOffsets()` to >> `setStartFromLatest()`, but the auto commit mechanism just stopped working. >> >> My Flink cluster version is 1.3.2. >> My Kafka cluster version is 0.10.2.1. >> My Zookeeper version for Kafka is 3.4.6-1569965, built on 02/20/2014 09:09 >> GMT. >> My Kafka connector library is "org.apache.flink" % >> "flink-connector-kafka-0.10_2.10" % "1.3.2" >> >> Thanks for your help in advance. >> >> Best Regards, >> Tony Wei