Re: Storing offsets in Kafka
Hi Jiangjie,

Yes, I am using the second case (Flink 1.7.1, Kafka 0.10.2, FlinkKafkaConsumer010). However, there is now a problem: the data is consumed normally, but offset commits do not continue. The following exception is found:

[image: image.png]

On Thu, Sep 5, 2019 at 11:32 AM Becket Qin wrote:

> Hi Dominik,
>
> There has not been any change to the offset committing logic in
> KafkaConsumer for a while, but the logic is a little complicated.
> Offset commits to Kafka are only enabled in the following two cases:
>
> 1. Flink checkpointing is enabled AND commitOffsetsOnCheckpoint is true
>    (default value is true).
> 2. Flink checkpointing is disabled AND the vanilla KafkaConsumer has
>    a) enable.auto.commit=true (default value is true);
>    b) auto.commit.interval.ms>0 (default value is 5000).
>
> Note that in case 1, if the job exits before the first checkpoint takes
> place, no offset will be committed.
>
> Can you check whether your settings fall into one of the two cases?
>
> Thanks,
>
> Jiangjie (Becket) Qin
Re: Storing offsets in Kafka
No, I don't think so. As long as you have a successful checkpoint, the offset will be committed.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 5, 2019 at 4:56 PM Dominik Wosiński wrote:

> Hey,
> Yeah, I am using the first case. Is there a specific requirement for
> checkpoints? For example, do they need to be externalized?
>
> Best,
> Dom.
Re: Storing offsets in Kafka
Hey,

Yeah, I am using the first case. Is there a specific requirement for checkpoints? For example, do they need to be externalized?

Best,
Dom.

On Thu, Sep 5, 2019 at 05:32 Becket Qin wrote:

> Hi Dominik,
>
> There has not been any change to the offset committing logic in
> KafkaConsumer for a while, but the logic is a little complicated.
> Offset commits to Kafka are only enabled in the following two cases:
>
> 1. Flink checkpointing is enabled AND commitOffsetsOnCheckpoint is true
>    (default value is true).
> 2. Flink checkpointing is disabled AND the vanilla KafkaConsumer has
>    a) enable.auto.commit=true (default value is true);
>    b) auto.commit.interval.ms>0 (default value is 5000).
>
> Note that in case 1, if the job exits before the first checkpoint takes
> place, no offset will be committed.
>
> Can you check whether your settings fall into one of the two cases?
>
> Thanks,
>
> Jiangjie (Becket) Qin
Re: Storing offsets in Kafka
Hi Dominik,

There has not been any change to the offset committing logic in KafkaConsumer for a while, but the logic is a little complicated. Offset commits to Kafka are only enabled in the following two cases:

1. Flink checkpointing is enabled AND commitOffsetsOnCheckpoint is true (default value is true).
2. Flink checkpointing is disabled AND the vanilla KafkaConsumer has a) enable.auto.commit=true (default value is true); b) auto.commit.interval.ms>0 (default value is 5000).

Note that in case 1, if the job exits before the first checkpoint takes place, no offset will be committed.

Can you check whether your settings fall into one of the two cases?

Thanks,

Jiangjie (Becket) Qin

On Wed, Sep 4, 2019 at 9:03 PM Dominik Wosiński wrote:

> Hey,
> I was wondering whether something has changed for KafkaConsumer. I am
> using Kafka 2.0.0 with Flink and I wanted to use group offsets, but there
> seems to be no change in the topic where Kafka stores its offsets. After a
> restart, Flink uses `auto.offset.reset`, so it seems that no offset
> commits are happening. The checkpoints are properly configured and I am
> able to restore from a savepoint, but the group offsets are not working
> properly. Has anything changed in this regard?
>
> Best Regards,
> Dom.
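
For anyone checking their own setup against the two cases in this message, the decision can be sketched in plain Java roughly as below. This is only a simplified model of the rules as described here, not Flink's actual code: the class `OffsetCommitSketch`, the enum `Mode`, and the method `resolve` are hypothetical names made up for illustration, and the defaults (`true` and `5000`) are the ones quoted in the message.

```java
import java.util.Properties;

// Simplified model of the two cases described in the message above.
// NOT Flink's implementation; all names here are hypothetical.
class OffsetCommitSketch {

    enum Mode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

    static Mode resolve(boolean checkpointingEnabled,
                        boolean commitOffsetsOnCheckpoint,
                        Properties kafkaProps) {
        if (checkpointingEnabled) {
            // Case 1: offsets are committed when a checkpoint completes,
            // so nothing is committed before the first successful checkpoint.
            return commitOffsetsOnCheckpoint ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // Case 2: checkpointing is off, so the plain Kafka client settings decide.
        boolean autoCommit = Boolean.parseBoolean(
                kafkaProps.getProperty("enable.auto.commit", "true"));
        long intervalMs = Long.parseLong(
                kafkaProps.getProperty("auto.commit.interval.ms", "5000"));
        return (autoCommit && intervalMs > 0) ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "false");
        // Checkpointing on: the auto-commit property plays no role in this model.
        System.out.println(resolve(true, true, props));
        // Checkpointing off and auto-commit disabled: nothing gets committed.
        System.out.println(resolve(false, true, props));
    }
}
```

If a job ends up in the `DISABLED` branch of this model, or exits before its first checkpoint completes, that would match the symptom of the group offsets never changing in Kafka.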
Storing offsets in Kafka
Hey,

I was wondering whether something has changed for KafkaConsumer. I am using Kafka 2.0.0 with Flink and I wanted to use group offsets, but there seems to be no change in the topic where Kafka stores its offsets. After a restart, Flink uses `auto.offset.reset`, so it seems that no offset commits are happening. The checkpoints are properly configured and I am able to restore from a savepoint, but the group offsets are not working properly. Has anything changed in this regard?

Best Regards,
Dom.