Hi,

I was thinking of something similar to what Ufuk suggested,
but if we want to report a “consumer lag” metric, we would
essentially need to request the latest offset on every record fetch
(because the latest offset advances as well), so I wasn’t so sure
about the performance tradeoffs there (the partition metadata request
and the records request are 2 separate calls, so we would
basically be doubling the number of requests to Kafka just for this).
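
One way to soften that cost would be to refresh the latest offsets only every N
fetches and compute the lag against the cached value in between. A minimal
sketch of that idea, not taken from the actual consumer code: `LagTracker`,
`fetch_latest_offsets` and `refresh_every` are hypothetical names, and both
offsets are assumed to be "next offset" positions so that lag is simply
latest minus current.

```python
class LagTracker:
    """Tracks per-partition lag, refreshing the latest offsets only every N fetches."""

    def __init__(self, fetch_latest_offsets, refresh_every=100):
        # fetch_latest_offsets is a hypothetical callable standing in for the
        # extra request to Kafka; it returns {partition: latest_offset}.
        self._fetch_latest = fetch_latest_offsets
        self._refresh_every = refresh_every
        self._fetches_since_refresh = 0
        self._latest = fetch_latest_offsets()

    def on_fetch(self, current_offsets):
        """Call after each record fetch; returns {partition: lag}."""
        self._fetches_since_refresh += 1
        if self._fetches_since_refresh >= self._refresh_every:
            # Only here do we pay for the second request.
            self._latest = self._fetch_latest()
            self._fetches_since_refresh = 0
        # The cached latest offset may be stale, so clamp the lag at zero.
        return {
            p: max(0, self._latest.get(p, off) - off)
            for p, off in current_offsets.items()
        }
```

The reported lag would then be an approximation that is at most N fetches
stale, which may or may not be acceptable depending on what the metric is
used for.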

If we just want a metric that can show whether or not the
consumer has caught up with the “latest offset at the time the
consumer starts”, it would definitely be feasible. I wonder
how we want to name this metric though.
@Gyula @Florian @Bruno do you think this is enough for your needs?
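
To make the second variant concrete, here is a rough sketch of the check,
assuming we capture the latest offsets once when the consumer starts.
`caught_up_with_startup` and both dictionaries are hypothetical, for
illustration only, and not existing consumer API.

```python
def caught_up_with_startup(startup_latest, current_offsets):
    """True once every partition has reached the latest offset captured at startup.

    startup_latest:  {partition: latest offset when the consumer started}
    current_offsets: {partition: offset the consumer has read up to}
    """
    return all(
        p in current_offsets and current_offsets[p] >= latest
        for p, latest in startup_latest.items()
    )
```

This needs only one extra request at startup, but it says nothing about
records written after the consumer started.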

- Gordon

On March 17, 2017 at 8:51:49 PM, Ufuk Celebi (u...@apache.org) wrote:

@Gordon: What's your take on integrating this directly into the
consumer? Can't we poll the latest offset via the Offset API [1] and
report a consumer lag metric for the consumer group of the
application? We could also display this in the web frontend.

In the first version, users would have to poll this metric manually.  

[1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest

On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda <brunoara...@gmail.com> wrote:  
> Hi,  
>  
> We are interested in this too. So far we flag the records with timestamps at
> different points of the pipeline and use metric gauges to measure latency
> between the different components, but it would be good to know if there is
> something more specific to Kafka that we can do out of the box in Flink.
>  
> Cheers,  
>  
> Bruno  
>  
> On Fri, 17 Mar 2017 at 10:07 Florian König <florian.koe...@micardo.com>  
> wrote:  
>>  
>> Hi,  
>>  
>> thank you Gyula for posting that question. I’d also be interested in how  
>> this could be done.  
>>  
>> You mentioned the dependency on the commit frequency. I’m using
>> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer
>> a job's offsets as shown in the diagrams updated a lot more regularly than
>> the checkpointing interval. With the 10 consumer a commit is only made after
>> a successful checkpoint (or so it seems).
>>  
>> Why is that so? The checkpoint contains the Kafka offset and would be able
>> to start reading wherever it left off, regardless of any offset stored in
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently
>> from the checkpointing? Or did I misconfigure anything?
>>  
>> Thanks  
>> Florian  
>>  
>> > Am 17.03.2017 um 10:26 schrieb Gyula Fóra <gyf...@apache.org>:  
>> >  
>> > Hi All,  
>> >  
>> > I am wondering if anyone has some nice suggestions on what would be the  
>> > simplest/best way of telling if a job is caught up with the Kafka input.  
>> > An alternative question would be how to tell if a job is caught up to  
>> > another job reading from the same topic.  
>> >  
>> > The first thing that comes to my mind is looking at the offsets Flink
>> > commits to Kafka. However this will only work if every job uses a
>> > different group id and even then it is not very reliable depending on
>> > the commit frequency.
>> >  
>> > The use case I am trying to solve is fault tolerant update of a job, by
>> > taking a savepoint for job1, starting job2 from the savepoint, waiting
>> > until it catches up, and then killing job1.
>> >  
>> > Thanks for your input!  
>> > Gyula  
>>  
>>  
>  
