Re: instable checkpointing after migration to flink 1.8

2019-09-11 Thread Yun Tang
Hi Bekir

Changing the timer factory from HEAP to ROCKSDB would certainly help reduce
your JVM heap usage. Since RocksDB would then store the timer state, you
might see a performance regression, as timers need to be polled from RocksDB
instead of the JVM heap.

In our experience, 20 million timers per task manager is still a bit too
many; could you reduce your window size to reduce the number of timers per
window? By the way, timer coalescing [1] might be a way to reduce the number
of timers; a short sketch follows the link below. (Currently, this method
only takes effect when the user registers timers explicitly.)

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
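
A minimal sketch of the coalescing idea from [1], inside a
KeyedProcessFunction's processElement (the "timeout" field is illustrative):

    // Round the target time down to full seconds. Flink keeps at most one
    // timer per key and timestamp, so all registrations for a key that fall
    // into the same second collapse into a single timer.
    long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
    ctx.timerService().registerProcessingTimeTimer(coalescedTime);

Timers then fire at most one second earlier than requested, which is usually
acceptable for cleanup-style timers.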

Best
Yun Tang


From: Bekir Oguz 
Sent: Wednesday, September 11, 2019 19:39
To: Yun Tang 
Cc: dev@flink.apache.org ; Stephan Ewen 
; Niels Alebregtse ; 
Vladislav Bakayev 
Subject: Re: instable checkpointing after migration to flink 1.8

Hi Yun,
first of all, the reported problem appears to have been resolved for 2 days
now, right after we changed the type and number of our nodes to give the task
managers more heap and to have more task managers as well.

Previously, our job was configured with parallelism 70, distributed across
14 task managers (5 slots/TM), each with 9 GB of heap.
After our config change, the job now uses parallelism 100, distributed across
20 task managers (5 slots/TM), each with 18 GB of heap.

This job has a keyed process function which manages ~400 million records in
its state and creates 1 timer per record, scheduled to trigger in 6 months,
to check whether the record is eligible to be wiped from state. So we have
400M records + 400M timers.
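
For reference, the pattern is roughly the following (a simplified sketch,
not our actual job; "Record" and the state name are placeholders):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class RetentionFunction
            extends KeyedProcessFunction<String, Record, Record> {

        private static final long SIX_MONTHS_MS = 180L * 24 * 60 * 60 * 1000;
        private transient ValueState<Record> recordState;

        @Override
        public void open(Configuration parameters) {
            recordState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("record", Record.class));
        }

        @Override
        public void processElement(Record value, Context ctx,
                Collector<Record> out) throws Exception {
            recordState.update(value);
            // One cleanup timer per record, scheduled ~6 months ahead.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + SIX_MONTHS_MS);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                Collector<Record> out) throws Exception {
            // Custom TTL check: wipe the record from state if eligible.
            recordState.clear();
        }
    }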

For user state, we use the RocksDB backend with incremental checkpointing
enabled, using S3 as the external checkpoint location. The RocksDB backend is
configured with the predefined FLASH_SSD_OPTIMIZED options.
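
In code, that setup corresponds to something like the following fragment (a
sketch; the S3 URI is a placeholder):

    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    // "true" enables incremental checkpoints; checkpoints go to S3.
    RocksDBStateBackend backend =
            new RocksDBStateBackend("s3://<bucket>/checkpoints", true);
    backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
    env.setStateBackend(backend);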

Before our config change, Flink was managing 400M/14 ≈ 28.5M records + 28.5M
timers in each task manager with 9 GB of heap.
After the config change, this is 400M/20 = 20M records + 20M timers in each
task manager with 18 GB of heap.

So we have less state to manage per task manager, and more heap. Apparently
this fixes(!) the problem of occasional long checkpointing durations (15
minutes).

Coming back to your points:
1. The timers indeed use HEAP, which is the default. We can set it to
ROCKSDB to see whether that change has an impact on the end-to-end checkpoint
duration. Do you think this change will also reduce the heap usage?
2. I collected and shared those logs under the /tmp directory earlier and
noticed that snapshotting happens very fast, finishing within a second. What
I did notice was compaction kicking in during the snapshotting phase of a
long (15-minute) checkpoint. But still, the time spent on snapshotting was 1
second, so I guess compaction has no impact there. And we still do not know
why it took 15 minutes to acknowledge for one task slot.

I have another question regarding this problem and our use of timers. Is it
good practice to use timers the way we do? Does the Flink timer service
support this many timers? One timer per record is 400 million timers for us.

It looks like our problem is solved for the time being, but it may reappear
since we still do not know the root cause.
About the use of timers: could you please share your opinion on our timer
setup, and perhaps help with my question about switching the timers to use
RocksDB instead of heap?

Thanks a lot,
Bekir Oguz


On Thu, 5 Sep 2019 at 19:55, Yun Tang <myas...@live.com> wrote:
Hi Bekir

From what I can see, there are two main factors influencing the time of the
synchronous checkpoint execution within that task:

  1.  Snapshotting the timers in heap to S3 [1] (network IO)
  2.  Creating the local RocksDB checkpoint on disk [2] (disk IO)

For the first part, unfortunately, there are no logs or metrics that show how
long it takes.
For the second part, you could log in to the machine running that task and
find the RocksDB logs (the default DB folder is
{io.tmp.dirs}/flink-io-xxx/job-xxx/db and the log file name is LOG). You can
check the interval between the log lines "Started the snapshot process --
creating snapshot in directory" and "Snapshot DONE" to see how long RocksDB
takes to create the checkpoint on local disk.

If we configure "state.backend.rocksdb.timer-service.factory" to "ROCKSDB",
we can avoid the first part entirely, and this might be a solution to your
problem. To be honest, though, the timer snapshot code stays almost the same
between Flink 1.6 and Flink 1.8, so this should not be a regression.

[1] 
https://github.com/apache/flink/blob/ba1cd43b6ed42070fd9013159a5b65adb18b3975/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L453
[2] 
https://github.com/apache/flink/blob/ba1cd43b6ed42070fd9013159a5b65adb18b3975/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L249

Re: instable checkpointing after migration to flink 1.8

2019-09-05 Thread Yun Tang
Hi Bekir

From what I can see, there are two main factors influencing the time of the
synchronous checkpoint execution within that task:

  1.  Snapshotting the timers in heap to S3 [1] (network IO)
  2.  Creating the local RocksDB checkpoint on disk [2] (disk IO)

For the first part, unfortunately, there are no logs or metrics that show how
long it takes.
For the second part, you could log in to the machine running that task and
find the RocksDB logs (the default DB folder is
{io.tmp.dirs}/flink-io-xxx/job-xxx/db and the log file name is LOG). You can
check the interval between the log lines "Started the snapshot process --
creating snapshot in directory" and "Snapshot DONE" to see how long RocksDB
takes to create the checkpoint on local disk.

If we configure "state.backend.rocksdb.timer-service.factory" to "ROCKSDB",
we can avoid the first part entirely, and this might be a solution to your
problem. To be honest, though, the timer snapshot code stays almost the same
between Flink 1.6 and Flink 1.8, so this should not be a regression.

[1] 
https://github.com/apache/flink/blob/ba1cd43b6ed42070fd9013159a5b65adb18b3975/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L453
[2] 
https://github.com/apache/flink/blob/ba1cd43b6ed42070fd9013159a5b65adb18b3975/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L249
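
For reference, the switch described above is a one-line change in
flink-conf.yaml:

    state.backend.rocksdb.timer-service.factory: ROCKSDB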

Best
Yun Tang

From: Congxian Qiu 
Sent: Thursday, September 5, 2019 10:38
To: Bekir Oguz 
Cc: Stephan Ewen ; dev ; Niels 
Alebregtse ; Vladislav Bakayev 

Subject: Re: instable checkpointing after migration to flink 1.8

Some additional information from our private emails:

there are ALWAYS Kafka AbstractCoordinator logs about a lost connection to
Kafka at the same time the checkpoints are confirmed. Bekir checked the
Kafka broker log but did not find anything interesting there.

Best,
Congxian


Congxian Qiu wrote on Thu, 5 Sep 2019 at 10:26:

> Hi Bekir,
>
> If it is the storage place for timers, for RocksDBStateBackend, timers can
> be stored in Heap or RocksDB[1][2]
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#rocksdb-state-backend-config-options
>
> Best,
> Congxian
>
>
> Bekir Oguz wrote on Wed, 4 Sep 2019 at 23:38:
>
>> Hi Stephan,
>> sorry for late response.
>> We indeed use timers inside a KeyedProcessFunction but the triggers of
>> the timers are kinda evenly distributed, so not causing a firing storm.
>> We have a custom ttl logic which is used by the deletion timer to decide
>> whether delete a record from inmemory state or not.
>> Can you maybe give some links to those changes in the RocksDB?
>>
>> Thanks in advance,
>> Bekir Oguz
>>
>> On Fri, 30 Aug 2019 at 14:56, Stephan Ewen  wrote:
>>
>>> Hi all!
>>>
>>> A thought would be that this has something to do with timers. Does the
>>> task with that behavior use timers (windows, or process function)?
>>>
>>> If that is the case, some theories to check:
>>>   - Could it be a timer firing storm coinciding with a checkpoint?
>>> Currently, that storm synchronously fires, checkpoints cannot preempt that,
>>> which should change in 1.10 with the new mailbox model.
>>>   - Could the timer-async checkpointing changes have something to do
>>> with that? Does some of the usually small "preparation work" (happening
>>> synchronously) lead to an unwanted effect?
>>>   - Are you using TTL for state in that operator?
>>>   - There were some changes made to support timers in RocksDB recently.
>>> Could that have something to do with it?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu 
>>> wrote:
>>>
>>>> CC flink dev mail list
>>>> Update for those who may be interested in this issue, we're still
>>>> diagnosing this problem currently.
>>>>
>>>> Best,
>>>> Congxian
>>>>
>>>>
>>>> Congxian Qiu wrote on Thu, 29 Aug 2019 at 20:58:
>>>>
>>>> > Hi Bekir
>>>> >
>>>> > Currently, from what we have diagnosed, there is some task complete
>>>> its
>>>> > checkpoint too late (maybe 15 mins), but we checked the kafka broker
>>>> log
>>>> > and did not find any interesting things there. could we run another
>>>> job,
>>>

Re: instable checkpointing after migration to flink 1.8

2019-09-04 Thread Congxian Qiu
Some additional information from our private emails:

there are ALWAYS Kafka AbstractCoordinator logs about a lost connection to
Kafka at the same time the checkpoints are confirmed. Bekir checked the
Kafka broker log but did not find anything interesting there.

Best,
Congxian


Congxian Qiu wrote on Thu, 5 Sep 2019 at 10:26:

> Hi Bekir,
>
> If it is the storage place for timers, for RocksDBStateBackend, timers can
> be stored in Heap or RocksDB[1][2]
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#rocksdb-state-backend-config-options
>
> Best,
> Congxian
>
>
> Bekir Oguz wrote on Wed, 4 Sep 2019 at 23:38:
>
>> Hi Stephan,
>> sorry for late response.
>> We indeed use timers inside a KeyedProcessFunction but the triggers of
>> the timers are kinda evenly distributed, so not causing a firing storm.
>> We have a custom ttl logic which is used by the deletion timer to decide
>> whether delete a record from inmemory state or not.
>> Can you maybe give some links to those changes in the RocksDB?
>>
>> Thanks in advance,
>> Bekir Oguz
>>
>> On Fri, 30 Aug 2019 at 14:56, Stephan Ewen  wrote:
>>
>>> Hi all!
>>>
>>> A thought would be that this has something to do with timers. Does the
>>> task with that behavior use timers (windows, or process function)?
>>>
>>> If that is the case, some theories to check:
>>>   - Could it be a timer firing storm coinciding with a checkpoint?
>>> Currently, that storm synchronously fires, checkpoints cannot preempt that,
>>> which should change in 1.10 with the new mailbox model.
>>>   - Could the timer-async checkpointing changes have something to do
>>> with that? Does some of the usually small "preparation work" (happening
>>> synchronously) lead to an unwanted effect?
>>>   - Are you using TTL for state in that operator?
>>>   - There were some changes made to support timers in RocksDB recently.
>>> Could that have something to do with it?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu 
>>> wrote:
>>>
 CC flink dev mail list
 Update for those who may be interested in this issue, we're still
 diagnosing this problem currently.

 Best,
 Congxian


 Congxian Qiu wrote on Thu, 29 Aug 2019 at 20:58:

 > Hi Bekir
 >
 > Currently, from what we have diagnosed, there is some task complete
 its
 > checkpoint too late (maybe 15 mins), but we checked the kafka broker
 log
 > and did not find any interesting things there. could we run another
 job,
 > that did not commit offset to kafka, this wants to check if it is the
 > "commit offset to kafka" step consumes too much time.
 >
 > Best,
 > Congxian
 >
 >
 > Bekir Oguz wrote on Wed, 28 Aug 2019 at 16:19:
 >
 >> Hi Congxian,
 >> sorry for the late reply, but no progress on this issue yet. I
 checked
 >> also the kafka broker logs, found nothing interesting there.
 >> And we still have 15 min duration checkpoints quite often. Any more
 ideas
 >> on where to look at?
 >>
 >> Regards,
 >> Bekir
 >>
 >> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu 
 >> wrote:
 >>
 >>> Hi Bekir
 >>>
 >>> Do you come back to work now, does there any more findings of this
 >>> problem?
 >>>
 >>> Best,
 >>> Congxian
 >>>
 >>>
 >>> Bekir Oguz wrote on Tue, 13 Aug 2019 at 16:39:
 >>>
  Hi Congxian,
  Thanks for following up this issue. It is still unresolved and I
 am on
  vacation at the moment.  Hopefully my colleagues Niels and Vlad can
 spare
  some time to look into this.
 
  @Niels, @Vlad: do you guys also think that this issue might be
 Kafka
  related? We could also check kafka broker logs at the time of long
  checkpointing.
 
  Thanks,
  Bekir
 
  Sent from my iPhone
 
  On 12 Aug 2019, at 15:18, Congxian Qiu <qcx978132...@gmail.com> wrote:
 
  Hi Bekir
 
  Is there any progress about this problem?
 
  Best,
  Congxian
 
 
  Congxian Qiu wrote on Thu, 8 Aug 2019 at 23:17:
 
 > hi Bekir
 > Thanks for the information.
 >
 > - Source's checkpoint was triggered by RPC calls, so it has the
 > "Trigger checkpoint xxx" log,
 > - other task's checkpoint was triggered after received all the
 barrier
 > of upstreams, it didn't log the "Trigger checkpoint XXX" :(
 >
 > Your diagnose makes sense to me, we need to check the Kafka log.
 > I also find out that we always have a log like
 > "org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 Marking
 > the coordinator 

Re: instable checkpointing after migration to flink 1.8

2019-09-04 Thread Congxian Qiu
Hi Bekir,

Regarding the storage location for timers: with RocksDBStateBackend, timers
can be stored either on the heap or in RocksDB [1][2].
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#rocksdb-state-backend-config-options

Best,
Congxian


Bekir Oguz wrote on Wed, 4 Sep 2019 at 23:38:

> Hi Stephan,
> sorry for late response.
> We indeed use timers inside a KeyedProcessFunction but the triggers of the
> timers are kinda evenly distributed, so not causing a firing storm.
> We have a custom ttl logic which is used by the deletion timer to decide
> whether delete a record from inmemory state or not.
> Can you maybe give some links to those changes in the RocksDB?
>
> Thanks in advance,
> Bekir Oguz
>
> On Fri, 30 Aug 2019 at 14:56, Stephan Ewen  wrote:
>
>> Hi all!
>>
>> A thought would be that this has something to do with timers. Does the
>> task with that behavior use timers (windows, or process function)?
>>
>> If that is the case, some theories to check:
>>   - Could it be a timer firing storm coinciding with a checkpoint?
>> Currently, that storm synchronously fires, checkpoints cannot preempt that,
>> which should change in 1.10 with the new mailbox model.
>>   - Could the timer-async checkpointing changes have something to do with
>> that? Does some of the usually small "preparation work" (happening
>> synchronously) lead to an unwanted effect?
>>   - Are you using TTL for state in that operator?
>>   - There were some changes made to support timers in RocksDB recently.
>> Could that have something to do with it?
>>
>> Best,
>> Stephan
>>
>>
>> On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu 
>> wrote:
>>
>>> CC flink dev mail list
>>> Update for those who may be interested in this issue, we're still
>>> diagnosing this problem currently.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Congxian Qiu wrote on Thu, 29 Aug 2019 at 20:58:
>>>
>>> > Hi Bekir
>>> >
>>> > Currently, from what we have diagnosed, there is some task complete its
>>> > checkpoint too late (maybe 15 mins), but we checked the kafka broker
>>> log
>>> > and did not find any interesting things there. could we run another
>>> job,
>>> > that did not commit offset to kafka, this wants to check if it is the
>>> > "commit offset to kafka" step consumes too much time.
>>> >
>>> > Best,
>>> > Congxian
>>> >
>>> >
>>> > Bekir Oguz wrote on Wed, 28 Aug 2019 at 16:19:
>>> >
>>> >> Hi Congxian,
>>> >> sorry for the late reply, but no progress on this issue yet. I checked
>>> >> also the kafka broker logs, found nothing interesting there.
>>> >> And we still have 15 min duration checkpoints quite often. Any more
>>> ideas
>>> >> on where to look at?
>>> >>
>>> >> Regards,
>>> >> Bekir
>>> >>
>>> >> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu 
>>> >> wrote:
>>> >>
>>> >>> Hi Bekir
>>> >>>
>>> >>> Do you come back to work now, does there any more findings of this
>>> >>> problem?
>>> >>>
>>> >>> Best,
>>> >>> Congxian
>>> >>>
>>> >>>
>>> >>> Bekir Oguz wrote on Tue, 13 Aug 2019 at 16:39:
>>> >>>
>>>  Hi Congxian,
>>>  Thanks for following up this issue. It is still unresolved and I am
>>> on
>>>  vacation at the moment.  Hopefully my colleagues Niels and Vlad can
>>> spare
>>>  some time to look into this.
>>> 
>>>  @Niels, @Vlad: do you guys also think that this issue might be Kafka
>>>  related? We could also check kafka broker logs at the time of long
>>>  checkpointing.
>>> 
>>>  Thanks,
>>>  Bekir
>>> 
>>>  Sent from my iPhone
>>> 
>>>  On 12 Aug 2019, at 15:18, Congxian Qiu wrote:
>>> 
>>>  Hi Bekir
>>> 
>>>  Is there any progress about this problem?
>>> 
>>>  Best,
>>>  Congxian
>>> 
>>> 
>>>  Congxian Qiu wrote on Thu, 8 Aug 2019 at 23:17:
>>> 
>>> > hi Bekir
>>> > Thanks for the information.
>>> >
>>> > - Source's checkpoint was triggered by RPC calls, so it has the
>>> > "Trigger checkpoint xxx" log,
>>> > - other task's checkpoint was triggered after received all the
>>> barrier
>>> > of upstreams, it didn't log the "Trigger checkpoint XXX" :(
>>> >
>>> > Your diagnose makes sense to me, we need to check the Kafka log.
>>> > I also find out that we always have a log like
>>> > "org.apache.kafka.clients.consumer.internals.AbstractCoordinator
>>> Marking
>>> > the coordinator 172.19.200.73:9092 (id: 2147483646 rack: null)
>>> dead
>>> > for group userprofileaggregator
>>> > 2019-08-06 13:58:49,872 DEBUG
>>> > org.apache.flink.streaming.runtime.tasks.StreamTask   -
>>> Notifica",
>>> >
>>> > I checked the doc of kafka[1], only find that the default of `
>>> > transaction.max.timeout.ms` is 15 min
>>> >
>>> > Please let me know there you have any finds. thanks
>>> >
>>> > PS: maybe you can also checkpoint the log for task
>>> > 

Re: instable checkpointing after migration to flink 1.8

2019-08-30 Thread Stephan Ewen
Hi all!

A thought would be that this has something to do with timers. Does the task
with that behavior use timers (windows, or process function)?

If that is the case, some theories to check:
  - Could it be a timer firing storm coinciding with a checkpoint?
Currently, that storm synchronously fires, checkpoints cannot preempt that,
which should change in 1.10 with the new mailbox model.
  - Could the timer-async checkpointing changes have something to do with
that? Does some of the usually small "preparation work" (happening
synchronously) lead to an unwanted effect?
  - Are you using TTL for state in that operator? (See the sketch after this list.)
  - There were some changes made to support timers in RocksDB recently.
Could that have something to do with it?
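
As a side note on the TTL question: the built-in state TTL looks roughly like
this. (A sketch; the descriptor name, type, and retention period are
illustrative.)

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(180))  // retention period, e.g. ~6 months
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ValueStateDescriptor<String> desc =
            new ValueStateDescriptor<>("record", String.class);
    desc.enableTimeToLive(ttlConfig);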

Best,
Stephan


On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu  wrote:

> CC flink dev mail list
> Update for those who may be interested in this issue, we're still
> diagnosing this problem currently.
>
> Best,
> Congxian
>
>
Congxian Qiu wrote on Thu, 29 Aug 2019 at 20:58:
>
> > Hi Bekir
> >
> > Currently, from what we have diagnosed, there is some task complete its
> > checkpoint too late (maybe 15 mins), but we checked the kafka broker log
> > and did not find any interesting things there. could we run another job,
> > that did not commit offset to kafka, this wants to check if it is the
> > "commit offset to kafka" step consumes too much time.
> >
> > Best,
> > Congxian
> >
> >
> > Bekir Oguz wrote on Wed, 28 Aug 2019 at 16:19:
> >
> >> Hi Congxian,
> >> sorry for the late reply, but no progress on this issue yet. I checked
> >> also the kafka broker logs, found nothing interesting there.
> >> And we still have 15 min duration checkpoints quite often. Any more
> ideas
> >> on where to look at?
> >>
> >> Regards,
> >> Bekir
> >>
> >> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu 
> >> wrote:
> >>
> >>> Hi Bekir
> >>>
> >>> Do you come back to work now, does there any more findings of this
> >>> problem?
> >>>
> >>> Best,
> >>> Congxian
> >>>
> >>>
> >>> Bekir Oguz wrote on Tue, 13 Aug 2019 at 16:39:
> >>>
>  Hi Congxian,
>  Thanks for following up this issue. It is still unresolved and I am on
>  vacation at the moment.  Hopefully my colleagues Niels and Vlad can
> spare
>  some time to look into this.
> 
>  @Niels, @Vlad: do you guys also think that this issue might be Kafka
>  related? We could also check kafka broker logs at the time of long
>  checkpointing.
> 
>  Thanks,
>  Bekir
> 
>  Sent from my iPhone
> 
>  On 12 Aug 2019, at 15:18, Congxian Qiu wrote:
> 
>  Hi Bekir
> 
>  Is there any progress about this problem?
> 
>  Best,
>  Congxian
> 
> 
>  Congxian Qiu wrote on Thu, 8 Aug 2019 at 23:17:
> 
> > hi Bekir
> > Thanks for the information.
> >
> > - Source's checkpoint was triggered by RPC calls, so it has the
> > "Trigger checkpoint xxx" log,
> > - other task's checkpoint was triggered after received all the
> barrier
> > of upstreams, it didn't log the "Trigger checkpoint XXX" :(
> >
> > Your diagnose makes sense to me, we need to check the Kafka log.
> > I also find out that we always have a log like
> > "org.apache.kafka.clients.consumer.internals.AbstractCoordinator
> Marking
> > the coordinator 172.19.200.73:9092 (id: 2147483646 rack: null) dead
> > for group userprofileaggregator
> > 2019-08-06 13:58:49,872 DEBUG
> > org.apache.flink.streaming.runtime.tasks.StreamTask   -
> Notifica",
> >
> > I checked the doc of kafka[1], only find that the default of `
> > transaction.max.timeout.ms` is 15 min
> >
> > Please let me know there you have any finds. thanks
> >
> > PS: maybe you can also checkpoint the log for task
> > `d0aa98767c852c97ae8faf70a54241e3`, JM received its ack message late
> also.
> >
> > [1] https://kafka.apache.org/documentation/
> > Best,
> > Congxian
> >
> >
> > Bekir Oguz wrote on Wed, 7 Aug 2019 at 18:48:
> >
> >> Hi Congxian,
> >> Thanks for checking the logs. What I see from the logs is:
> >>
> >> - For the tasks like "Source:
> >> profileservice-snowplow-clean-events_kafka_source -> Filter” {17,
> 27, 31,
> >> 33, 34} / 70 : We have the ’Triggering checkpoint’ and also ‘Confirm
> >> checkpoint’ log lines, with 15 mins delay in between.
> >> - For the tasks like “KeyedProcess -> (Sink:
> >> profileservice-userprofiles_kafka_sink, Sink:
> >> profileservice-userprofiles_kafka_deletion_marker, Sink:
> >> profileservice-profiledeletion_kafka_sink” {1,2,3,4,5}/70 : We DO
> NOT have
> >> the “Triggering checkpoint” log, but only the ‘Confirm checkpoint’
> lines.
> >>
> >> And as a final point, we ALWAYS have Kafka AbstractCoordinator logs
> >> about lost connection to Kafka at the same time we have the
> checkpoints
> >> confirmed. This 15 minutes delay might be because 

Re: instable checkpointing after migration to flink 1.8

2019-08-30 Thread Congxian Qiu
CC'ing the Flink dev mailing list.
An update for those who may be interested in this issue: we are still
diagnosing the problem.

Best,
Congxian


Congxian Qiu wrote on Thu, 29 Aug 2019 at 20:58:

> Hi Bekir
>
> Currently, from what we have diagnosed, there is some task complete its
> checkpoint too late (maybe 15 mins), but we checked the kafka broker log
> and did not find any interesting things there. could we run another job,
> that did not commit offset to kafka, this wants to check if it is the
> "commit offset to kafka" step consumes too much time.
>
> Best,
> Congxian
>
>
> Bekir Oguz wrote on Wed, 28 Aug 2019 at 16:19:
>
>> Hi Congxian,
>> sorry for the late reply, but no progress on this issue yet. I checked
>> also the kafka broker logs, found nothing interesting there.
>> And we still have 15 min duration checkpoints quite often. Any more ideas
>> on where to look at?
>>
>> Regards,
>> Bekir
>>
>> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu 
>> wrote:
>>
>>> Hi Bekir
>>>
>>> Do you come back to work now, does there any more findings of this
>>> problem?
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Bekir Oguz wrote on Tue, 13 Aug 2019 at 16:39:
>>>
 Hi Congxian,
 Thanks for following up this issue. It is still unresolved and I am on
 vacation at the moment.  Hopefully my colleagues Niels and Vlad can spare
 some time to look into this.

 @Niels, @Vlad: do you guys also think that this issue might be Kafka
 related? We could also check kafka broker logs at the time of long
 checkpointing.

 Thanks,
 Bekir

 Sent from my iPhone

 On 12 Aug 2019, at 15:18, Congxian Qiu wrote:

 Hi Bekir

 Is there any progress about this problem?

 Best,
 Congxian


 Congxian Qiu wrote on Thu, 8 Aug 2019 at 23:17:

> hi Bekir
> Thanks for the information.
>
> - Source's checkpoint was triggered by RPC calls, so it has the
> "Trigger checkpoint xxx" log,
> - other task's checkpoint was triggered after received all the barrier
> of upstreams, it didn't log the "Trigger checkpoint XXX" :(
>
> Your diagnose makes sense to me, we need to check the Kafka log.
> I also find out that we always have a log like
> "org.apache.kafka.clients.consumer.internals.AbstractCoordinator  Marking
> the coordinator 172.19.200.73:9092 (id: 2147483646 rack: null) dead
> for group userprofileaggregator
> 2019-08-06 13:58:49,872 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Notifica",
>
> I checked the doc of kafka[1], only find that the default of `
> transaction.max.timeout.ms` is 15 min
>
> Please let me know there you have any finds. thanks
>
> PS: maybe you can also checkpoint the log for task
> `d0aa98767c852c97ae8faf70a54241e3`, JM received its ack message late also.
>
> [1] https://kafka.apache.org/documentation/
> Best,
> Congxian
>
>
> Bekir Oguz wrote on Wed, 7 Aug 2019 at 18:48:
>
>> Hi Congxian,
>> Thanks for checking the logs. What I see from the logs is:
>>
>> - For the tasks like "Source:
>> profileservice-snowplow-clean-events_kafka_source -> Filter” {17, 27, 31,
>> 33, 34} / 70 : We have the ’Triggering checkpoint’ and also ‘Confirm
>> checkpoint’ log lines, with 15 mins delay in between.
>> - For the tasks like “KeyedProcess -> (Sink:
>> profileservice-userprofiles_kafka_sink, Sink:
>> profileservice-userprofiles_kafka_deletion_marker, Sink:
>> profileservice-profiledeletion_kafka_sink” {1,2,3,4,5}/70 : We DO NOT 
>> have
>> the “Triggering checkpoint” log, but only the ‘Confirm checkpoint’ lines.
>>
>> And as a final point, we ALWAYS have Kafka AbstractCoordinator logs
>> about lost connection to Kafka at the same time we have the checkpoints
>> confirmed. This 15 minutes delay might be because of some timeout at the
>> Kafka client (maybe 15 mins timeout), and then marking kafka  coordinator
>> dead, and then discovering kafka coordinator again.
>>
>> If the kafka connection is IDLE during 15 mins, Flink cannot confirm
>> the checkpoints, cannot send the async offset commit request to Kafka. 
>> This
>> could be the root cause of the problem. Please see the attached logs
>> filtered on the Kafka AbstractCoordinator. Every time we have a 15 
>> minutes
>> checkpoint, we have this kafka issue. (Happened today at 9:14 and 9:52)
>>
>>
>> I will enable Kafka DEBUG logging to see more and let you know about
>> the findings.
>>
>> Thanks a lot for your support,
>> Bekir Oguz
>>
>>
>>
>> On 7 Aug 2019, at 12:06, Congxian Qiu wrote:
>>
>> Hi
>>
>> Received all the files, as a first glance, the program uses at least
>> once checkpoint mode, from the tm log, maybe we need to check checkpoint 
>> of
>> 

Re: instable checkpointing after migration to flink 1.8

2019-08-02 Thread Congxian Qiu
Hi Bekir

Could you please also share the following information:
- jobmanager.log
- taskmanager.log (with debug info enabled) for the problematic subtask
- the DAG of your program (providing the skeleton program would be even
better -- you can send it to me privately)

For the subIndex, maybe we can use the deploy log messages in the jobmanager
log to identify which subtask we want. For example, in the JM log we'll have
something like "2019-08-02 11:38:47,291 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source:
Custom Source (2/2) (attempt #0) to
container_e62_1551952890130_2071_01_02 @ aa.bb.cc.dd.ee
(dataPort=39488)", from which we know "Custom Source (2/2)" was deployed to
"aa.bb.cc.dd.ee" with port 39488. Sadly, there may still be more than one
subtask in one container :(

Best,
Congxian


Bekir Oguz wrote on Fri, 2 Aug 2019 at 16:22:

> Forgot to add the checkpoint details after it was complete. This is for
> that long running checkpoint with id 95632.
>
>
>
> On 2 Aug 2019, at 11:18, Bekir Oguz wrote:
>
> Hi Congxian,
> I was able to fetch the logs of the task manager (attached) and the
> screenshots of the latest long checkpoint. I will get the logs of the job
> manager for the next long running checkpoint. And also I will try to get a
> jstack during the long running checkpoint.
>
> Note: Since at the Subtasks tab we do not have the subtask numbers, and at
> the Details tab of the checkpoint, we have the subtask numbers but not the
> task manager hosts, it is difficult to match those. We’re assuming they
> have the same order, so seeing that 3rd subtask is failing, I am getting
> the 3rd line at the Subtasks tab which leads to the task manager
> host flink-taskmanager-84ccd5bddf-2cbxn. ***It would be a great feature if
> you guys also include the subtask-id’s to the Subtasks view.***
>
> Note: timestamps in the task manager log are in UTC and I am at the moment
> at zone UTC+3, so the time 10:30 at the screenshot matches the time 7:30 in
> the log.
>
>
> Kind regards,
> Bekir
>
> 
>
> 
> 
> 
>
>
>
> On 2 Aug 2019, at 07:23, Congxian Qiu wrote:
>
> Hi Bekir
> I’ll first summary the problem here(please correct me if I’m wrong)
> 1. The same program runs on 1.6 never encounter such problems
> 2. Some checkpoints completed too long (15+ min), but other normal
> checkpoints complete less than 1 min
> 3. Some  bad checkpoint will have a large sync time, async time seems ok
> 4. Some bad checkpoint, the e2e duration will much bigger than (sync_time
> + async_time)
> First, answer the last question, the e2e duration is ack_time -
> trigger_time, so it always bigger than (sync_time + async_time), but we
> have a big gap here, this may be problematic.
> According to all the information, maybe the problem is some task start to
> do checkpoint too late and the sync checkpoint part took some time too
> long, Could you please share some more information such below:
> - A Screenshot of summary for one bad checkpoint(we call it A here)
> - The detailed information of checkpoint A(includes all the problematic
> subtasks)
> - Jobmanager.log and the taskmanager.log for the problematic task and a
> health task
> - Share the screenshot of subtasks for the problematic task(includes the
> `Bytes received`, `Records received`, `Bytes sent`, `Records sent` column),
> here wants to compare the problematic parallelism and good parallelism’s
> information, please also share the information is there has a data skew
> among the parallelisms,
> - could you please share some jstacks of the problematic parallelism —
> here wants to check whether the task is too busy to handle the barrier.
> (flame graph or other things is always welcome here)
>
> Best,
> Congxian
>
>
Congxian Qiu wrote on Thu, 1 Aug 2019 at 20:26:
>
>> Hi Bekir
>>
>> I'll first comb through all the information here, and try to find out the
>> reason with you, maybe need you to share some more information :)
>>
>> Best,
>> Congxian
>>
>>
>> Bekir Oguz wrote on Thu, 1 Aug 2019 at 17:00:
>>
>>> Hi Fabian,
>>> Thanks for sharing this with us, but we’re already on version 1.8.1.
>>>
>>> What I don’t understand is which mechanism in Flink adds 15 minutes to
>>> the checkpoint duration occasionally. Can you maybe give us some hints on
>>> where to look at? Is there a default timeout of 15 minutes defined
>>> somewhere in Flink? I couldn’t find one.
>>>
>>> In our pipeline, most of the checkpoints complete in less than a minute
>>> and some of them completed in 15 minutes+(less than a minute).
>>> There’s definitely something which adds 15 minutes. This is happening in
>>> one or more subtasks during checkpointing.
>>>
>>> Please see the screenshot below:
>>>
>>> Regards,
>>> Bekir
>>>
>>>
>>>
>>> On 23 Jul 2019, at 16:37, Fabian Hueske wrote:
>>>
>>> Hi Bekir,
>>>
>>> Another user reported checkpointing issues with Flink 1.8.0 [1].
>>> These seem to be resolved with Flink 1.8.1.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 

Re: instable checkpointing after migration to flink 1.8

2019-08-02 Thread Bekir Oguz
Forgot to add the checkpoint details after it completed. This is for that
long-running checkpoint with id 95632.



> On 2 Aug 2019, at 11:18, Bekir Oguz wrote:
> 
> Hi Congxian,
> I was able to fetch the logs of the task manager (attached) and the 
> screenshots of the latest long checkpoint. I will get the logs of the job 
> manager for the next long running checkpoint. And also I will try to get a 
> jstack during the long running checkpoint.
> 
> Note: Since at the Subtasks tab we do not have the subtask numbers, and at 
> the Details tab of the checkpoint, we have the subtask numbers but not the 
> task manager hosts, it is difficult to match those. We’re assuming they have 
> the same order, so seeing that 3rd subtask is failing, I am getting the 3rd 
> line at the Subtasks tab which leads to the task manager host 
> flink-taskmanager-84ccd5bddf-2cbxn. ***It would be a great feature if you 
> guys also include the subtask-id’s to the Subtasks view.*** 
> 
> Note: timestamps in the task manager log are in UTC and I am at the moment at 
> zone UTC+3, so the time 10:30 at the screenshot matches the time 7:30 in the 
> log.
> 
> 
> Kind regards,
> Bekir
> 
> 
> 
> 
> 
> 
> 
> 
> 
>> On 2 Aug 2019, at 07:23, Congxian Qiu wrote:
>> 
>> Hi Bekir
>> I’ll first summary the problem here(please correct me if I’m wrong)
>> 1. The same program runs on 1.6 never encounter such problems
>> 2. Some checkpoints completed too long (15+ min), but other normal 
>> checkpoints complete less than 1 min
>> 3. Some  bad checkpoint will have a large sync time, async time seems ok
>> 4. Some bad checkpoint, the e2e duration will much bigger than (sync_time + 
>> async_time) 
>> First, answer the last question, the e2e duration is ack_time - 
>> trigger_time, so it always bigger than (sync_time + async_time), but we have 
>> a big gap here, this may be problematic.
>> According to all the information, maybe the problem is some task start to do 
>> checkpoint too late and the sync checkpoint part took some time too long, 
>> Could you please share some more information such below:
>> - A Screenshot of summary for one bad checkpoint(we call it A here)
>> - The detailed information of checkpoint A(includes all the problematic 
>> subtasks)
>> - Jobmanager.log and the taskmanager.log for the problematic task and a 
>> health task
>> - Share the screenshot of subtasks for the problematic task(includes the 
>> `Bytes received`, `Records received`, `Bytes sent`, `Records sent` column), 
>> here wants to compare the problematic parallelism and good parallelism’s 
>> information, please also share the information is there has a data skew 
>> among the parallelisms,
>> - could you please share some jstacks of the problematic parallelism — here 
>> wants to check whether the task is too busy to handle the barrier. (flame 
>> graph or other things is always welcome here)
>> 
>> Best,
>> Congxian
>> 
>> 
>> Congxian Qiu <qcx978132...@gmail.com> wrote on Thu, 1 Aug 2019 at 20:26:
>> Hi Bekir
>> 
>> I'll first comb through all the information here, and try to find out the 
>> reason with you, maybe need you to share some more information :)
>> 
>> Best,
>> Congxian
>> 
>> 
>> Bekir Oguz <bekir.o...@persgroep.net> wrote on Thu, 1 Aug 2019 at 17:00:
>> Hi Fabian,
>> Thanks for sharing this with us, but we’re already on version 1.8.1.
>> 
>> What I don’t understand is which mechanism in Flink adds 15 minutes to the 
>> checkpoint duration occasionally. Can you maybe give us some hints on where 
>> to look at? Is there a default timeout of 15 minutes defined somewhere in 
>> Flink? I couldn’t find one.
>> 
>> In our pipeline, most of the checkpoints complete in less than a minute and 
>> some of them completed in 15 minutes+(less than a minute).
>> There’s definitely something which adds 15 minutes. This is happening in one 
>> or more subtasks during checkpointing.
>> 
>> Please see the screenshot below:
>> 
>> 
>> Regards,
>> Bekir
>> 
>> 
>> 
>>> On 23 Jul 2019, at 16:37, Fabian Hueske wrote:
>>> 
>>> Hi Bekir,
>>> 
>>> Another user reported checkpointing issues with Flink 1.8.0 [1].
>>> These seem to be resolved with Flink 1.8.1.
>>> 
>>> Hope this helps,
>>> Fabian
>>> 
>>> [1]
>>> https://lists.apache.org/thread.html/991fe3b09fd6a052ff52e5f7d9cdd9418545e68b02e23493097d9bc4@%3Cuser.flink.apache.org%3E
>>>  
>>> 
>>> 
>>> On Wed, 17 Jul 2019 at 09:16, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>> 
 Hi Bekir
 
 First of all, I think there is something wrong.  the state size is almost
 the same,  but the duration is different so much.
 
 The checkpoint for RocksDBStatebackend is dump sst files, then copy the

Re: instable checkpointing after migration to flink 1.8

2019-08-01 Thread Congxian Qiu
cc Bekir

Best,
Congxian


Congxian Qiu wrote on Fri, 2 Aug 2019 at 12:23:

> Hi Bekir
> I’ll first summary the problem here(please correct me if I’m wrong)
> 1. The same program runs on 1.6 never encounter such problems
> 2. Some checkpoints completed too long (15+ min), but other normal
> checkpoints complete less than 1 min
> 3. Some  bad checkpoint will have a large sync time, async time seems ok
> 4. Some bad checkpoint, the e2e duration will much bigger than (sync_time
> + async_time)
> First, answer the last question, the e2e duration is ack_time -
> trigger_time, so it always bigger than (sync_time + async_time), but we
> have a big gap here, this may be problematic.
> According to all the information, maybe the problem is some task start to
> do checkpoint too late and the sync checkpoint part took some time too
> long, Could you please share some more information such below:
> - A Screenshot of summary for one bad checkpoint(we call it A here)
> - The detailed information of checkpoint A(includes all the problematic
> subtasks)
> - Jobmanager.log and the taskmanager.log for the problematic task and a
> health task
> - Share the screenshot of subtasks for the problematic task(includes the
> `Bytes received`, `Records received`, `Bytes sent`, `Records sent` column),
> here wants to compare the problematic parallelism and good parallelism’s
> information, please also share the information is there has a data skew
> among the parallelisms,
> - could you please share some jstacks of the problematic parallelism —
> here wants to check whether the task is too busy to handle the barrier.
> (flame graph or other things is always welcome here)
>
> Best,
> Congxian
>
>
> Congxian Qiu wrote on Thu, 1 Aug 2019 at 20:26:
>
>> Hi Bekir
>>
>> I'll first comb through all the information here, and try to find out the
>> reason with you, maybe need you to share some more information :)
>>
>> Best,
>> Congxian
>>
>>
>> Bekir Oguz wrote on Thu, 1 Aug 2019 at 17:00:
>>
>>> Hi Fabian,
>>> Thanks for sharing this with us, but we’re already on version 1.8.1.
>>>
>>> What I don’t understand is which mechanism in Flink adds 15 minutes to
>>> the checkpoint duration occasionally. Can you maybe give us some hints on
>>> where to look at? Is there a default timeout of 15 minutes defined
>>> somewhere in Flink? I couldn’t find one.
>>>
>>> In our pipeline, most of the checkpoints complete in less than a minute
>>> and some of them completed in 15 minutes+(less than a minute).
>>> There’s definitely something which adds 15 minutes. This is happening in
>>> one or more subtasks during checkpointing.
>>>
>>> Please see the screenshot below:
>>>
>>> Regards,
>>> Bekir
>>>
>>>
>>>
>>> On 23 Jul 2019, at 16:37, Fabian Hueske wrote:
>>>
>>> Hi Bekir,
>>>
>>> Another user reported checkpointing issues with Flink 1.8.0 [1].
>>> These seem to be resolved with Flink 1.8.1.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> [1]
>>>
>>> https://lists.apache.org/thread.html/991fe3b09fd6a052ff52e5f7d9cdd9418545e68b02e23493097d9bc4@%3Cuser.flink.apache.org%3E
>>>
>>> On Wed, 17 Jul 2019 at 09:16, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>>
>>> Hi Bekir
>>>
>>> First of all, I think there is something wrong.  the state size is almost
>>> the same,  but the duration is different so much.
>>>
>>> The checkpoint for RocksDBStatebackend is dump sst files, then copy the
>>> needed sst files(if you enable incremental checkpoint, the sst files
>>> already on remote will not upload), then complete checkpoint. Can you
>>> check
>>> the network bandwidth usage during checkpoint?
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Bekir Oguz wrote on Tue, 16 Jul 2019 at 22:45:
>>>
>>> Hi all,
>>> We have a flink job with user state, checkpointing to RocksDBBackend
>>> which is externally stored in AWS S3.
>>> After we have migrated our cluster from 1.6 to 1.8, we see occasionally
>>> that some slots do not acknowledge the checkpoints quickly enough. As an
>>> example: All slots acknowledge between 30-50 seconds except only one slot
>>> acknowledges in 15 mins. Checkpoint sizes are similar to each other, like
>>> 200-400 MB.
>>>
>>> We did not experience this weird behaviour in Flink 1.6. We have 5 min
>>> checkpoint interval and this happens sometimes once in an hour sometimes
>>> more but not in all the checkpoint requests. Please see the screenshot
>>> below.
>>>
>>> Also another point: For the faulty slots, the duration is consistently 15
>>> mins and some seconds, we couldn’t find out where this 15 mins response
>>> time comes from. And each time it is a different task manager, not always
>>> the same one.
>>>
>>> Are you guys aware of any other users having similar issues with the new
>>> version and also a suggested bug fix or solution?
>>>
>>>
>>>
>>>
>>> Thanks in advance,
>>> Bekir Oguz
>>>
>>>
>>>
>>>


Re: instable checkpointing after migration to flink 1.8

2019-08-01 Thread Congxian Qiu
Hi Bekir
I'll first summarize the problem here (please correct me if I'm wrong):
1. The same program running on 1.6 never encountered such problems.
2. Some checkpoints take too long to complete (15+ min), while other, normal
checkpoints complete in less than 1 min.
3. Some bad checkpoints have a large sync time; the async time seems OK.
4. For some bad checkpoints, the e2e duration is much bigger than (sync_time
+ async_time).
First, to answer the last point: the e2e duration is ack_time - trigger_time,
so it is always bigger than (sync_time + async_time); but we have a big gap
here, and this may be problematic.
Based on all the information, maybe the problem is that some task starts its
checkpoint too late and the sync part of the checkpoint takes too long. Could
you please share some more information, such as:
- a screenshot of the summary for one bad checkpoint (we'll call it A here)
- the detailed information for checkpoint A (including all the problematic
subtasks)
- jobmanager.log and taskmanager.log for the problematic task and a healthy
task
- a screenshot of the subtasks for the problematic task (including the `Bytes
received`, `Records received`, `Bytes sent`, and `Records sent` columns); the
goal is to compare the problematic parallelism with a good one. Please also
share whether there is data skew among the parallelisms.
- some jstacks of the problematic parallelism; the goal is to check whether
the task is too busy to handle the barrier (a flame graph or anything similar
is always welcome here)

Best,
Congxian


Congxian Qiu wrote on Thu, 1 Aug 2019 at 20:26:

> Hi Bekir
>
> I'll first comb through all the information here, and try to find out the
> reason with you, maybe need you to share some more information :)
>
> Best,
> Congxian
>
>
> Bekir Oguz wrote on Thu, 1 Aug 2019 at 17:00:
>
>> Hi Fabian,
>> Thanks for sharing this with us, but we’re already on version 1.8.1.
>>
>> What I don’t understand is which mechanism in Flink adds 15 minutes to
>> the checkpoint duration occasionally. Can you maybe give us some hints on
>> where to look at? Is there a default timeout of 15 minutes defined
>> somewhere in Flink? I couldn’t find one.
>>
>> In our pipeline, most of the checkpoints complete in less than a minute
>> and some of them completed in 15 minutes+(less than a minute).
>> There’s definitely something which adds 15 minutes. This is happening in
>> one or more subtasks during checkpointing.
>>
>> Please see the screenshot below:
>>
>> Regards,
>> Bekir
>>
>>
>>
>> On 23 Jul 2019, at 16:37, Fabian Hueske wrote:
>>
>> Hi Bekir,
>>
>> Another user reported checkpointing issues with Flink 1.8.0 [1].
>> These seem to be resolved with Flink 1.8.1.
>>
>> Hope this helps,
>> Fabian
>>
>> [1]
>>
>> https://lists.apache.org/thread.html/991fe3b09fd6a052ff52e5f7d9cdd9418545e68b02e23493097d9bc4@%3Cuser.flink.apache.org%3E
>>
>> On Wed, 17 Jul 2019 at 09:16, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>
>> Hi Bekir
>>
>> First of all, I think there is something wrong.  the state size is almost
>> the same,  but the duration is different so much.
>>
>> The checkpoint for RocksDBStatebackend is dump sst files, then copy the
>> needed sst files(if you enable incremental checkpoint, the sst files
>> already on remote will not upload), then complete checkpoint. Can you
>> check
>> the network bandwidth usage during checkpoint?
>>
>> Best,
>> Congxian
>>
>>
>> Bekir Oguz wrote on Tue, 16 Jul 2019 at 22:45:
>>
>> Hi all,
>> We have a flink job with user state, checkpointing to RocksDBBackend
>> which is externally stored in AWS S3.
>> After we have migrated our cluster from 1.6 to 1.8, we see occasionally
>> that some slots do not acknowledge the checkpoints quickly enough. As an
>> example: All slots acknowledge between 30-50 seconds except only one slot
>> acknowledges in 15 mins. Checkpoint sizes are similar to each other, like
>> 200-400 MB.
>>
>> We did not experience this weird behaviour in Flink 1.6. We have 5 min
>> checkpoint interval and this happens sometimes once in an hour sometimes
>> more but not in all the checkpoint requests. Please see the screenshot
>> below.
>>
>> Also another point: For the faulty slots, the duration is consistently 15
>> mins and some seconds, we couldn’t find out where this 15 mins response
>> time comes from. And each time it is a different task manager, not always
>> the same one.
>>
>> Are you guys aware of any other users having similar issues with the new
>> version and also a suggested bug fix or solution?
>>
>>
>>
>>
>> Thanks in advance,
>> Bekir Oguz
>>
>>
>>
>>


Re: instable checkpointing after migration to flink 1.8

2019-08-01 Thread Congxian Qiu
Hi Bekir

I'll first comb through all the information here and try to find out the
cause with you; I may need you to share some more information :)

Best,
Congxian


Bekir Oguz wrote on Thu, 1 Aug 2019 at 17:00:

> Hi Fabian,
> Thanks for sharing this with us, but we’re already on version 1.8.1.
>
> What I don’t understand is which mechanism in Flink adds 15 minutes to the
> checkpoint duration occasionally. Can you maybe give us some hints on where
> to look at? Is there a default timeout of 15 minutes defined somewhere in
> Flink? I couldn’t find one.
>
> In our pipeline, most of the checkpoints complete in less than a minute
> and some of them completed in 15 minutes+(less than a minute).
> There’s definitely something which adds 15 minutes. This is happening in
> one or more subtasks during checkpointing.
>
> Please see the screenshot below:
>
> Regards,
> Bekir
>
>
>
> On 23 Jul 2019, at 16:37, Fabian Hueske wrote:
>
> Hi Bekir,
>
> Another user reported checkpointing issues with Flink 1.8.0 [1].
> These seem to be resolved with Flink 1.8.1.
>
> Hope this helps,
> Fabian
>
> [1]
>
> https://lists.apache.org/thread.html/991fe3b09fd6a052ff52e5f7d9cdd9418545e68b02e23493097d9bc4@%3Cuser.flink.apache.org%3E
>
> On Wed, 17 Jul 2019 at 09:16, Congxian Qiu <qcx978132...@gmail.com> wrote:
>
> Hi Bekir
>
> First of all, I think there is something wrong.  the state size is almost
> the same,  but the duration is different so much.
>
> The checkpoint for RocksDBStatebackend is dump sst files, then copy the
> needed sst files(if you enable incremental checkpoint, the sst files
> already on remote will not upload), then complete checkpoint. Can you check
> the network bandwidth usage during checkpoint?
>
> Best,
> Congxian
>
>
> Bekir Oguz wrote on Tue, 16 Jul 2019 at 22:45:
>
> Hi all,
> We have a flink job with user state, checkpointing to RocksDBBackend
> which is externally stored in AWS S3.
> After we have migrated our cluster from 1.6 to 1.8, we see occasionally
> that some slots do not acknowledge the checkpoints quickly enough. As an
> example: All slots acknowledge between 30-50 seconds except only one slot
> acknowledges in 15 mins. Checkpoint sizes are similar to each other, like
> 200-400 MB.
>
> We did not experience this weird behaviour in Flink 1.6. We have 5 min
> checkpoint interval and this happens sometimes once in an hour sometimes
> more but not in all the checkpoint requests. Please see the screenshot
> below.
>
> Also another point: For the faulty slots, the duration is consistently 15
> mins and some seconds, we couldn’t find out where this 15 mins response
> time comes from. And each time it is a different task manager, not always
> the same one.
>
> Are you guys aware of any other users having similar issues with the new
> version and also a suggested bug fix or solution?
>
>
>
>
> Thanks in advance,
> Bekir Oguz
>
>
>
>


Re: instable checkpointing after migration to flink 1.8

2019-07-23 Thread Fabian Hueske
Hi Bekir,

Another user reported checkpointing issues with Flink 1.8.0 [1].
These seem to be resolved with Flink 1.8.1.

Hope this helps,
Fabian

[1]
https://lists.apache.org/thread.html/991fe3b09fd6a052ff52e5f7d9cdd9418545e68b02e23493097d9bc4@%3Cuser.flink.apache.org%3E

On Wed, 17 Jul 2019 at 09:16, Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Bekir
>
> First of all, I think there is something wrong.  the state size is almost
> the same,  but the duration is different so much.
>
> The checkpoint for RocksDBStatebackend is dump sst files, then copy the
> needed sst files(if you enable incremental checkpoint, the sst files
> already on remote will not upload), then complete checkpoint. Can you check
> the network bandwidth usage during checkpoint?
>
> Best,
> Congxian
>
>
> Bekir Oguz wrote on Tue, 16 Jul 2019 at 22:45:
>
>> Hi all,
>> We have a flink job with user state, checkpointing to RocksDBBackend
>> which is externally stored in AWS S3.
>> After we have migrated our cluster from 1.6 to 1.8, we see occasionally
>> that some slots do not acknowledge the checkpoints quickly enough. As an
>> example: All slots acknowledge between 30-50 seconds except only one slot
>> acknowledges in 15 mins. Checkpoint sizes are similar to each other, like
>> 200-400 MB.
>>
>> We did not experience this weird behaviour in Flink 1.6. We have 5 min
>> checkpoint interval and this happens sometimes once in an hour sometimes
>> more but not in all the checkpoint requests. Please see the screenshot
>> below.
>>
>> Also another point: For the faulty slots, the duration is consistently 15
>> mins and some seconds, we couldn’t find out where this 15 mins response
>> time comes from. And each time it is a different task manager, not always
>> the same one.
>>
>> Are you guys aware of any other users having similar issues with the new
>> version and also a suggested bug fix or solution?
>>
>>
>>
>>
>> Thanks in advance,
>> Bekir Oguz
>>
>


Re: instable checkpointing after migration to flink 1.8 (production issue)

2019-07-18 Thread Congxian Qiu
Hi, Bekir

First, the e2e time for a subtask is $ack_time_received_in_JM -
$trigger_time_in_JM. A checkpoint includes several steps on the task side: 1)
receive the first barrier; 2) barrier alignment (for exactly-once); 3) the
sync part of the operator snapshot; 4) the async part of the operator
snapshot. The images you shared yesterday showed that the sync part took too
long; now both the sync part and the async part take a long time, and the e2e
time is much longer than sync_time + async_time.
1. You can check whether your job has backpressure problems (backpressure may
make the barrier flow too slowly to the downstream task); if it does, you
should solve that first.
2. If there is no backpressure problem, you can check the `Alignment
Duration` to see whether the barrier alignment took too long.
3. For the sync part, you could check the disk performance (if the metric is
not available, you can find the `sar` log on your machine).
4. For the async part, we can check the network performance (or any
client-side network flow control).

Hope this can help you.

Best,
Congxian


Bekir Oguz wrote on Thu, 18 Jul 2019 at 18:05:

> Hi Congxian,
> Starting from this morning we have more issues with checkpointing in
> production. What we see is sync and async duration for some subtasks are
> very long but what strange is the total of sync and async durations are
> much less than the total end to end duration. Please check the following
> snapshot:
>
>
> For example, for the subtask 14: Sync duration is 4 mins, async duration 3
> mins, end-to-end duration is 53 mins!!!
> We have a very long timeout value (1 hour) for checkpointing, but still
> many checkpoints are failing, some subtasks cannot finish checkpointing in
> 1 hour.
>
> We really appreciate your help here, this is a critical production problem
> for us at the moment.
>
> Regards,
> Bekir
>
>
> On 17 Jul 2019, at 17:46, Bekir Oguz  wrote:
>
>
> And I also extracted events fr
>
>
>


Re: instable checkpointing after migration to flink 1.8 (production issue)

2019-07-18 Thread Bekir Oguz
Hi Congxian,
Starting this morning, we have more issues with checkpointing in production.
What we see is that the sync and async durations for some subtasks are very
long, but what is strange is that the total of the sync and async durations
is much less than the total end-to-end duration. Please check the following
snapshot:



For example, for subtask 14: the sync duration is 4 mins and the async
duration 3 mins, yet the end-to-end duration is 53 mins!!!
We have a very long timeout value (1 hour) for checkpointing, but still many
checkpoints are failing; some subtasks cannot finish checkpointing within 1
hour.

We really appreciate your help here, this is a critical production problem for 
us at the moment.

Regards,
Bekir


> On 17 Jul 2019, at 17:46, Bekir Oguz  wrote:
> 
> 
> And I also extracted events fr



Re: instable checkpointing after migration to flink 1.8

2019-07-17 Thread Bekir Oguz
Hi Congxian,

Yes, we have incremental checkpointing enabled on the RocksDB backend.

For further investigation, I logged into one task manager node which had a
15-minute-long snapshot and found the logs under some /tmp directory.
Attaching 2 log files, one for a long/problematic snapshot and one for a
good/fast snapshot. I identified the problematic snapshot by the trigger time
(14:22 today) on the task manager; that is only a guess, and I extracted the
log events around that time into the bad_snapshot.log file. I also extracted
events from 14:11 today from the same task manager into the good_snapshot.log
file, to be able to compare the two. The only difference I can see is
compaction kicking in during checkpointing in the bad_snapshot.log file. Do
these logs give more insight into what is going on?

bad_snapshot.log
Description: Binary data


good_snapshot.log
Description: Binary data
Kind regards,
Bekir Oguz

On 17 Jul 2019, at 15:29, Congxian Qiu wrote:

> Hi Bekir
>
> Sorry for the previous message, I didn't see the second image of your first
> message :(
>
> From the second image of your first message, it seems the sync part
> consumes too much time:
> 57  15:40:24 (acknowledgement time)  15m53s (end-to-end duration)
> 464M (state size)  15m48s (checkpoint duration, sync)  4s (checkpoint
> duration, async)
>
> Do you have incremental checkpoints enabled or not?
> If incremental checkpoints are enabled, then in the sync part of a
> checkpoint for a RocksDBStateBackend we 1) flush all data from memory to
> sst files, 2) snapshot the meta data, 3) checkpoint the RocksDB instance;
> maybe we should check the disk info during the long checkpoint.
>
> If incremental checkpoints are disabled, then in the sync part of a
> checkpoint for RocksDBStateBackend we 1) snapshot the meta data; 2) take a
> snapshot of RocksDB.
>
> And another question: did you change the user jar's logic when migrating
> from 1.6 to 1.8?
>
> Best,
> Congxian



Re: instable checkpointing after migration to flink 1.8

2019-07-17 Thread Congxian Qiu
Hi Bekir

Sorry for the previous message, I didn't see the second image of your first
message :(

From the second image of your first message, seems the sync part consumes
too much time:

  ID 57: Acknowledgement Time 15:40:24, End to End Duration 15m53s,
  State Size 464m, Checkpoint Duration (Sync) 15m48s,
  Checkpoint Duration (Async) 4s

Do you have incremental checkpointing enabled or not?
If you enable incremental checkpointing, then in the sync part of a
checkpoint for a RocksDBStateBackend we 1) flush all data from memory to
sst files, 2) snapshot the meta data, and 3) checkpoint the RocksDB
instance; maybe we should check the disk info during the long checkpoint.

If you disable incremental checkpointing, then in the sync part of a
checkpoint for a RocksDBStateBackend we 1) snapshot the meta data and
2) take a snapshot from RocksDB.
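
For reference, this flag is set when the state backend is constructed. A
minimal sketch against the Flink 1.8 API follows; the checkpoint URI is a
placeholder, not the one used in the actual job.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The second constructor argument toggles incremental checkpoints:
        // true  -> the sync part flushes memtables to sst files and only
        //          newly created sst files are uploaded afterwards
        // false -> a full snapshot of RocksDB is taken for every checkpoint
        RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://my-bucket/checkpoints", true);
        env.setStateBackend(backend);
    }
}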

And another question: did you change the user jar's logic when migrating
from 1.6 to 1.8?

Best,
Congxian


Bekir Oguz wrote on Wednesday, 17 July 2019 at 17:15:

> Sending again with reduced image sizes due to an Apache mail server error.
>
> Begin forwarded message:
>
> *From: *Bekir Oguz 
> *Subject: **Re: instable checkpointing after migration to flink 1.8*
> *Date: *17 July 2019 at 11:10:41 CEST
> *To: *Congxian Qiu 
> *Cc: *dev@flink.apache.org
>
> Hi Congxian,
> Thanks for your response. Here is the memory/CPU/network usage of the
> task manager and the job manager pods around that time.
> The vertical line is the moment the checkpoint is triggered (15:24); the
> acknowledgement is received at 15:40.
>
> What we see is that memory usage jumps by about 1 GB each time a
> checkpoint is triggered. We can also see that the network bandwidth usage
> correlates with the checkpointing interval of 5 mins. After the checkpoint
> is triggered at 15:24 we see normal network bandwidth usage for 5 mins,
> and then nothing for about 15 mins, which is the checkpoint ack time for
> this task slot.
>
> Regards,
> Bekir
>
>
>
>
>
> On 17 Jul 2019, at 09:16, Congxian Qiu  wrote:
>
> Hi Bekir
>
> First of all, I think there is something wrong: the state size is almost
> the same, but the durations differ so much.
>
> The checkpoint for RocksDBStateBackend dumps sst files, then copies the
> needed sst files (if you enable incremental checkpointing, the sst files
> already on the remote storage will not be uploaded again), then completes
> the checkpoint. Can you check the network bandwidth usage during the
> checkpoint?
>
> Best,
> Congxian
>
>
> Bekir Oguz wrote on Tuesday, 16 July 2019 at 22:45:
>
>> Hi all,
>> We have a flink job with user state, checkpointing to RocksDBBackend
>> which is externally stored in AWS S3.
>> After we migrated our cluster from 1.6 to 1.8, we occasionally see that
>> some slots do not acknowledge the checkpoints quickly enough. As an
>> example: all slots acknowledge within 30-50 seconds except one, which
>> acknowledges in 15 mins. Checkpoint sizes are similar to each other, like
>> 200-400 MB.
>>
>> We did not experience this weird behaviour in Flink 1.6. We have a 5 min
>> checkpoint interval, and this happens sometimes once an hour, sometimes
>> more often, but not in every checkpoint request. Please see the screenshot
>> below.
>>
>> Also, another point: for the faulty slots, the duration is consistently
>> 15 mins and some seconds; we could not find out where this 15 min
>> response time comes from. And each time it is a different task manager,
>> not always the same one.
>>
>> Are you guys aware of any other users having similar issues with the new
>> version, and is there a suggested bug fix or solution?
>>
>>
>>
>>
>> Thanks in advance,
>> Bekir Oguz
>>
>
>
>


Re: instable checkpointing after migration to flink 1.8

2019-07-17 Thread Congxian Qiu
Hi Bekir

First of all, I think there is something wrong: the state size is almost
the same, but the durations differ so much.

The checkpoint for RocksDBStateBackend dumps sst files, then copies the
needed sst files (if you enable incremental checkpointing, the sst files
already on the remote storage will not be uploaded again), then completes
the checkpoint. Can you check the network bandwidth usage during the
checkpoint?
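
To make this concrete, below is a simplified, hypothetical sketch of the
incremental upload decision; it is not Flink's actual implementation, and
all names in it are made up. It only illustrates why network usage tracks
the amount of newly written sst data rather than the full state size.

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified illustration: sst files already known to exist on the remote
// storage from a previous checkpoint are only re-referenced; new sst files
// (e.g. freshly compacted ones) are uploaded in full.
public class IncrementalUploadSketch {

    private final Set<String> previouslyUploaded = new HashSet<>();

    List<String> snapshot(List<Path> localSstFiles) {
        List<String> remoteHandles = new ArrayList<>();
        for (Path sst : localSstFiles) {
            String name = sst.getFileName().toString();
            if (previouslyUploaded.contains(name)) {
                remoteHandles.add("ref:" + name);       // no network transfer
            } else {
                remoteHandles.add(uploadToRemote(sst)); // full upload
                previouslyUploaded.add(name);
            }
        }
        return remoteHandles;
    }

    private String uploadToRemote(Path sst) {
        // placeholder for the actual copy to remote storage
        return "s3://bucket/" + sst.getFileName();
    }
}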

Best,
Congxian


Bekir Oguz wrote on Tuesday, 16 July 2019 at 22:45:

> Hi all,
> We have a flink job with user state, checkpointing to RocksDBBackend which
> is externally stored in AWS S3.
> After we migrated our cluster from 1.6 to 1.8, we occasionally see that
> some slots do not acknowledge the checkpoints quickly enough. As an
> example: all slots acknowledge within 30-50 seconds except one, which
> acknowledges in 15 mins. Checkpoint sizes are similar to each other, like
> 200-400 MB.
>
> We did not experience this weird behaviour in Flink 1.6. We have a 5 min
> checkpoint interval, and this happens sometimes once an hour, sometimes
> more often, but not in every checkpoint request. Please see the screenshot
> below.
>
> Also, another point: for the faulty slots, the duration is consistently
> 15 mins and some seconds; we could not find out where this 15 min
> response time comes from. And each time it is a different task manager,
> not always the same one.
>
> Are you guys aware of any other users having similar issues with the new
> version, and is there a suggested bug fix or solution?
>
>
>
>
> Thanks in advance,
> Bekir Oguz
>
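
For reference, the setup described in this thread (a 5 min checkpoint
interval with an incremental RocksDB backend checkpointing to S3) roughly
corresponds to the sketch below. The bucket path and the raised checkpoint
timeout are placeholders, not values taken from the actual job.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobCheckpointConfig {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 5 minutes, as described above.
        env.enableCheckpointing(5 * 60 * 1000L);

        // Flink's default checkpoint timeout is 10 minutes; the 15-minute
        // durations reported here would require a raised timeout like this.
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);

        // Incremental RocksDB checkpoints, stored externally in S3.
        env.setStateBackend(
                new RocksDBStateBackend("s3://my-bucket/checkpoints", true));
    }
}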