Hi, Julio:

Is checkpointing enabled in your job? The Flink Kafka connector only commits offsets back to Kafka when checkpointing is enabled.
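
To make the question concrete, here is a toy model in Python of the behaviour described above. It is only a sketch, not Flink code: `ToyKafkaSource`, `run` and every field name are made up for illustration, and the Kafka client's own auto-commit is deliberately not modelled. It shows offsets becoming visible to an external lag monitor only once a checkpoint completes.

```python
# Toy model (NOT Flink code): offsets reach "Kafka" only when a checkpoint completes.
# All names here are hypothetical and exist only for this illustration.


class ToyKafkaSource:
    """Stand-in for a single Kafka source subtask."""

    def __init__(self, checkpointing_enabled):
        self.checkpointing_enabled = checkpointing_enabled
        self.current_offsets = {}     # partition -> read position (job-managed state)
        self.pending = {}             # checkpoint id -> offsets snapshotted for it
        self.committed_in_kafka = {}  # what an external lag monitor would see

    def read(self, partition, count):
        # Advance the in-memory position; nothing is committed here.
        self.current_offsets[partition] = self.current_offsets.get(partition, 0) + count

    def snapshot(self, checkpoint_id):
        # Snapshots are only taken when checkpointing is enabled.
        if self.checkpointing_enabled:
            self.pending[checkpoint_id] = dict(self.current_offsets)

    def notify_checkpoint_complete(self, checkpoint_id):
        # Commit exactly the offsets that belong to the completed checkpoint.
        offsets = self.pending.pop(checkpoint_id, None)
        if offsets is not None:
            self.committed_in_kafka.update(offsets)


def run(checkpointing_enabled):
    source = ToyKafkaSource(checkpointing_enabled)
    source.read(partition=0, count=100)
    source.snapshot(checkpoint_id=1)
    source.read(partition=0, count=50)  # read after the snapshot was taken
    source.notify_checkpoint_complete(checkpoint_id=1)
    return source.committed_in_kafka
```

With checkpointing on, `run(True)` returns `{0: 100}` (the position at the time of the snapshot, not the later one); with it off, `run(False)` returns `{}`, which would match a monitor seeing no committed offsets at all. In a real job the corresponding switch is `enableCheckpointing(...)` on the `StreamExecutionEnvironment`.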

On Tue, Sep 4, 2018 at 11:43 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Julio,
>
> As Renjie had already mentioned, to achieve exactly-once semantics with
> the Kafka consumer, Flink needs to have control over the assignment of
> Kafka partitions to source subtasks.
>
> To add a bit more detail here, this is due to the fact that each subtask
> writes the current offsets of the partitions it is assigned to
> Flink-managed state, and that is coordinated with Flink's checkpoints.
> If it were to use Kafka's automatic consumer group assignments (i.e. when
> using the subscribe API), the consumer would have no control over when
> exactly partition subscriptions are reassigned across subtasks.
> If I understood correctly, what you were suggesting in your last reply was
> to simply use poll() to query the offset in the case that some partition
> was reassigned to another source subtask.
> This is problematic because there are no consistency guarantees between the
> committed offsets in Kafka and Flink's checkpoints.
> Committing offsets is, and should be, used only as a means to expose
> consumer progress to the outside world beyond the Flink job.
>
> Hope this provides a bit more insight.
>
> Cheers,
> Gordon
>
>
> On 4 September 2018 at 2:25:38 PM, Julio Biason (julio.bia...@azion.com)
> wrote:
>
> Hi Renjie,
>
> 1. From what I could grasp from the Kafka docs, you can subscribe and still
> use poll() to capture a specific offset. But I only read the beginning of
> it and didn't go deep into it.
>
> 2. Currently, Flink 1.4.2, Kafka 0.10.1 and the FlinkKafkaConsumer010.
>
> On Tue, Sep 4, 2018 at 12:47 AM, Renjie Liu <liurenjie2...@gmail.com>
> wrote:
>
>> Hi, Julio:
>> 1. Flink doesn't use subscribe because it needs to control partition
>> assignment itself, which is important for implementing exactly-once.
>> 2. Can you share the versions you are using, including Kafka, the Kafka
>> client, and Flink? We are also using the Flink Kafka consumer and we can
>> monitor it correctly.
>>
>> On Tue, Sep 4, 2018 at 3:09 AM Julio Biason <julio.bia...@azion.com>
>> wrote:
>>
>>> Hey guys,
>>>
>>> We are trying to add external monitoring to our system, but we can only
>>> get the lag in Kafka topics while the Flink job is running -- if, for some
>>> reason, the Flink job fails, we get no visibility on how big the lag is.
>>>
>>> (Besides that, the way Flink reports it is not accurate and produces a lot
>>> of -Inf, which I already discussed before.)
>>>
>>> While looking at the problem, we noticed that the FlinkKafkaConsumer
>>> never uses `subscribe` to subscribe to the topics and that's why the values
>>> are never stored back into Kafka, even when the driver itself does
>>> `commitAsync`.
>>>
>>> Is there any reason for not subscribing to topics that I may have missed?
>>>
>> --
>> Liu, Renjie
>> Software Engineer, MVAD
>>
>
> --
> *Julio Biason*, Software Engineer
> *AZION* | Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101 | Mobile: +55 51 *99907 0554*
>

--
Liu, Renjie
Software Engineer, MVAD
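
As an illustration of Gordon's point in the quoted thread, the following Python sketch shows why a job-controlled, deterministic assignment fits together with offsets kept in checkpointed state. It is a toy under stated assumptions: `assign` and `restore_positions` are hypothetical helpers, and the round-robin rule is an arbitrary stand-in, not the assignment rule Flink actually uses.

```python
# Toy sketch (NOT Flink code): deterministic partition assignment plus
# recovery from checkpointed offsets. All names are hypothetical.


def assign(partitions, parallelism):
    """Map every partition to exactly one subtask index.

    The rule (round robin over sorted partitions) is an assumption made for
    this example; what matters is that the job, not the broker, decides it.
    """
    assignment = {subtask: [] for subtask in range(parallelism)}
    for i, partition in enumerate(sorted(partitions)):
        assignment[i % parallelism].append(partition)
    return assignment


def restore_positions(checkpointed_offsets, my_partitions):
    """Resume a subtask from the offsets stored in the last checkpoint.

    Offsets committed to Kafka are intentionally not consulted: they only
    expose progress to the outside and may not match the checkpoint.
    """
    return {p: checkpointed_offsets[p] for p in my_partitions if p in checkpointed_offsets}


# Example: 4 partitions, 2 subtasks, and a checkpoint covering partitions 0..2.
assignment = assign(range(4), parallelism=2)
checkpoint = {0: 100, 1: 40, 2: 7}
subtask0_positions = restore_positions(checkpoint, assignment[0])
```

Because the assignment is a pure function of the partition list and the parallelism, each subtask knows after a restart which checkpointed offsets belong to it. With broker-driven rebalancing through `subscribe`, partitions could move between subtasks at moments the job does not control, which is the consistency problem described above.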