Re: Flink metrics related problems/questions

2017-05-22 Thread Aljoscha Krettek
Ah ok, the onTimer() and processElement() methods are all protected by 
synchronized blocks on the same lock. So that shouldn’t be a problem.

> On 22. May 2017, at 15:08, Chesnay Schepler <ches...@apache.org> wrote:
> 
> Yes, that could cause the observed issue.
> 
> The default implementations are not thread-safe; if you do concurrent writes 
> they may be lost/overwritten.
> You will have to either guard accesses to that metric with a synchronized 
> block or implement your own thread-safe counter.
> 
> On 22.05.2017 14:17, Aljoscha Krettek wrote:
>> @Chesnay With timers it will happen that onTimer() is called from a
>> different thread than the thread that is calling processElement(). If metric
>> updates happen in both, would that be a problem?
>> 
>>> On 19. May 2017, at 11:57, Chesnay Schepler <ches...@apache.org> wrote:
>>> 
>>> 2. isn't quite accurate actually; metrics on the TaskManager are not 
>>> persisted across restarts.
>>> 
>>> On 19.05.2017 11:21, Chesnay Schepler wrote:
>>>> 1. This shouldn't happen. Do you access the counter from different threads?
>>>> 
>>>> 2. Metrics in general are not persisted across restarts, and there is no 
>>>> way to configure flink to do so at the moment.
>>>> 
>>>> 3. Counters are sent as gauges since as far as I know StatsD counters are 
>>>> not allowed to be decremented.
>>>> 
>>>> On 19.05.2017 08:56, jaxbihani wrote:
>>>>> Background: We are running a job that uses a ProcessFunction to read data
>>>>> from Kafka, fire ~5-10K timers per second, and send matched events to a
>>>>> Kafka sink. We collect metrics such as the number of active timers, the
>>>>> number of timers scheduled, etc. We use the StatsD reporter, monitor via a
>>>>> Grafana dashboard, and use the RocksDBStateBackend backed by HDFS as the
>>>>> state backend.
>>>>> 
>>>>> Observations/Problems:
>>>>> 1. *Counter value suddenly got reset:* While the job was running fine, at
>>>>> one point the metric of a monotonically increasing counter (a Counter on
>>>>> which we only call inc()) suddenly dropped to 0 and then resumed counting
>>>>> from there. The only exceptions in the logs were related to transient
>>>>> connectivity issues to the datanodes. There was also no other indicator of
>>>>> failure after inspecting the system and checkpoint metrics. It happened
>>>>> just once across multiple runs of the same job.
>>>>> 2. *Counters not retained during Flink restart with savepoint:* We
>>>>> cancelled the job with the -s option to take a savepoint and then
>>>>> restarted the job from that savepoint. After the restart the metrics
>>>>> started from 0. I was expecting the metric values of a given operator to
>>>>> also be part of its state.
>>>>> 3. *Counter metrics sent as gauges:* Using tcpdump I inspected the format
>>>>> in which metrics are sent to StatsD. I observed that even the metrics
>>>>> which are counters in my code were sent as gauges. I didn't understand
>>>>> why.
>>>>> 
>>>>> Can anyone please share more insight into why the behaviors above might
>>>>> have happened?
>>>>> Also, does Flink store metric values as part of the state for stateful
>>>>> operators? Is there any way to configure that?
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>> 
> 



Re: Flink metrics related problems/questions

2017-05-22 Thread Chesnay Schepler

Yes, that could cause the observed issue.

The default implementations are not thread-safe; if you do concurrent 
writes they may be lost/overwritten.
You will have to either guard accesses to that metric with a 
synchronized block or implement your own thread-safe counter.
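
For illustration, a minimal thread-safe counter could look roughly like this
(a sketch assuming Flink's org.apache.flink.metrics.Counter interface; the
class and metric names are made up):

import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.metrics.Counter;

public class AtomicCounter implements Counter {

    private final AtomicLong count = new AtomicLong();

    @Override
    public void inc() { count.incrementAndGet(); }

    @Override
    public void inc(long n) { count.addAndGet(n); }

    @Override
    public void dec() { count.decrementAndGet(); }

    @Override
    public void dec(long n) { count.addAndGet(-n); }

    @Override
    public long getCount() { return count.get(); }
}

It would be registered in open(), e.g. via
getRuntimeContext().getMetricGroup().counter("activeTimers", new AtomicCounter())
(assuming the counter(String, C) overload of MetricGroup is available);
alternatively, keep the default counter and synchronize the inc()/dec() calls
on a lock shared by processElement() and onTimer().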


On 22.05.2017 14:17, Aljoscha Krettek wrote:

@Chesnay With timers it will happen that onTimer() is called from a different
thread than the thread that is calling processElement(). If metric updates
happen in both, would that be a problem?


On 19. May 2017, at 11:57, Chesnay Schepler <ches...@apache.org> wrote:

2. isn't quite accurate actually; metrics on the TaskManager are not persisted 
across restarts.

On 19.05.2017 11:21, Chesnay Schepler wrote:

1. This shouldn't happen. Do you access the counter from different threads?

2. Metrics in general are not persisted across restarts, and there is no way to 
configure flink to do so at the moment.

3. Counters are sent as gauges since as far as I know StatsD counters are not 
allowed to be decremented.

On 19.05.2017 08:56, jaxbihani wrote:

Background: We are running a job that uses a ProcessFunction to read data from
Kafka, fire ~5-10K timers per second, and send matched events to a Kafka sink.
We collect metrics such as the number of active timers, the number of timers
scheduled, etc. We use the StatsD reporter, monitor via a Grafana dashboard, and
use the RocksDBStateBackend backed by HDFS as the state backend.

Observations/Problems:
1. *Counter value suddenly got reset:* While the job was running fine, at one
point the metric of a monotonically increasing counter (a Counter on which we
only call inc()) suddenly dropped to 0 and then resumed counting from there.
The only exceptions in the logs were related to transient connectivity issues
to the datanodes. There was also no other indicator of failure after inspecting
the system and checkpoint metrics. It happened just once across multiple runs
of the same job.
2. *Counters not retained during Flink restart with savepoint:* We cancelled
the job with the -s option to take a savepoint and then restarted the job from
that savepoint. After the restart the metrics started from 0. I was expecting
the metric values of a given operator to also be part of its state.
3. *Counter metrics sent as gauges:* Using tcpdump I inspected the format in
which metrics are sent to StatsD. I observed that even the metrics which are
counters in my code were sent as gauges. I didn't understand why.

Can anyone please share more insight into why the behaviors above might have
happened?
Also, does Flink store metric values as part of the state for stateful
operators? Is there any way to configure that?













Re: Flink metrics related problems/questions

2017-05-22 Thread Aljoscha Krettek
@Chesnay With timers it will happen that onTimer() is called from a different
thread than the thread that is calling processElement(). If metric updates
happen in both, would that be a problem?

> On 19. May 2017, at 11:57, Chesnay Schepler <ches...@apache.org> wrote:
> 
> 2. isn't quite accurate actually; metrics on the TaskManager are not 
> persisted across restarts.
> 
> On 19.05.2017 11:21, Chesnay Schepler wrote:
>> 1. This shouldn't happen. Do you access the counter from different threads?
>> 
>> 2. Metrics in general are not persisted across restarts, and there is no way 
>> to configure flink to do so at the moment.
>> 
>> 3. Counters are sent as gauges since as far as I know StatsD counters are 
>> not allowed to be decremented.
>> 
>> On 19.05.2017 08:56, jaxbihani wrote:
>>> Background: We are running a job that uses a ProcessFunction to read data
>>> from Kafka, fire ~5-10K timers per second, and send matched events to a
>>> Kafka sink. We collect metrics such as the number of active timers, the
>>> number of timers scheduled, etc. We use the StatsD reporter, monitor via a
>>> Grafana dashboard, and use the RocksDBStateBackend backed by HDFS as the
>>> state backend.
>>> 
>>> Observations/Problems:
>>> 1. *Counter value suddenly got reset:* While the job was running fine, at
>>> one point the metric of a monotonically increasing counter (a Counter on
>>> which we only call inc()) suddenly dropped to 0 and then resumed counting
>>> from there. The only exceptions in the logs were related to transient
>>> connectivity issues to the datanodes. There was also no other indicator of
>>> failure after inspecting the system and checkpoint metrics. It happened
>>> just once across multiple runs of the same job.
>>> 2. *Counters not retained during Flink restart with savepoint:* We
>>> cancelled the job with the -s option to take a savepoint and then restarted
>>> the job from that savepoint. After the restart the metrics started from 0.
>>> I was expecting the metric values of a given operator to also be part of
>>> its state.
>>> 3. *Counter metrics sent as gauges:* Using tcpdump I inspected the format
>>> in which metrics are sent to StatsD. I observed that even the metrics which
>>> are counters in my code were sent as gauges. I didn't understand why.
>>> 
>>> Can anyone please share more insight into why the behaviors above might
>>> have happened?
>>> Also, does Flink store metric values as part of the state for stateful
>>> operators? Is there any way to configure that?
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> 
> 



Re: Flink metrics related problems/questions

2017-05-19 Thread Chesnay Schepler
2. isn't quite accurate actually; metrics on the TaskManager are not 
persisted across restarts.
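
Regarding persistence: a possible user-level workaround (just a sketch, not a
built-in feature) is to keep the running count in operator state and re-apply
it to a freshly registered counter after restoring from a checkpoint or
savepoint, e.g. via the ListCheckpointed interface. Names below are
illustrative:

import java.util.Collections;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class PersistedCountFunction extends ProcessFunction<String, String>
        implements ListCheckpointed<Long> {

    private transient Counter matchedEvents;
    // mirrored into operator state at every checkpoint/savepoint
    private long countSoFar;

    @Override
    public void open(Configuration parameters) {
        matchedEvents = getRuntimeContext().getMetricGroup().counter("matchedEvents");
        // re-apply the restored value so the reported metric continues where it left off
        matchedEvents.inc(countSoFar);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        matchedEvents.inc();
        countSoFar++;
        out.collect(value);
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        return Collections.singletonList(countSoFar);
    }

    @Override
    public void restoreState(List<Long> state) {
        // state restoration happens before open(); sum the entries in case the
        // operator state was redistributed after a parallelism change
        countSoFar = 0;
        for (Long c : state) {
            countSoFar += c;
        }
    }
}

The metric itself stays per-subtask, so after rescaling the restored totals end
up spread across the new subtasks; whether that is acceptable depends on how
the metric is aggregated downstream.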


On 19.05.2017 11:21, Chesnay Schepler wrote:
1. This shouldn't happen. Do you access the counter from different 
threads?


2. Metrics in general are not persisted across restarts, and there is 
no way to configure flink to do so at the moment.


3. Counters are sent as gauges since as far as I know StatsD counters 
are not allowed to be decremented.


On 19.05.2017 08:56, jaxbihani wrote:
Background: We are running a job that uses a ProcessFunction to read data from
Kafka, fire ~5-10K timers per second, and send matched events to a Kafka sink.
We collect metrics such as the number of active timers, the number of timers
scheduled, etc. We use the StatsD reporter, monitor via a Grafana dashboard, and
use the RocksDBStateBackend backed by HDFS as the state backend.

Observations/Problems:
1. *Counter value suddenly got reset:* While the job was running fine, at one
point the metric of a monotonically increasing counter (a Counter on which we
only call inc()) suddenly dropped to 0 and then resumed counting from there.
The only exceptions in the logs were related to transient connectivity issues
to the datanodes. There was also no other indicator of failure after inspecting
the system and checkpoint metrics. It happened just once across multiple runs
of the same job.
2. *Counters not retained during Flink restart with savepoint:* We cancelled
the job with the -s option to take a savepoint and then restarted the job from
that savepoint. After the restart the metrics started from 0. I was expecting
the metric values of a given operator to also be part of its state.
3. *Counter metrics sent as gauges:* Using tcpdump I inspected the format in
which metrics are sent to StatsD. I observed that even the metrics which are
counters in my code were sent as gauges. I didn't understand why.

Can anyone please share more insight into why the behaviors above might have
happened?
Also, does Flink store metric values as part of the state for stateful
operators? Is there any way to configure that?













Re: Flink metrics related problems/questions

2017-05-19 Thread Chesnay Schepler

1. This shouldn't happen. Do you access the counter from different threads?

2. Metrics in general are not persisted across restarts, and there is no 
way to configure flink to do so at the moment.


3. Counters are sent as gauges since as far as I know StatsD counters 
are not allowed to be decremented.
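
For reference, in the StatsD line protocol the difference is just the type
suffix: a gauge carries an absolute value, while a native counter carries
increments. The scope/name below is only illustrative of what shows up in
tcpdump:

    myjob.taskmanager.operator.matchedEvents:12345|g    (gauge, absolute value)
    myjob.taskmanager.operator.matchedEvents:1|c        (counter, increment)

Reporting the absolute count as a gauge sidesteps the fact that a Flink Counter
supports dec(), which a plain StatsD counter cannot express, as noted above.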


On 19.05.2017 08:56, jaxbihani wrote:

Background: We are running a job that uses a ProcessFunction to read data from
Kafka, fire ~5-10K timers per second, and send matched events to a Kafka sink.
We collect metrics such as the number of active timers, the number of timers
scheduled, etc. We use the StatsD reporter, monitor via a Grafana dashboard, and
use the RocksDBStateBackend backed by HDFS as the state backend.

Observations/Problems:
1. *Counter value suddenly got reset:* While the job was running fine, at one
point the metric of a monotonically increasing counter (a Counter on which we
only call inc()) suddenly dropped to 0 and then resumed counting from there.
The only exceptions in the logs were related to transient connectivity issues
to the datanodes. There was also no other indicator of failure after inspecting
the system and checkpoint metrics. It happened just once across multiple runs
of the same job.
2. *Counters not retained during Flink restart with savepoint:* We cancelled
the job with the -s option to take a savepoint and then restarted the job from
that savepoint. After the restart the metrics started from 0. I was expecting
the metric values of a given operator to also be part of its state.
3. *Counter metrics sent as gauges:* Using tcpdump I inspected the format in
which metrics are sent to StatsD. I observed that even the metrics which are
counters in my code were sent as gauges. I didn't understand why.

Can anyone please share more insight into why the behaviors above might have
happened?
Also, does Flink store metric values as part of the state for stateful
operators? Is there any way to configure that?









Flink metrics related problems/questions

2017-05-19 Thread jaxbihani
Background: We are running a job that uses a ProcessFunction to read data from
Kafka, fire ~5-10K timers per second, and send matched events to a Kafka sink.
We collect metrics such as the number of active timers, the number of timers
scheduled, etc. We use the StatsD reporter, monitor via a Grafana dashboard, and
use the RocksDBStateBackend backed by HDFS as the state backend.
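
(For context, a rough sketch of this kind of setup; class, type, and metric
names are illustrative, not the actual job code:)

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Elements are (key, matchDeadline) pairs; a timer is scheduled per element
// and counters track how many timers were scheduled and fired.
public class TimerMatchFunction extends ProcessFunction<Tuple2<String, Long>, String> {

    private transient Counter timersScheduled;
    private transient Counter timersFired;

    @Override
    public void open(Configuration parameters) {
        timersScheduled = getRuntimeContext().getMetricGroup().counter("timersScheduled");
        timersFired = getRuntimeContext().getMetricGroup().counter("timersFired");
    }

    @Override
    public void processElement(Tuple2<String, Long> element, Context ctx, Collector<String> out) {
        ctx.timerService().registerProcessingTimeTimer(element.f1);
        timersScheduled.inc();
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        timersFired.inc();
        out.collect("matched at " + timestamp);
    }
}

(The function is applied on a keyed stream, since timers require keyed state.)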

Observations/Problems: 
1. *Counter value suddenly got reset:* While the job was running fine, at one
point the metric of a monotonically increasing counter (a Counter on which we
only call inc()) suddenly dropped to 0 and then resumed counting from there.
The only exceptions in the logs were related to transient connectivity issues
to the datanodes. There was also no other indicator of failure after inspecting
the system and checkpoint metrics. It happened just once across multiple runs
of the same job.
2. *Counters not retained during Flink restart with savepoint:* We cancelled
the job with the -s option to take a savepoint and then restarted the job from
that savepoint. After the restart the metrics started from 0. I was expecting
the metric values of a given operator to also be part of its state.
3. *Counter metrics sent as gauges:* Using tcpdump I inspected the format in
which metrics are sent to StatsD. I observed that even the metrics which are
counters in my code were sent as gauges. I didn't understand why.

Can anyone please share more insight into why the behaviors above might have
happened?
Also, does Flink store metric values as part of the state for stateful
operators? Is there any way to configure that?




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-related-problems-questions-tp13218.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.