Gordon, Tony,

Thought I would chime in real quick, as I have tested this a few different ways 
in the last month (not sure if this will be helpful, but thought I’d throw it 
out there). I actually haven’t noticed any issue with auto committing under any 
of those configs when using the Kafka property auto.offset.reset instead of the 
setStartFromLatest() / setStartFromGroupOffsets() methods. 
However, I have come across one interesting scenario: even when checkpointing 
is disabled, if the app is started from a savepoint, auto commit doesn’t seem to 
work. I am not sure if Tony has the same scenario. I assumed that starting from 
a savepoint sort of expects checkpointing to be enabled to commit offsets, 
similar to how it behaves when checkpointing is enabled. At this point, I am 
generating a random UID for my Kafka consumer (as I really don’t want to enable 
checkpointing; it is not really needed in my use case and I want to save on 
some resources), but I do have some really slow-moving state which I’d like to 
save on app shutdown etc.
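
For reference, here is a minimal sketch of the property-based setup I mean: no 
setStartFrom* call, with the start position left to auto.offset.reset. The 
class name, broker address and group id below are placeholders I made up for 
illustration; the commit settings mirror the ones Tony quoted. Keep in mind 
that Kafka only applies auto.offset.reset when the group has no committed 
offset for a partition.

```java
import java.util.Properties;

// Hypothetical helper (not from this thread): builds the Kafka consumer
// properties that would be handed to the Flink Kafka consumer constructor.
class KafkaAutoCommitProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Let the Kafka client commit offsets periodically on its own.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        // Used only when the group has no committed offset for a partition.
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id.
        Properties props = build("localhost:9092", "my-consumer-group");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

These properties would then be passed to the consumer as usual, without 
calling setStartFromLatest() on it.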

Thanks, Ashish

> On Nov 15, 2017, at 4:22 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> 
> Hi Tony,
> 
> Thanks for the report. At first glance of the description, what you described 
> doesn’t seem to match the expected behavior.
> I’ll spend some time later today to check this out.
> 
> Cheers,
> Gordon
> 
> 
> On 15 November 2017 at 5:08:34 PM, Tony Wei (tony19920...@gmail.com) wrote:
> 
>> Hi Gordon,
>> 
>> When I used FlinkKafkaConsumer010 to consume logs from Kafka, I found that 
>> if I used `setStartFromLatest()`, the Kafka consumer API didn't auto commit 
>> offsets back to the consumer group, but if I used 
>> `setStartFromGroupOffsets()`, it worked fine.
>> 
>> I am sure that the configuration for Kafka has 
>> `auto.commit.interval.ms = 5000` and `enable.auto.commit = true`, 
>> and I didn't enable checkpointing.
>> 
>> The only difference is the change from `setStartFromGroupOffsets()` to 
>> `setStartFromLatest()`, but the auto commit mechanism just stopped working.
>> 
>> My Flink cluster version is 1.3.2.
>> My Kafka cluster version is 0.10.2.1.
>> My Zookeeper version for Kafka is 3.4.6-1569965, built on 02/20/2014 09:09 
>> GMT.
>> My Kafka connector library is "org.apache.flink" % 
>> "flink-connector-kafka-0.10_2.10" % "1.3.2"
>> 
>> Thanks for your help in advance.
>> 
>> Best Regards,
>> Tony Wei
