Feifan Wang created FLINK-23949:
-----------------------------------

Summary: first incremental checkpoint after a savepoint will degenerate into a full checkpoint
Key: FLINK-23949
URL: https://issues.apache.org/jira/browse/FLINK-23949
Project: Flink
Issue Type: Improvement
Components: Runtime / State Backends
Affects Versions: 1.13.2, 1.12.5, 1.11.4
Reporter: Feifan Wang
Attachments: image-2021-08-25-00-59-05-779.png
In RocksIncrementalSnapshotStrategy, we record the uploaded RocksDB files for each checkpoint id in _materializedSstFiles_, and clean them up in _CheckpointListener#notifyCheckpointComplete_:

{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId) {
            // Drop the file lists of all checkpoints older than the completed one.
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}

This works well without savepoints. However, a savepoint does not add an entry to _materializedSstFiles_, yet when it completes, the method above still removes the entries of the previous checkpoints and advances _lastCompletedCheckpointId_ to the savepoint id. As a result, the first checkpoint after the savepoint has no previously uploaded files to reuse and must upload all files in RocksDB.

!image-2021-08-25-00-59-05-779.png|width=1640,height=225!

The fix is simple. I propose changing the implementation of CheckpointListener#notifyCheckpointComplete in RocksIncrementalSnapshotStrategy so that it only cleans up when the completed id belongs to a checkpoint this strategy has recorded:

{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        // Ignore ids with no recorded entry (e.g. savepoints), so the file lists
        // of the last incremental checkpoint stay available as the base.
        if (completedCheckpointId > lastCompletedCheckpointId
                && materializedSstFiles.containsKey(completedCheckpointId)) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
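To make the effect concrete, here is a minimal, self-contained model of the bookkeeping described above. It is a sketch, not Flink code: the class SstTrackerDemo, the methods checkpoint and runScenario, the applyFix flag and the file names are hypothetical; only materializedSstFiles, lastCompletedCheckpointId and notifyCheckpointComplete mirror names from the issue. The model assumes that a savepoint records nothing in materializedSstFiles and that the next incremental checkpoint reuses the file list stored under lastCompletedCheckpointId as its base.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical, simplified model of the SST-file bookkeeping; not Flink's implementation. */
class SstTrackerDemo {
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;
    private final boolean applyFix; // true = use the proposed containsKey guard

    SstTrackerDemo(boolean applyFix) {
        this.applyFix = applyFix;
    }

    /** Incremental checkpoint: returns the files that must be uploaded, then records all current files. */
    Set<String> checkpoint(long checkpointId, Set<String> currentSstFiles) {
        synchronized (materializedSstFiles) {
            // Assumed base: the file list of the last completed checkpoint (empty if absent).
            Set<String> base =
                    materializedSstFiles.getOrDefault(lastCompletedCheckpointId, Collections.emptySet());
            Set<String> toUpload = new HashSet<>(currentSstFiles);
            toUpload.removeAll(base);
            materializedSstFiles.put(checkpointId, new HashSet<>(currentSstFiles));
            return toUpload;
        }
    }

    void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (materializedSstFiles) {
            // Without the fix every id counts; with it, only ids we recorded do.
            boolean recorded = !applyFix || materializedSstFiles.containsKey(completedCheckpointId);
            if (completedCheckpointId > lastCompletedCheckpointId && recorded) {
                materializedSstFiles.keySet().removeIf(id -> id < completedCheckpointId);
                lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    /** Checkpoint 1, savepoint 2, checkpoint 3 (one new file); returns how many files checkpoint 3 uploads. */
    static int runScenario(boolean applyFix) {
        SstTrackerDemo tracker = new SstTrackerDemo(applyFix);
        Set<String> files = new HashSet<>(Set.of("a.sst", "b.sst", "c.sst"));
        tracker.checkpoint(1L, files);
        tracker.notifyCheckpointComplete(1L);
        tracker.notifyCheckpointComplete(2L); // savepoint: completes without a recorded entry
        files.add("d.sst");
        return tracker.checkpoint(3L, files).size();
    }

    public static void main(String[] args) {
        System.out.println("original: " + runScenario(false) + " file(s) uploaded");
        System.out.println("fixed: " + runScenario(true) + " file(s) uploaded");
    }
}
```

In this model the original logic makes checkpoint 3 re-upload all four files, while the guarded version uploads only the new d.sst, which is the difference the issue describes.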