[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()

2023-05-11 Thread via GitHub


Myasuka commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1191127181


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java:
##
@@ -394,18 +394,14 @@ public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) {
 .forEach(
 localHandle -> {
 changelogRegistry.stopTracking(localHandle);
-localChangelogRegistry.register(localHandle, checkpointId);
 });
-}
-
-@Override
-public void subsume(long checkpointId) {
 localChangelogRegistry.discardUpToCheckpoint(checkpointId);
 }
 
 @Override
 public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) {
-localChangelogRegistry.prune(checkpointId);
+// delete all accumulated local dstl files when abort
+localChangelogRegistry.discardUpToCheckpoint(checkpointId + 1);

Review Comment:
   Thanks for the clarification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()

2023-04-28 Thread via GitHub


Myasuka commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1180408546


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java:
##
@@ -394,18 +394,14 @@ public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) {
 .forEach(
 localHandle -> {
 changelogRegistry.stopTracking(localHandle);
-localChangelogRegistry.register(localHandle, checkpointId);
 });
-}
-
-@Override
-public void subsume(long checkpointId) {
 localChangelogRegistry.discardUpToCheckpoint(checkpointId);
 }
 
 @Override
 public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) {
-localChangelogRegistry.prune(checkpointId);
+// delete all accumulated local dstl files when abort
+localChangelogRegistry.discardUpToCheckpoint(checkpointId + 1);

Review Comment:
   I don't think `#discardUpToCheckpoint` has the same meaning as the previous `#prune`.
   `#prune` only deletes the files of that specific checkpoint id, while `#discardUpToCheckpoint` deletes the files of every checkpoint id smaller than the given one. And `notifyCheckpointAbort` should tell the TM to abort only the files of that checkpoint id.
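To make the difference concrete, here is a minimal Java sketch of the two deletion semantics as the comment describes them. `ToyLocalRegistry` and `PruneVsDiscardDemo` are hypothetical names; this is a simplified model, not Flink's `LocalChangelogRegistry`, and it assumes files are keyed by checkpoint id and that `discardUpToCheckpoint(n)` removes ids strictly smaller than `n`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a local file registry (NOT Flink's LocalChangelogRegistry).
final class ToyLocalRegistry {
    // checkpoint id -> local files registered for that checkpoint, ordered by id
    private final TreeMap<Long, List<String>> filesByCheckpoint = new TreeMap<>();
    private final List<String> deleted = new ArrayList<>();

    void register(String file, long checkpointId) {
        filesByCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(file);
    }

    // "prune" semantics: delete only the files of exactly this checkpoint.
    void prune(long checkpointId) {
        List<String> files = filesByCheckpoint.remove(checkpointId);
        if (files != null) {
            deleted.addAll(files);
        }
    }

    // "discardUpToCheckpoint" semantics: delete the files of every checkpoint id < upTo.
    void discardUpToCheckpoint(long upTo) {
        Map<Long, List<String>> older = filesByCheckpoint.headMap(upTo, false);
        older.values().forEach(deleted::addAll);
        older.clear(); // headMap is a view, so this also removes the entries from the registry
    }

    List<String> remaining() {
        List<String> out = new ArrayList<>();
        filesByCheckpoint.values().forEach(out::addAll);
        return out;
    }

    List<String> deleted() {
        return deleted;
    }
}

final class PruneVsDiscardDemo {
    static ToyLocalRegistry withThreeCheckpoints() {
        ToyLocalRegistry r = new ToyLocalRegistry();
        r.register("cp1.dstl", 1);
        r.register("cp2.dstl", 2);
        r.register("cp3.dstl", 3);
        return r;
    }

    public static void main(String[] args) {
        ToyLocalRegistry pruned = withThreeCheckpoints();
        pruned.prune(3); // abort checkpoint 3 only
        System.out.println("prune(3) leaves " + pruned.remaining());

        ToyLocalRegistry discarded = withThreeCheckpoints();
        discarded.discardUpToCheckpoint(3 + 1); // the call the diff makes in reset() for checkpoint 3
        System.out.println("discardUpToCheckpoint(4) leaves " + discarded.remaining());
    }
}
```

Running `main` prints `prune(3) leaves [cp1.dstl, cp2.dstl]` and then `discardUpToCheckpoint(4) leaves []`: in this model, calling `discardUpToCheckpoint(checkpointId + 1)` on abort also deletes the files of the older checkpoints 1 and 2, whereas `prune` touches only checkpoint 3.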






[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()

2023-04-19 Thread via GitHub


Myasuka commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1170930069


##
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java:
##
@@ -246,15 +248,233 @@ void testFileAvailableAfterClose() throws Exception {
 }
 }
 
+@Test
+void testLocalFileDiscard() throws Exception {

Review Comment:
   Got it, thanks for the explanation. 






[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()

2023-04-19 Thread via GitHub


Myasuka commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1170863630


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java:
##
@@ -51,14 +52,15 @@
  *   Store the meta of files into {@link ChangelogTaskLocalStateStore} by
  *   AsyncCheckpointRunnable#reportCompletedSnapshotStates().
  *   Pass control of the file to {@link LocalChangelogRegistry#register} when
- *   ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous
- *   checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at
- *   the same time.
+ *   FsStateChangelogWriter#persist , files of the previous checkpoint will be deleted by
+ *   {@link LocalChangelogRegistry#discardUpToCheckpoint} when the previous checkpoint is
+ *   confirmed.

Review Comment:
   @zoltar9264 My concern is that we already know this solution is not a good choice, and we might need to refactor it in the near future. Why do we have to do the work twice?
   
   On the other hand, local disk has much less space than DFS, so leaving files on the disk makes a "no space left" problem more likely, which actually makes the situation worse.
   






[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()

2023-04-19 Thread via GitHub


Myasuka commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1170861351


##
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java:
##
@@ -246,15 +248,233 @@ void testFileAvailableAfterClose() throws Exception {
 }
 }
 
+@Test
+void testLocalFileDiscard() throws Exception {

Review Comment:
   If so, why can't we introduce a unit test that would fail with the code before this PR?
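One way to get such a test is to pick a scenario whose outcome depends on where local files are registered. The sketch below is a hypothetical toy model (the names are not Flink classes) contrasting registration at confirm time, as the old Javadoc in this diff describes, with registration at persist time, as the new Javadoc describes. It assumes that confirming a checkpoint deletes the registered files of all older checkpoints, and that checkpoint 1 is never confirmed.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical toy model; class and method names are illustrative, not Flink APIs.
final class RegistrationPointModel {
    private final boolean registerOnPersist;
    private final Set<String> onDisk = new HashSet<>();               // local files that exist
    private final TreeMap<Long, String> registered = new TreeMap<>(); // files the registry may delete

    RegistrationPointModel(boolean registerOnPersist) {
        this.registerOnPersist = registerOnPersist;
    }

    void persist(long checkpointId) {
        String file = "cp" + checkpointId + ".dstl";
        onDisk.add(file);
        if (registerOnPersist) {
            registered.put(checkpointId, file); // new Javadoc: register at persist time
        }
    }

    void confirm(long checkpointId) {
        if (!registerOnPersist) {
            registered.put(checkpointId, "cp" + checkpointId + ".dstl"); // old Javadoc: register on confirm
        }
        // Delete the registered files of every checkpoint older than the confirmed one.
        Map<Long, String> older = registered.headMap(checkpointId, false);
        older.values().forEach(onDisk::remove);
        older.clear();
    }

    boolean isOnDisk(String file) {
        return onDisk.contains(file);
    }

    // Regression check: checkpoint 1 is persisted but never confirmed, then checkpoint 2
    // is confirmed. The check passes only if cp1's local file has been deleted.
    static boolean unconfirmedFileIsCleanedUp(boolean registerOnPersist) {
        RegistrationPointModel m = new RegistrationPointModel(registerOnPersist);
        m.persist(1);
        m.persist(2);
        m.confirm(2);
        return !m.isOnDisk("cp1.dstl");
    }

    public static void main(String[] args) {
        System.out.println("register on confirm: " + unconfirmedFileIsCleanedUp(false));
        System.out.println("register on persist: " + unconfirmedFileIsCleanedUp(true));
    }
}
```

Running `main` prints `register on confirm: false` and `register on persist: true`, so in this model the check fails against the old registration point and passes against the new one, which is the property a regression test for the change would need.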






[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()

2023-04-18 Thread via GitHub


Myasuka commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1170299609


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java:
##
@@ -51,14 +52,15 @@
  *   Store the meta of files into {@link ChangelogTaskLocalStateStore} by
  *   AsyncCheckpointRunnable#reportCompletedSnapshotStates().
  *   Pass control of the file to {@link LocalChangelogRegistry#register} when
- *   ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous
- *   checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at
- *   the same time.
+ *   FsStateChangelogWriter#persist , files of the previous checkpoint will be deleted by
+ *   {@link LocalChangelogRegistry#discardUpToCheckpoint} when the previous checkpoint is
+ *   confirmed.

Review Comment:
   I think this problem did not exist for local checkpoints before this PR. If so, I am against introducing another new problem here.






[GitHub] [flink] Myasuka commented on a diff in pull request #21822: [FLINK-30863][state] Register local recovery files of changelog before notifyCheckpointComplete()

2023-04-17 Thread via GitHub


Myasuka commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1168984233


##
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java:
##
@@ -51,14 +52,15 @@
  *   Store the meta of files into {@link ChangelogTaskLocalStateStore} by
  *   AsyncCheckpointRunnable#reportCompletedSnapshotStates().
  *   Pass control of the file to {@link LocalChangelogRegistry#register} when
- *   ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous
- *   checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at
- *   the same time.
+ *   FsStateChangelogWriter#persist , files of the previous checkpoint will be deleted by
+ *   {@link LocalChangelogRegistry#discardUpToCheckpoint} when the previous checkpoint is
+ *   confirmed.

Review Comment:
   I think there is a serious problem with this solution. What happens if no checkpoint can complete? It seems the local files would never be cleaned up.
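The scenario can be illustrated with a small hypothetical model in which local files are cleaned up only from a confirm callback, mirroring how `confirm()` in the diff calls `discardUpToCheckpoint(checkpointId)`. `ConfirmOnlyCleanupModel` and its methods are illustrative names, not Flink APIs, and the model deliberately ignores abort and close paths.

```java
import java.util.TreeMap;

// Hypothetical model: deletion of local files is triggered only when a checkpoint is confirmed.
final class ConfirmOnlyCleanupModel {
    private final TreeMap<Long, String> localFiles = new TreeMap<>(); // checkpoint id -> local file

    void persist(long checkpointId) {
        localFiles.put(checkpointId, "cp" + checkpointId + ".dstl");
    }

    // The only cleanup path: drop the files of every checkpoint older than the confirmed one.
    void confirm(long checkpointId) {
        localFiles.headMap(checkpointId, false).clear();
    }

    int filesOnDisk() {
        return localFiles.size();
    }

    // Runs n checkpoints; each either completes (confirm is called) or fails (no callback at all).
    static int simulate(int n, boolean checkpointsComplete) {
        ConfirmOnlyCleanupModel model = new ConfirmOnlyCleanupModel();
        for (long id = 1; id <= n; id++) {
            model.persist(id);
            if (checkpointsComplete) {
                model.confirm(id);
            }
        }
        return model.filesOnDisk();
    }

    public static void main(String[] args) {
        System.out.println("all complete: " + simulate(100, true) + " file(s) left");
        System.out.println("none complete: " + simulate(100, false) + " file(s) left");
    }
}
```

Running `main` prints `all complete: 1 file(s) left` and `none complete: 100 file(s) left`; in this model the number of local files grows with every failed checkpoint because nothing else ever deletes them.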



##
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java:
##
@@ -246,15 +248,233 @@ void testFileAvailableAfterClose() throws Exception {
 }
 }
 
+@Test
+void testLocalFileDiscard() throws Exception {

Review Comment:
   I tried reverting the code changes in this PR, and this unit test still passes. It seems the added test is useless.


