Hi Stefan,

Thank you for the answer.
So you mean that any use of windows in the stream will result in synchronous
snapshotting?
When are you planning to fix this?
And is there a workaround?

Thanks again,
Tovi
From: Stefan Richter [mailto:s.rich...@data-artisans.com]
Sent: Tuesday, January 30, 2018 21:10
To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
Cc: user@flink.apache.org
Subject: Re: Sync and Async checkpoint time

Hi,

this looks like the timer service is the culprit for this problem. Timers are
currently not stored in the state backend, but in a separate on-heap data
structure that does not support copy-on-write or async snapshots in general.
Therefore, writing the timers for a snapshot is always synchronous. This
explanation would also match your observation that the problem mainly affects
window operators, which make heavy use of timers.
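To make the distinction concrete, here is a toy Java model of one operator instance. It is not Flink code; the class and method names are hypothetical, and the whole-map copy per write is a deliberately crude stand-in for a real copy-on-write state table. Keyed state is copy-on-write, so the synchronous part of a snapshot only pins the current version and the slow write can run on another thread. Timers live in a mutable on-heap queue, so they have to be written out by the calling (task) thread, which stalls processing whenever the checkpoint stream is slow.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Toy model of one operator instance; NOT Flink code, all names are hypothetical. */
final class TimerSnapshotSketch {
    // Keyed state: every write installs a fresh immutable map (copy-on-write),
    // so a snapshot can simply keep a reference to the current version.
    private volatile Map<String, Long> state = Collections.emptyMap();
    // Timers: a mutable on-heap priority queue that is updated in place.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void putState(String key, long value) {
        Map<String, Long> next = new HashMap<>(state); // copy, then modify the copy
        next.put(key, value);
        state = Collections.unmodifiableMap(next);
    }

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Synchronous part for keyed state: O(1), only pins the current immutable version. */
    Map<String, Long> pinStateSnapshot() {
        return state;
    }

    /**
     * Synchronous part for timers: the queue has no copy-on-write, so every timer is
     * written to the checkpoint stream by the calling (task) thread. A slow stream,
     * such as one backed by remote storage, blocks record processing meanwhile.
     */
    int writeTimersSync(DataOutputStream checkpointStream) throws IOException {
        checkpointStream.writeInt(timers.size());
        for (long timestamp : timers) {
            checkpointStream.writeLong(timestamp);
        }
        return timers.size();
    }

    /** Asynchronous part: serialize the pinned state on another thread (stand-in for the upload). */
    static CompletableFuture<String> writeStateAsync(Map<String, Long> pinned, Executor ioPool) {
        return CompletableFuture.supplyAsync(() -> new TreeMap<>(pinned).toString(), ioPool);
    }
}
```

In this model `pinStateSnapshot()` costs the same regardless of state size, while `writeTimersSync()` grows with the number of registered timers and runs in the task thread, which is the cost pattern you would expect to show up as long synchronous checkpoint times in timer-heavy window operators.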

Best,
Stefan


On 30.01.2018 at 18:17, Sofer, Tovi <tovi.so...@citi.com> wrote:

Hi group,

In our project we are using the asynchronous FsStateBackend, and we are trying
to move to distributed storage (currently S3).
When using this storage, we experience high backpressure and high latency
compared to local storage.
We are trying to understand why, since the checkpoint is asynchronous and so
shouldn't have such a large effect.

We looked at the checkpoint history in the web UI and at the details in the log.
• From the web UI, the sync checkpoint duration seems much higher than the
async duration (again, only when using S3, not when using local storage).
This happens especially in window operators (tumbling windows), such as the one
below.
• But from the log, the sync time seems very short…

Do you have any idea why the async write to the FsStateBackend has such a large
effect on stream performance?

Checkpoint config:

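env.enableCheckpointing(60000);  // trigger a checkpoint every 60 s
env.setStateBackend(new FsStateBackend(checkpointDirURI, true));  // true = asynchronous snapshots

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);  // wait at least 20 s after a checkpoint completes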
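A simplified model of how the interval and the minimum pause interact: a checkpoint is due every 60 s, but never sooner than 20 s after the previous one completed. This is an assumption about the scheduling rule, not Flink's CheckpointCoordinator code, and the class and method names are hypothetical.

```java
/**
 * Simplified, hypothetical model of the two settings above: a checkpoint is due every
 * intervalMillis, but not earlier than minPauseMillis after the previous one completed.
 */
final class CheckpointSchedule {
    static long nextTriggerMillis(long lastTriggerMillis, long lastCompletionMillis,
                                  long intervalMillis, long minPauseMillis) {
        long byInterval = lastTriggerMillis + intervalMillis;    // regular cadence
        long byMinPause = lastCompletionMillis + minPauseMillis; // enforced gap after completion
        return Math.max(byInterval, byMinPause);
    }
}
```

For example, `nextTriggerMillis(0, 14_000, 60_000, 20_000)` returns 60000, while `nextTriggerMillis(0, 50_000, 60_000, 20_000)` returns 70000: under this model the 20 s pause only delays the next checkpoint once a checkpoint takes longer than 40 s.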


• Checkpoint info from the web console: [screenshot image004.png, not preserved]

• Checkpoint info from the log:
2018-01-30 07:33:36,416 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-42-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] took 
12139 ms.
2018-01-30 07:33:36,418 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-83] Received acknowledge message for 
checkpoint 52 from task 19ae368e935f177b513577256505ff37 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:36,676 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-35-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-35-thread-1,5,Flink Task Threads] took 
12396 ms.
2018-01-30 07:33:36,677 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 19bf0464a13f3c9bd559b7559d166de2 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,347 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-17-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-17-thread-1,5,Flink Task Threads] took 
13067 ms.
2018-01-30 07:33:37,349 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 97e169eefde0c57f52a874dee5a0b5a2 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,418 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-29-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-29-thread-1,5,Flink Task Threads] took 
13143 ms.
2018-01-30 07:33:37,420 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 19b990204530184a97fbaad373dfaf11 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,508 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-33-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-33-thread-1,5,Flink Task Threads] took 
13234 ms.
2018-01-30 07:33:37,509 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 5012eda50495ae33ca38e2478b3f8e0d of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,589 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - 
[ParsedOrdersDelayWindow (2/4)] Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
synchronous part) in thread Thread[ParsedOrdersDelayWindow (2/4),5,Flink Task 
Threads] took 1 ms.
2018-01-30 07:33:37,678 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-49-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-49-thread-1,5,Flink Task Threads] took 
13403 ms.
2018-01-30 07:33:37,680 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 6c26b698209523f6a0c77191b2bcb491 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:38,143 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-25-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-25-thread-1,5,Flink Task Threads] took 
13863 ms.
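The log makes the discrepancy easy to quantify. The Java sketch below (hypothetical class name; the regular expression is written against the log lines quoted above, not against any documented Flink log format) extracts the synchronous and asynchronous snapshot timings. On this excerpt it finds asynchronous parts of roughly 12 to 14 s and a single synchronous part of 1 ms for ParsedOrdersDelayWindow (2/4). That short synchronous time is consistent with Stefan's explanation that timers are written outside the state backend, so their cost would not appear in these backend timings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Extracts snapshot timings from INFO lines like the ones quoted above (hypothetical helper). */
final class SnapshotLogScanner {
    static final class Timing {
        final boolean synchronous;
        final String thread;
        final long millis;

        Timing(boolean synchronous, String thread, long millis) {
            this.synchronous = synchronous;
            this.thread = thread;
            this.millis = millis;
        }
    }

    // "... (a)synchronous part) in thread Thread[<name>,<priority>,<group>] took <n> ms"
    private static final Pattern TIMING = Pattern.compile(
            "\\b(synchronous|asynchronous) part\\) in thread Thread\\[([^,\\]]+),[^\\]]*\\] took (\\d+) ms");

    static List<Timing> scan(String log) {
        // Mail clients wrap long lines; collapse all whitespace so wrapped entries match again.
        String flat = log.replaceAll("\\s+", " ");
        List<Timing> out = new ArrayList<>();
        Matcher m = TIMING.matcher(flat);
        while (m.find()) {
            boolean sync = m.group(1).equals("synchronous");
            out.add(new Timing(sync, m.group(2), Long.parseLong(m.group(3))));
        }
        return out;
    }

    /** Largest duration among the synchronous (or asynchronous) entries, 0 if there are none. */
    static long maxMillis(List<Timing> timings, boolean synchronous) {
        long max = 0;
        for (Timing t : timings) {
            if (t.synchronous == synchronous) {
                max = Math.max(max, t.millis);
            }
        }
        return max;
    }
}
```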

Thanks & regards,
Tovi
