Re: Back-pressure Status shows OK but records are backed up in Kafka

2018-01-08 Thread Jins George
Thank you Ufuk & Shannon. Since my Kafka consumer is the 
UnboundedKafkaSource from Beam, I'm not sure whether the records-lag-max 
metric is exposed. Let me research further.


Thanks,
Jins George

Re: Back-pressure Status shows OK but records are backed up in Kafka

2018-01-08 Thread Shannon Carey
Right, the back-pressure status only measures back pressure inside the Flink 
job, i.e. between Flink tasks.

Therefore, it’s up to you to monitor whether your Flink job is “keeping up” 
with the source stream. If you’re using Kafka, there’s a metric that the 
consumer library makes available. For example, for one of our jobs, in Graphite 
we have a metric that matches:

aliasByNode(stats.gauges.myapp.prod.us-east-1.myapp.*.taskmanager.*.job.com.example.$Job.operator.*.*.KafkaConsumer.records-lag-max, 18, 19)

The “$Job” is a variable that lets you select the job. You can see that I 
have wildcards on other elements of the path, for example the TaskManager id, 
the operator name, and the task index. Your metric is probably rooted 
somewhere else, but the thing you’re looking for is under 
operator.*.*.KafkaConsumer.records-lag-max.
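
In case it helps: a metric like this only reaches Graphite if a metrics 
reporter is configured. A minimal sketch of the flink-conf.yaml entries for 
the Graphite reporter that ships with Flink (the reporter name “grph” and 
the host/port are placeholders, and the Graphite metrics jar has to be on 
Flink’s classpath):

    # Sketch only: reporter name, host, and port are placeholders.
    metrics.reporters: grph
    metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
    metrics.reporter.grph.host: graphite.example.com
    metrics.reporter.grph.port: 2003
    metrics.reporter.grph.protocol: TCP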

Flink manages its offsets itself rather than acting like a “normal” consumer 
that commits offsets to Kafka. However, in the docs I see that 
“setCommitOffsetsOnCheckpoints()” is enabled by default. So, in theory, you 
can use a tool such as https://github.com/srotya/kafka-lag-monitor or 
https://github.com/linkedin/Burrow, which polls Kafka itself and produces 
metrics about consumer lag. However, for some reason I don’t see our Flink 
consumers showing up in our lag-monitoring tool or in the Kafka command-line 
tools, so I’m not sure what’s going on there. Maybe it’s because Flink 
doesn’t show up as a consumer group? At first I thought it might be because 
we’re not setting the “group.id” property, but as it turns out we are indeed 
setting it. In any case, we have to use the job’s metrics and monitor that 
the job is up, rather than monitoring the offsets in Kafka itself.
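
For reference, the setup being discussed looks roughly like this with 
Flink’s native Kafka consumer (an illustrative sketch, not our actual job; 
the topic, bootstrap servers, and group id are placeholders, and it assumes 
a connector version that has setCommitOffsetsOnCheckpoints(); Beam’s KafkaIO 
wraps things differently):

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaLagSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // offsets get committed on each checkpoint

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
            props.setProperty("group.id", "my-flink-job"); // what lag tools would key on

            FlinkKafkaConsumer010<String> consumer =
                    new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
            consumer.setCommitOffsetsOnCheckpoints(true); // the default mentioned above

            DataStream<String> stream = env.addSource(consumer);
            stream.print(); // stand-in for the real transformations and sink
            env.execute("kafka-lag-sketch");
        }
    }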

-Shannon


Re: Back-pressure Status shows OK but records are backed up in Kafka

2018-01-07 Thread Ufuk Celebi
Hey Jins,

Our current back-pressure tracking mechanism does not work with Kafka
sources. To gather back-pressure indicators, we sample the main task
thread of a subtask. For most tasks this is the thread that emits
records downstream (e.g. if you have a map function), and everything
works as expected. In the case of the Kafka source, though, a separate
thread consumes from Kafka and emits the records. We therefore sample
the "wrong" thread and don't observe any indicators for back
pressure. :-( Unfortunately, this was not taken into account when back
pressure sampling was implemented.
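
To make the blind spot concrete, here is a schematic sketch (not Flink's
actual code) of that thread layout: a separate thread does the consuming
and emitting, while the thread the sampler inspects spends its life
waiting.

    // Schematic sketch of the layout described above (details differ in
    // Flink's real Kafka fetcher, but the sampling effect is the same).
    public class SamplingBlindSpotSketch {
        public static void main(String[] args) throws Exception {
            Thread kafkaConsumerThread = new Thread(() -> {
                try {
                    while (true) {
                        // stand-in for: records = consumer.poll(...); emit(records);
                        // this is the busy thread that would block under real
                        // back pressure, but the sampler never looks at it
                        Thread.sleep(10);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "kafka-consumer-emitter");
            kafkaConsumerThread.start();

            // The sampler stack-samples this thread instead. Every sample
            // finds it parked in join(), which never looks like back
            // pressure, so the status stays OK even as records pile up.
            kafkaConsumerThread.join();
        }
    }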

There is an old issue tracking this:
https://issues.apache.org/jira/browse/FLINK-3456

I'm not aware of any other way to track this situation. Maybe others
can chime in here...

– Ufuk




Back-pressure Status shows OK but records are backed up in Kafka

2018-01-07 Thread Jins George
I have a Beam pipeline that consumes records from Kafka, does some 
transformations, and writes the results to HBase. I ran into an issue in 
which records were being written to HBase at a slower rate than messages 
were arriving in Kafka, due to a temporary surge in incoming traffic.


If I check the back-pressure status in the Flink UI, it shows OK. I have 
one task that contains all the operators, including the source.


Any idea why the back-pressure indicator would show OK while messages are 
backed up in Kafka?


Is there any other mechanism or metric by which I can identify this 
situation?


I'm running Flink 1.2 with Beam 2.0.

Thanks,
Jins George