Right, the backpressure monitor only measures backpressure inside the Flink job, 
i.e. between Flink tasks.

Therefore, it’s up to you to monitor whether your Flink job is “keeping up” 
with the source stream. If you’re using Kafka, there’s a metric that the 
consumer library makes available. For example, for one of our jobs, in Graphite 
we have a metric that matches:

aliasByNode(stats.gauges.myapp.prod.us-east-1.myapp.*.taskmanager.*.job.com.example.$Job.operator.*.*.KafkaConsumer.records-lag-max,
 18, 19)

The “$Job” is a variable which allows you to select the job. You can see that I 
have wildcards on other elements of the path, for example the TaskManager id, 
the operator name, the Task index, etc. Your metric is probably rooted 
somewhere else, but the thing you’re looking for is under 
operator.*.*.KafkaConsumer.records-lag-max.
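
As a rough illustration of picking that metric out no matter where it is rooted, here is a minimal Python sketch. The metric paths and values in it are made up, the function names are hypothetical, and it assumes the operator name and task index contain no dots. It works on a plain dict and is not tied to any Graphite client API.

```python
# Suffix of the Kafka consumer lag metric described above.
SUFFIX = ("KafkaConsumer", "records-lag-max")


def is_kafka_lag_metric(path):
    """True if path ends in operator.<name>.<index>.KafkaConsumer.records-lag-max."""
    parts = path.split(".")
    # Need at least: operator, <name>, <index>, KafkaConsumer, records-lag-max
    return (
        len(parts) >= 5
        and tuple(parts[-2:]) == SUFFIX
        and parts[-5] == "operator"
    )


def worst_lag(metrics):
    """Return the largest lag value across all matching paths, or None."""
    lags = [value for path, value in metrics.items() if is_kafka_lag_metric(path)]
    return max(lags) if lags else None


# Made-up sample data, rooted somewhere other than the path shown above.
sample = {
    "stats.gauges.myapp.taskmanager.tm1.job.MyJob.operator.Source.0.KafkaConsumer.records-lag-max": 120.0,
    "stats.gauges.myapp.taskmanager.tm2.job.MyJob.operator.Source.1.KafkaConsumer.records-lag-max": 4500.0,
    "stats.gauges.myapp.taskmanager.tm1.job.MyJob.operator.Map.0.numRecordsIn": 99.0,
}
print(worst_lag(sample))
```

Taking the maximum across subtasks is a design choice for this sketch: a single lagging partition is usually what you want to be alerted on.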

Flink manages its offsets itself, rather than acting like a “normal” consumer 
which commits offsets to Kafka. However, in the docs I see that 
“setCommitOffsetsOnCheckpoints()” is enabled by default.  So, theoretically you 
can use any sort of tool similar to https://github.com/srotya/kafka-lag-monitor 
or https://github.com/linkedin/Burrow etc. which polls Kafka itself and 
produces metrics about consumer lag. However, for some reason, I don’t see our 
Flink consumer metrics showing up in our lag monitoring tool or in the Kafka 
command-line tools, so I’m not sure what’s going on there. Maybe it’s because 
Flink doesn’t show up as a consumer group? At first I thought that it might be 
because we’re not setting the “group.id” property, but as it turns out we are 
indeed setting it. In any case, we have to use the job’s metrics, and monitor 
that the job is up, rather than monitoring the offset in Kafka itself. 
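
To make that last point concrete: a lag gauge reported by the job itself can't tell you anything while the job is down, so an alert has to treat missing or stale data as a problem too. A minimal Python sketch of such a check follows. The function name, the thresholds, and the (timestamp, value) datapoint format are all assumptions for illustration, not something our setup or Graphite prescribes.

```python
def classify(datapoints, now, max_age_s=120, max_lag=10_000):
    """Classify job health from lag datapoints.

    datapoints: list of (unix_timestamp, value_or_None), oldest first.
    Returns "no-data", "lagging", or "ok".
    """
    # Keep only points that carry a value and are recent enough.
    fresh = [
        (ts, value)
        for ts, value in datapoints
        if value is not None and now - ts <= max_age_s
    ]
    if not fresh:
        # The job may be down or not reporting; lag alone would miss this.
        return "no-data"
    latest_value = fresh[-1][1]
    return "lagging" if latest_value > max_lag else "ok"


print(classify([(100, 5.0)], now=150))
print(classify([(100, 50_000.0)], now=150))
print(classify([(100, 5.0)], now=1_000))
```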

-Shannon

On 1/8/18, 1:52 AM, "Ufuk Celebi" <u...@apache.org> wrote:

    Hey Jins,
    
    our current back pressure tracking mechanism does not work with Kafka
    sources. To gather back pressure indicators we sample the main task
    thread of a subtask. For most tasks, this is the thread that emits
    records downstream (e.g. if you have a map function) and everything
    works as expected. In case of the Kafka source though there is a
    separate thread that consumes from Kafka and emits the records.
    Therefore we sample the "wrong" thread and don't observe any
    indicators for back pressure. :-( Unfortunately, this was not taken
    into account when back pressure sampling was implemented.
    
    There is this old issue to track this:
    https://issues.apache.org/jira/browse/FLINK-3456
    
    I'm not aware of any other way to track this situation. Maybe others
    can chime in here...
    
    – Ufuk
    
    
    On Mon, Jan 8, 2018 at 8:16 AM, Jins George <jins.geo...@aeris.net> wrote:
    > I have a Beam pipeline consuming records from Kafka, doing some
    > transformations, and writing them to HBase. I faced an issue in which records
    > were being written to HBase at a slower rate than messages were arriving in
    > Kafka, due to a temporary surge in the incoming traffic.
    >
    > From the Flink UI, if I check the back pressure status, it shows OK. I have
    > one task which has all the operators including source.
    >
    > Any idea why the backpressure indicator would show OK while messages are
    > backed up in Kafka?
    >
    > Is there any other mechanism/metric by which I can identify this situation?
    >
    > I'm running Flink 1.2 w/ Beam 2.0.
    >
    > Thanks,
    > Jins George
    
    
