Question: Flink checkpoints occasionally take a very long time.

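*Environment and background:*
Version: Flink 1.10.0
Data volume: roughly 100,000 records per second; the data source is Kafka.
Computation: sliding-window aggregation; each window emits roughly 10 to 20 million records.
Backpressure: the job is frequently backpressured (especially while a window is emitting its output).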

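*Problem:*
Most checkpoints complete within 1 minute, but occasionally a checkpoint takes more than 30 minutes (this is infrequent, about once every 1 to 2 days).
The time is spent on the source's checkpoint: the gap between "Trigger checkpoint" and "Starting checkpoint" on the source task is very long.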
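
To put a number on that gap, here is a minimal Python sketch that takes the two timestamps from the TaskManager log below (the "Trigger checkpoint 316" line and the "Starting checkpoint (316)" line of source subtask 4/10) and subtracts them. It also converts the epoch-millisecond value embedded in `316@1593004193363` to wall-clock time. The helper names are made up for illustration, and the UTC+8 offset is an assumption about the cluster's time zone, not something the log states.

```python
from datetime import datetime, timedelta, timezone

# log4j-style timestamp as it appears in the TaskManager log (comma before millis)
LOG_TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def parse_log_ts(text: str) -> datetime:
    """Parse a log timestamp such as '2020-06-24 21:09:53,369' (naive local time)."""
    return datetime.strptime(text, LOG_TS_FORMAT)


def epoch_ms_to_local(ms: int, utc_offset_hours: int) -> datetime:
    """Convert epoch milliseconds to a datetime at a fixed UTC offset (integer math only)."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.fromtimestamp(ms // 1000, tz) + timedelta(milliseconds=ms % 1000)


# "Trigger checkpoint 316@1593004193363 ..." logged by TaskExecutor
trigger_received = parse_log_ts("2020-06-24 21:09:53,369")
# "Starting checkpoint (316) CHECKPOINT on task Source: ... (4/10)" logged by StreamTask
checkpoint_started = parse_log_ts("2020-06-24 21:55:41,721")

trigger_to_start = checkpoint_started - trigger_received

# Assumption: the log timestamps are in UTC+8.
coordinator_trigger = epoch_ms_to_local(1593004193363, utc_offset_hours=8)

print("trigger -> start:", trigger_to_start)
print("trigger time from checkpoint id:", coordinator_trigger.isoformat())
```

For checkpoint 316 this gives a gap of about 45 minutes 48 seconds between the TaskExecutor receiving the trigger and the source task actually starting the checkpoint, while the snapshot itself (synchronous plus asynchronous part) takes only a few milliseconds on the source.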

The checkpoint situation looks roughly like this:

[image: image.png]
[image: image.png]
[image: image.png]

2020-06-24 21:09:53,369 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Trigger
checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.

2020-06-24 21:09:58,327 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:09:59,266 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP: 111/114/424
MB (used/committed/max)]

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
Memory: 583911424

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]

2020-06-24 21:10:08,346 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:10:09,286 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP: 111/114/424
MB (used/committed/max)]

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
Memory: 583911424

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]


(... omitted ...)


2020-06-24 21:55:39,875 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
Memory: 583911424

2020-06-24 21:55:39,875 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:55:39,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting
checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter
-> Timestamps/Watermarks (4/10)

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Source: Custom Source -> Map -> Filter ->
Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Source: Custom Source -> Map -> Filter ->
Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Source: Custom Source -> Map -> Filter ->
Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Source: Custom Source -> Map -> Filter ->
Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished
synchronous checkpoints for checkpoint 316 on task Source: Custom Source ->
Map -> Filter -> Timestamps/Watermarks (4/10)

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Source:
Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished
synchronous part of checkpoint 316. Alignment duration: 0 ms, snapshot
duration 0 ms

2020-06-24 21:55:41,737 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in
thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 16 ms.

2020-06-24 21:55:41,738 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Source:
Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished
asynchronous part of checkpoint 316. Asynchronous duration: 16 ms

2020-06-24 21:55:42,008 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 3.

2020-06-24 21:55:42,008 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Starting stream alignment for
checkpoint 316.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 7.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 2.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 8.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 6.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 0.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 5.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 9.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 4.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 1.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received all barriers, triggering
checkpoint 316 at 1593004193363.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): End of stream alignment, feeding
buffered data back.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting
checkpoint (316) CHECKPOINT on task
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
(10/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
(10/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:42,980 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
checkpointDirectory=hdfs://xxxx/chk-316,
sharedStateDirectory=hdfs://xxxx/shared,
taskOwnedStateDirectory=hdfs://xxxx/taskowned,
metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
(10/10),5,Flink Task Threads] took 870 ms.

2020-06-24 21:55:42,981 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished
synchronous checkpoints for checkpoint 316 on task
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)

2020-06-24 21:55:42,981 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished
synchronous part of checkpoint 316. Alignment duration: 101 ms, snapshot
duration 870 ms

2020-06-24 21:55:42,981 DEBUG
org.apache.flink.streaming.runtime.io.CachedBufferStorage     -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Size of buffered data: 98304 bytes

2020-06-24 21:55:42,981 DEBUG
org.apache.flink.streaming.runtime.io.CachedBufferStorage     -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Finished feeding back buffered data.

2020-06-24 21:55:43,814 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:55:44,758 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:55:45,714 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
checkpointDirectory=hdfs://xxxx/chk-316,
sharedStateDirectory=hdfs://xxxx/shared,
taskOwnedStateDirectory=hdfs://xxxx/taskowned,
metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in
thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 2733 ms.

2020-06-24 21:55:45,715 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished
asynchronous part of checkpoint 316. Asynchronous duration: 2734 ms

2020-06-24 21:55:49,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Memory usage stats: [HEAP: 220/2589/2589 MB, NON HEAP: 111/114/424
MB (used/committed/max)]

2020-06-24 21:55:49,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Direct memory stats: Count: 17414, Total Capacity: 584128352, Used
Memory: 584128353

2020-06-24 21:55:49,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:55:49,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC
COUNT: 7084], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
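
As a rough sanity check on whether GC could explain the delay, the sketch below compares the first and last "Garbage collector stats" lines from the log above (21:09:59,686 and 21:55:49,876). It assumes the GC TIME and GC COUNT values are cumulative counters, so their difference is the GC activity during the stall; the function and variable names are only illustrative.

```python
import re
from datetime import datetime

# Matches entries like "[PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981]"
GC_ENTRY = re.compile(
    r"\[(?P<name>[^,\]]+), GC TIME \(ms\): (?P<time>\d+), GC COUNT: (?P<count>\d+)\]"
)


def parse_gc_stats(line: str) -> dict:
    """Return {collector name: (cumulative GC time in ms, cumulative GC count)}."""
    return {
        m["name"]: (int(m["time"]), int(m["count"]))
        for m in GC_ENTRY.finditer(line)
    }


first = parse_gc_stats(
    "Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981], "
    "[PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]"
)
last = parse_gc_stats(
    "Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC COUNT: 7084], "
    "[PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]"
)

# Per-collector increase in (time ms, count) between the two samples
delta = {
    name: (last[name][0] - first[name][0], last[name][1] - first[name][1])
    for name in first
}

fmt = "%Y-%m-%d %H:%M:%S,%f"
wall_ms = (
    datetime.strptime("2020-06-24 21:55:49,876", fmt)
    - datetime.strptime("2020-06-24 21:09:59,686", fmt)
).total_seconds() * 1000

# Fraction of wall-clock time spent in GC during the interval
gc_fraction = sum(t for t, _ in delta.values()) / wall_ms

print(delta)
print(f"GC share of wall time: {gc_fraction:.4%}")
```

On these numbers, young-generation GC adds up to under 2 seconds over roughly 45 minutes and there is no additional full GC, so GC pauses in this TaskManager alone do not seem to account for the delay.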


-- 
**************************************
 tivanli
**************************************
