Hi,

Looking at the numbers, it seems to me that checkpoint execution (the times of the sync and async parts) is always reasonably fast once it runs on the task, but the alignment time and the time from triggering a checkpoint to executing it vary a lot. Since you are using windows, and looking at the way the state size behaves before and after the problem, I have a suggestion as to what could be causing it. Before and during the problematic checkpoints the state size is rising; after the problem is gone, the state size is significantly smaller. Could it be that, as time progresses or jumps, there is a spike in session window triggering? When time moves, it is possible that a lot of windows are suddenly triggered, and a checkpoint barrier that arrives after the firing was triggered has to wait until all window firing is completed, for consistency reasons. This would also explain the backpressure you observe during this period: it comes from many (or expensive) window firings, and subsequent events/checkpoints can only proceed once the firing is done. You could investigate whether that is what is happening and maybe take measures to avoid it, but that is highly dependent on your job logic.
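One way to check this is to expose the firing rate of the session-window operator as a metric and correlate it with the slow checkpoints. Below is a minimal sketch, not taken from your job: the class name, the metric names, and the Tuple2<String, Long>/Tuple3 element types are placeholders for whatever your actual window function uses.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class InstrumentedSessionFunction
        extends ProcessWindowFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>, String, TimeWindow> {

    private transient Counter firings;        // number of window firings on this subtask
    private transient Counter firedElements;  // number of elements covered by those firings

    @Override
    public void open(Configuration parameters) {
        firings = getRuntimeContext().getMetricGroup().counter("sessionWindowFirings");
        firedElements = getRuntimeContext().getMetricGroup().counter("sessionWindowFiredElements");
    }

    @Override
    public void process(String key, Context ctx, Iterable<Tuple2<String, Long>> elements,
                        Collector<Tuple3<String, Long, Long>> out) {
        firings.inc();
        long count = 0;
        for (Tuple2<String, Long> ignored : elements) {
            count++;  // your actual per-session aggregation logic would go here
        }
        firedElements.inc(count);
        // emit (key, window end, number of elements in the session)
        out.collect(Tuple3.of(key, ctx.window().getEnd(), count));
    }
}

You would plug this in where the session window is defined, e.g. keyBy(...).window(EventTimeSessionWindows.withGap(...)).process(new InstrumentedSessionFunction()), and then watch the rate of sessionWindowFirings around the time the checkpoints expire. If the rate spikes right before the barriers get stuck, that would confirm the theory above.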
Best,
Stefan

> On 11. Dec 2018, at 10:26, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
> Hi all,
>
> We're facing the same problem mentioned in [1] - very slow checkpoint attempts of a few tasks cause checkpoint failures and, furthermore, incur high back pressure.
> We're running our Flink jobs on a cluster where
> - 2 masters + 8 worker nodes
> - all nodes, even masters, are equipped with SSDs
> - we have a separate cluster for Kafka
> - we depend largely on Hadoop-2.7.3; YARN for deploying per-job clusters and HDFS for storing checkpoints and savepoints
> - all SSDs of each node serve as local-dirs for YARN NM and data-dirs for HDFS DN
> - we use the RocksDB state backend
> - we use the latest version, flink-1.7.0
> - we trigger checkpoints every 30 minutes and the size of the state is not that large, as shown in the attached screenshot
>
> The job itself recovers from checkpoint failures and back pressure after a while; [2] shows that the job recovers after three failed checkpoints.
>
> Below is part of the JM log:
> 2018-12-10 17:24:36,150 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 14 @ 1544430276096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 17:24:57,912 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 14 for job e0cf3843cba85e8fdd5570ba18970d33 (43775252946 bytes in 21781 ms).
> 2018-12-10 17:54:36,133 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 15 @ 1544432076096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 18:04:36,134 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 15 of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 18:24:36,156 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 16 @ 1544433876096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 18:34:36,157 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 16 of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 18:54:36,138 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 17 @ 1544435676096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:04:36,139 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 17 of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 19:15:44,849 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 15 from e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:16:37,822 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 16 from e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:17:12,974 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 17 from e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:24:36,147 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 18 @ 1544437476096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:32:05,869 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 18 for job e0cf3843cba85e8fdd5570ba18970d33 (52481833829 bytes in 449738 ms).
>
> #15, #16, and #17 fail due to a single task, e81a7fb90d05da2bcec02a34d6f821e3, which is a session-window task.
>
> As shown in [3], during that period the average checkpoint end-to-end duration for the window operation increased as follows:
> - #15 : 37s
> - #16 : 1m 4s
> - #17 : 1m 25s
> However, the average end-to-end duration in normal situations is not that long (less than 30s).
> During that period, back pressure affects the throughput a lot, which is very frustrating.
>
> How can I get rid of the checkpoint failures and back pressure?
> Isn't it somewhat related to HDFS clients?
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-td12762.html
> [2] <failures.png>
> [3] <스크린샷 2018-12-11 오후 4.25.54.png>
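Regarding the expired attempts in the quoted log: they expire exactly 10 minutes after being triggered, which matches Flink's default checkpoint timeout. While you investigate the window-firing theory, a few checkpoint settings can make slow attempts less disruptive. Below is a minimal sketch, not taken from your job: the timeout/pause values and the HDFS path are placeholders, and it assumes flink-statebackend-rocksdb is on the classpath.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // keep the 30-minute checkpoint interval mentioned in the thread
        env.enableCheckpointing(30 * 60 * 1000L);

        // attempts currently expire after the default 10 minutes; give a checkpoint
        // that is stuck behind window firings more time to finish (placeholder value)
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);

        // leave a pause so a slow checkpoint does not immediately collide with the next one
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5 * 60 * 1000L);

        // incremental RocksDB checkpoints upload only changed SST files, which helps
        // while the session-window state is temporarily large (placeholder HDFS path)
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // placeholder pipeline so the sketch runs on its own; the real sources,
        // session windows, and sinks would go here
        env.fromElements("a", "b", "c").print();
        env.execute("checkpoint-tuning-sketch");
    }
}

None of this removes the root cause, it only keeps attempts from expiring while the window firings drain; the instrumentation sketch above is the way to confirm what is actually happening.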