Hi group,

In our project we are using the asynchronous FsStateBackend, and we are trying to 
move to distributed storage, currently S3.
With S3 we are seeing high backpressure and high latency compared to local 
storage.
We are trying to understand why: since the checkpoint is asynchronous, it 
should not affect the stream this much.

We looked at the checkpoint history in the web UI and at the details in the log.

*         In the web UI, the Sync checkpoint duration appears much higher than 
the Async duration. (Again, this happens only with S3, not with local 
storage.)
This happens especially in window operators (tumbling windows), such as below.

*         In the log, however, the Sync time appears very short.


Do you have any idea why the asynchronous write in FsStateBackend has such a 
large effect on stream performance?

Checkpoint config:

env.enableCheckpointing(60000);
env.setStateBackend(new FsStateBackend(checkpointDirURI, true));

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);



*         Checkpoint info from console:
[image: image004.png, checkpoint details from the web console]


*         Checkpoint info from log:
2018-01-30 07:33:36,416 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-42-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] took 
12139 ms.
2018-01-30 07:33:36,418 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-83] Received acknowledge message for 
checkpoint 52 from task 19ae368e935f177b513577256505ff37 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:36,676 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-35-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-35-thread-1,5,Flink Task Threads] took 
12396 ms.
2018-01-30 07:33:36,677 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 19bf0464a13f3c9bd559b7559d166de2 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,347 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-17-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-17-thread-1,5,Flink Task Threads] took 
13067 ms.
2018-01-30 07:33:37,349 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 97e169eefde0c57f52a874dee5a0b5a2 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,418 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-29-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-29-thread-1,5,Flink Task Threads] took 
13143 ms.
2018-01-30 07:33:37,420 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 19b990204530184a97fbaad373dfaf11 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,508 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-33-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-33-thread-1,5,Flink Task Threads] took 
13234 ms.
2018-01-30 07:33:37,509 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 5012eda50495ae33ca38e2478b3f8e0d of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,589 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - 
[ParsedOrdersDelayWindow (2/4)] Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
synchronous part) in thread Thread[ParsedOrdersDelayWindow (2/4),5,Flink Task 
Threads] took 1 ms.
2018-01-30 07:33:37,678 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-49-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-49-thread-1,5,Flink Task Threads] took 
13403 ms.
2018-01-30 07:33:37,680 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 6c26b698209523f6a0c77191b2bcb491 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:38,143 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-25-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-25-thread-1,5,Flink Task Threads] took 
13863 ms.
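
To compare these numbers with the web UI per phase, the durations can be pulled out of the wrapped log text with a small helper along the following lines. This is only a sketch: the class name SnapshotLogSummary and its methods are made up for illustration and are not part of Flink, and the regular expression assumes the exact message format shown in the log above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Flink class): extracts snapshot durations from the log. */
public class SnapshotLogSummary {

    /** One "... part) in thread Thread[...] took N ms" entry. */
    public static final class Entry {
        public final String phase;   // "synchronous" or "asynchronous"
        public final String thread;  // thread name as printed inside Thread[...]
        public final long millis;    // reported duration

        Entry(String phase, String thread, long millis) {
            this.phase = phase;
            this.thread = thread;
            this.millis = millis;
        }
    }

    // Assumes the message format seen in the log excerpt above.
    private static final Pattern SNAPSHOT = Pattern.compile(
        "\\b(asynchronous|synchronous) part\\) in thread "
        + "Thread\\[(.+?),\\d+,[^\\]]*\\] took (\\d+) ms");

    /** Parses raw log text; line wraps are collapsed to single spaces first. */
    public static List<Entry> parse(String rawLog) {
        String flat = rawLog.replaceAll("\\s+", " ");
        List<Entry> entries = new ArrayList<>();
        Matcher m = SNAPSHOT.matcher(flat);
        while (m.find()) {
            entries.add(new Entry(m.group(1), m.group(2), Long.parseLong(m.group(3))));
        }
        return entries;
    }

    /** Longest duration for the given phase, or -1 if the phase never appears. */
    public static long max(List<Entry> entries, String phase) {
        long best = -1;
        for (Entry e : entries) {
            if (e.phase.equals(phase) && e.millis > best) {
                best = e.millis;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Two entries copied from the log above, including the mail line wraps.
        String sample =
            "asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] took \n"
            + "12139 ms.\n"
            + "synchronous part) in thread Thread[ParsedOrdersDelayWindow (2/4),5,Flink Task \n"
            + "Threads] took 1 ms.\n";
        List<Entry> entries = parse(sample);
        System.out.println("max sync ms:  " + max(entries, "synchronous"));
        System.out.println("max async ms: " + max(entries, "asynchronous"));
    }
}
```

Run against the full TaskManager log, this gives the longest synchronous and asynchronous part per checkpoint window, which can then be set side by side with the Sync and Async columns in the web UI.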

Thanks & regards,
Tovi
