[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()
Myasuka commented on code in PR #21822: URL: https://github.com/apache/flink/pull/21822#discussion_r1191127181 ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java: ## @@ -394,18 +394,14 @@ public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) { .forEach( localHandle -> { changelogRegistry.stopTracking(localHandle); -localChangelogRegistry.register(localHandle, checkpointId); }); -} - -@Override -public void subsume(long checkpointId) { localChangelogRegistry.discardUpToCheckpoint(checkpointId); } @Override public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) { -localChangelogRegistry.prune(checkpointId); +// delete all accumulated local dstl files when abort +localChangelogRegistry.discardUpToCheckpoint(checkpointId + 1); Review Comment: Thanks for the clarification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()
Myasuka commented on code in PR #21822: URL: https://github.com/apache/flink/pull/21822#discussion_r1180408546 ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java: ## @@ -394,18 +394,14 @@ public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) { .forEach( localHandle -> { changelogRegistry.stopTracking(localHandle); -localChangelogRegistry.register(localHandle, checkpointId); }); -} - -@Override -public void subsume(long checkpointId) { localChangelogRegistry.discardUpToCheckpoint(checkpointId); } @Override public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) { -localChangelogRegistry.prune(checkpointId); +// delete all accumulated local dstl files when abort +localChangelogRegistry.discardUpToCheckpoint(checkpointId + 1); Review Comment: I don't think `#discardUpToCheckpoint` has the same meaning as the previous `#prune`. `#prune` deletes only the files registered with the given checkpoint id, while `#discardUpToCheckpoint` deletes all files whose checkpoint id is smaller than the given one. `notifyCheckpointAbort` should tell the TM to discard only the files of that checkpoint id.
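The difference between the two calls described in this comment can be illustrated with a small model. This is only a sketch under stated assumptions: `LocalRegistrySketch`, its string file names, and its map-based bookkeeping are hypothetical stand-ins and not Flink's `LocalChangelogRegistry` implementation; only the method names `register`, `prune`, and `discardUpToCheckpoint` and their described semantics come from the review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a local changelog registry; not Flink's actual class.
class LocalRegistrySketch {
    // file name -> id of the checkpoint the file was registered with
    private final Map<String, Long> files = new TreeMap<>();

    void register(String file, long checkpointId) {
        files.put(file, checkpointId);
    }

    // Removes only the files registered with exactly this checkpoint id.
    void prune(long checkpointId) {
        files.values().removeIf(id -> id == checkpointId);
    }

    // Removes every file registered with an id smaller than upTo.
    void discardUpToCheckpoint(long upTo) {
        files.values().removeIf(id -> id < upTo);
    }

    List<String> tracked() {
        return new ArrayList<>(files.keySet());
    }

    public static void main(String[] args) {
        LocalRegistrySketch pruned = new LocalRegistrySketch();
        LocalRegistrySketch discarded = new LocalRegistrySketch();
        for (long cp = 1; cp <= 3; cp++) {
            pruned.register("cp" + cp + "-file", cp);
            discarded.register("cp" + cp + "-file", cp);
        }

        long aborted = 2;
        pruned.prune(aborted);                        // old behaviour of reset()
        discarded.discardUpToCheckpoint(aborted + 1); // behaviour proposed in the diff

        System.out.println(pruned.tracked());    // [cp1-file, cp3-file]
        System.out.println(discarded.tracked()); // [cp3-file]
    }
}
```

In this model, aborting checkpoint 2 with `discardUpToCheckpoint(checkpointId + 1)` also drops the file of checkpoint 1, which is the behavioural change the comment objects to.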
[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()
Myasuka commented on code in PR #21822: URL: https://github.com/apache/flink/pull/21822#discussion_r1170930069 ## flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java: ## @@ -246,15 +248,233 @@ void testFileAvailableAfterClose() throws Exception { } } +@Test +void testLocalFileDiscard() throws Exception { Review Comment: Got it, thanks for the explanation.
[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()
Myasuka commented on code in PR #21822: URL: https://github.com/apache/flink/pull/21822#discussion_r1170863630 ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java: ## @@ -51,14 +52,15 @@ * Store the meta of files into {@link ChangelogTaskLocalStateStore} by * AsyncCheckpointRunnable#reportCompletedSnapshotStates(). * Pass control of the file to {@link LocalChangelogRegistry#register} when - * ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous - * checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at - * the same time. + * FsStateChangelogWriter#persist , files of the previous checkpoint will be deleted by + * {@link LocalChangelogRegistry#discardUpToCheckpoint} when the previous checkpoint is + * confirmed. Review Comment: @zoltar9264 My concern is that we already know this solution is not a good choice, and we might need to refactor it in the near future. Why do we have to do the work twice? In addition, the local disk has much less space than DFS, so leaving files on it makes a "no space left" failure more likely, which actually makes the situation worse.
[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()
Myasuka commented on code in PR #21822: URL: https://github.com/apache/flink/pull/21822#discussion_r1170861351 ## flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java: ## @@ -246,15 +248,233 @@ void testFileAvailableAfterClose() throws Exception { } } +@Test +void testLocalFileDiscard() throws Exception { Review Comment: If so, why can't we introduce a UT that would fail with the code before this PR?
[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()
Myasuka commented on code in PR #21822: URL: https://github.com/apache/flink/pull/21822#discussion_r1170299609 ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java: ## @@ -51,14 +52,15 @@ * Store the meta of files into {@link ChangelogTaskLocalStateStore} by * AsyncCheckpointRunnable#reportCompletedSnapshotStates(). * Pass control of the file to {@link LocalChangelogRegistry#register} when - * ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous - * checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at - * the same time. + * FsStateChangelogWriter#persist , files of the previous checkpoint will be deleted by + * {@link LocalChangelogRegistry#discardUpToCheckpoint} when the previous checkpoint is + * confirmed. Review Comment: I think this problem did not exist for local checkpoints before this PR. If so, I am against introducing another new problem here.
[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()
Myasuka commented on code in PR #21822: URL: https://github.com/apache/flink/pull/21822#discussion_r1168984233 ## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java: ## @@ -51,14 +52,15 @@ * Store the meta of files into {@link ChangelogTaskLocalStateStore} by * AsyncCheckpointRunnable#reportCompletedSnapshotStates(). * Pass control of the file to {@link LocalChangelogRegistry#register} when - * ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous - * checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at - * the same time. + * FsStateChangelogWriter#persist , files of the previous checkpoint will be deleted by + * {@link LocalChangelogRegistry#discardUpToCheckpoint} when the previous checkpoint is + * confirmed. Review Comment: I think there is a serious problem with this solution. What happens if no checkpoint can complete? It seems the local files would never be cleaned up. ## flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java: ## @@ -246,15 +248,233 @@ void testFileAvailableAfterClose() throws Exception { } } +@Test +void testLocalFileDiscard() throws Exception { Review Comment: I reverted the code changes in this PR and this unit test still passes. It seems the added test is useless.
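The first concern above (local files accumulating when no checkpoint completes) can be illustrated with a toy model. This is a sketch under assumptions, not Flink code: `ConfirmOnlyCleanupSketch`, the `dstl-<id>` file names, and the map-based tracking are hypothetical; the only behaviour taken from the review is that files are registered at persist time and cleaned up only when a checkpoint is confirmed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "register at persist, clean up only on confirm"; not Flink code.
class ConfirmOnlyCleanupSketch {
    // local file name -> id of the checkpoint it belongs to
    private final Map<String, Long> localFiles = new HashMap<>();

    // Called when a checkpoint's changelog is persisted: the local copy is tracked right away.
    void persist(long checkpointId) {
        localFiles.put("dstl-" + checkpointId, checkpointId);
    }

    // Called only when a checkpoint completes: files of older checkpoints are dropped.
    void confirm(long checkpointId) {
        localFiles.values().removeIf(id -> id < checkpointId);
    }

    int trackedCount() {
        return localFiles.size();
    }

    public static void main(String[] args) {
        ConfirmOnlyCleanupSketch writer = new ConfirmOnlyCleanupSketch();
        for (long cp = 1; cp <= 5; cp++) {
            writer.persist(cp); // every checkpoint is attempted, none is confirmed
        }
        System.out.println(writer.trackedCount()); // 5: nothing has been cleaned up

        writer.confirm(5); // cleanup happens only once a checkpoint completes
        System.out.println(writer.trackedCount()); // 1
    }
}
```

In this model the number of tracked local files grows with every unconfirmed checkpoint, which is the unbounded local disk usage the comment warns about.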