Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-25 Thread via GitHub


yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1779823630

   Yay! Thank you @pnowojski for your time on the reviews! I really appreciate 
the learning from you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-25 Thread via GitHub


pnowojski commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1779343032

   Thanks @yigress for your contribution! Merged.





Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-25 Thread via GitHub


pnowojski merged PR #23425:
URL: https://github.com/apache/flink/pull/23425





Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-24 Thread via GitHub


yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1778479885

   @pnowojski I rebased and it kicked off a rerun successfully. I also ran a 
job for a day without problems. If it looks good, can you help merge it too? 
Thank you so much!





Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-24 Thread via GitHub


pnowojski commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1776690248

   @yigress, can you force-push something to this branch? For example, rebase 
your code and force-push. That will trigger the build.





Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-23 Thread via GitHub


yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1775778599

   @pnowojski the `@flinkbot run azure` command didn't kick off a new run, and I 
can't start a re-run either.





Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-23 Thread via GitHub


yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1775637374

   @flinkbot run azure





Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-23 Thread via GitHub


pnowojski commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1774965429

   @flinkbot run azure





Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-22 Thread via GitHub


yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1774447810

   @pnowojski could you review again? I appreciate every constructive review you 
gave!





Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-20 Thread via GitHub


yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1367568947


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##
@@ -71,10 +81,19 @@ public void cleanCheckpoint(
 boolean shouldDiscard,
 Runnable postCleanAction,
 Executor executor) {
-Checkpoint.DiscardObject discardObject =
-shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+LOG.debug(
+"Clean checkpoint {} parallel-mode={} shouldDiscard={}",
+checkpoint.getCheckpointID(),
+parallelMode,
+shouldDiscard);
+if (parallelMode) {
+cleanupParallel(checkpoint, shouldDiscard, postCleanAction, executor);
+} else {
+Checkpoint.DiscardObject discardObject =
+shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
+
+cleanup(checkpoint, discardObject::discard, postCleanAction, executor);

Review Comment:
   It looks like `DefaultCompletedCheckpointStore#shutdown` calls 
`cleanCheckpoint` from the main thread instead of the ioExecutor. Combined into 
one method.






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-20 Thread via GitHub


yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1367568908


##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -109,6 +109,19 @@ public class CheckpointingOptions {
 .defaultValue(1)
 .withDescription("The maximum number of completed checkpoints to retain.");
 
+/* Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+ * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+ * This speeds up checkpoints cleaning, but adds load to the IO.
+ */
+@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+public static final ConfigOption CLEANER_PARALLEL_MODE =
+ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+.booleanType()
+.defaultValue(true)
+.withDescription(
+"Option whether to discard a checkpoint's states in parallel using"
++ " the ExecutorService passed into the cleaner");

Review Comment:
   This is nice! Added the config to the test randomization.
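For readers unfamiliar with config randomization in tests, here is a minimal sketch of the idea, assuming a plain `Map<String, String>` as a stand-in for Flink's `Configuration` and a `java.util.Random`. `TestConfigRandomizer` and `randomizeBoolean` are hypothetical names; this is not the actual `TestStreamEnvironment#randomizeConfiguration` code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical helper: picks a random value for a boolean option unless the test set one. */
public class TestConfigRandomizer {

    static final String CLEANER_PARALLEL_MODE = "state.checkpoint.cleaner.parallel-mode";

    /** Sets {@code key} to a random boolean only if the test did not configure it explicitly. */
    static void randomizeBoolean(Map<String, String> conf, String key, Random random) {
        // Respect explicit settings so individual tests can still pin a value.
        conf.putIfAbsent(key, Boolean.toString(random.nextBoolean()));
    }

    public static void main(String[] args) {
        // Logging the seed makes a failing randomized run reproducible.
        long seed = System.nanoTime();
        Random random = new Random(seed);

        Map<String, String> conf = new HashMap<>();
        randomizeBoolean(conf, CLEANER_PARALLEL_MODE, random);
        System.out.println(
                "seed=" + seed + " " + CLEANER_PARALLEL_MODE + "=" + conf.get(CLEANER_PARALLEL_MODE));
    }
}
```

Randomizing the flag this way means the existing test suite exercises both the parallel and the sequential cleanup paths over many CI runs, without duplicating tests.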



##
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##
@@ -4303,4 +4310,40 @@ private void ackCheckpoint(
 .build(,
 "test");
 }
+
+static class OperatorSubtaskStateMock {
+OperatorSubtaskState subtaskState;
+OperatorStateHandle managedOpHandle;
+OperatorStateHandle rawOpHandle;
+
+OperatorSubtaskStateMock() throws Exception {
+this.managedOpHandle = mock(OperatorStreamStateHandle.class);
+this.rawOpHandle = mock(OperatorStreamStateHandle.class);
+OperatorSubtaskState subtaskState =
+OperatorSubtaskState.builder()
+.setManagedOperatorState(managedOpHandle)
+.setRawOperatorState(rawOpHandle)
+.build();
+this.subtaskState = spy(subtaskState);
+}
+
+public OperatorSubtaskState getSubtaskState() {
+return this.subtaskState;
+}
+
+public void reset() {
+Mockito.reset(managedOpHandle);
+Mockito.reset(rawOpHandle);
+}
+
+public void verifyDiscard() throws Exception {
+verify(managedOpHandle, times(1)).discardState();
+verify(rawOpHandle, times(1)).discardState();
+}
+
+public void verifyNotDiscard() throws Exception {
+verify(managedOpHandle, never()).discardState();
+verify(rawOpHandle, never()).discardState();
+}
+}

Review Comment:
   Updated. Great instructions on the mock usage (or rather, the no-mock usage)!






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-20 Thread via GitHub


pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r137562


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##
@@ -71,10 +81,19 @@ public void cleanCheckpoint(
 boolean shouldDiscard,
 Runnable postCleanAction,
 Executor executor) {
-Checkpoint.DiscardObject discardObject =
-shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+LOG.debug(
+"Clean checkpoint {} parallel-mode={} shouldDiscard={}",
+checkpoint.getCheckpointID(),
+parallelMode,
+shouldDiscard);
+if (parallelMode) {
+cleanupParallel(checkpoint, shouldDiscard, postCleanAction, executor);
+} else {
+Checkpoint.DiscardObject discardObject =
+shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
+
+cleanup(checkpoint, discardObject::discard, postCleanAction, executor);

Review Comment:
   nit: maybe remove this method, and replace it with a call:
   ```
   cleanupParallel(checkpoint, shouldDiscard, postCleanAction, Executors.newDirectExecutorService());
   ```
   ? Wouldn't this behave identically?
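To illustrate why a direct executor should make the parallel path behave like the sequential one: a direct executor runs each submitted task on the calling thread, in submission order, before `execute` returns. Below is a minimal sketch using only `java.util.concurrent`. `discardAll` is a hypothetical stand-in for `cleanupParallel`, and `Runnable::run` plays the role of the direct executor service mentioned in the comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class DirectExecutorSketch {

    /** Hypothetical stand-in for cleanupParallel: schedules every discard action on the executor. */
    static CompletableFuture<Void> discardAll(List<Runnable> discards, Executor executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Runnable discard : discards) {
            futures.add(CompletableFuture.runAsync(discard, executor));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        List<String> order = new ArrayList<>();
        List<Runnable> discards =
                List.of(() -> order.add("a"), () -> order.add("b"), () -> order.add("c"));

        // A direct executor runs each task synchronously on the calling thread.
        Executor direct = Runnable::run;
        CompletableFuture<Void> done = discardAll(discards, direct);

        // Everything already ran, in submission order, before discardAll returned.
        System.out.println(done.isDone() + " " + order); // prints "true [a, b, c]"
    }
}
```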



##
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##
@@ -4303,4 +4310,40 @@ private void ackCheckpoint(
 .build(,
 "test");
 }
+
+static class OperatorSubtaskStateMock {
+OperatorSubtaskState subtaskState;
+OperatorStateHandle managedOpHandle;
+OperatorStateHandle rawOpHandle;
+
+OperatorSubtaskStateMock() throws Exception {
+this.managedOpHandle = mock(OperatorStreamStateHandle.class);
+this.rawOpHandle = mock(OperatorStreamStateHandle.class);
+OperatorSubtaskState subtaskState =
+OperatorSubtaskState.builder()
+.setManagedOperatorState(managedOpHandle)
+.setRawOperatorState(rawOpHandle)
+.build();
+this.subtaskState = spy(subtaskState);
+}
+
+public OperatorSubtaskState getSubtaskState() {
+return this.subtaskState;
+}
+
+public void reset() {
+Mockito.reset(managedOpHandle);
+Mockito.reset(rawOpHandle);
+}
+
+public void verifyDiscard() throws Exception {
+verify(managedOpHandle, times(1)).discardState();
+verify(rawOpHandle, times(1)).discardState();
+}
+
+public void verifyNotDiscard() throws Exception {
+verify(managedOpHandle, never()).discardState();
+verify(rawOpHandle, never()).discardState();
+}
+}

Review Comment:
   Can you rewrite this class without using `Mockito` (as per our [code style](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations))? I would guess you can replace
   
   ```
   this.managedOpHandle = mock(OperatorStreamStateHandle.class);
   this.rawOpHandle = mock(OperatorStreamStateHandle.class);
   ```
   with proper mocks implementing the 
`org.apache.flink.runtime.state.OperatorStateHandle` interface that internally 
track whether `discardState()` has been called (a boolean flag) and expose it 
via a `boolean isDiscarded()` method or something similar.



##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -109,6 +109,19 @@ public class CheckpointingOptions {
 .defaultValue(1)
 .withDescription("The maximum number of completed checkpoints to retain.");
 
+/* Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+ * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+ * This speeds up checkpoints cleaning, but adds load to the IO.
+ */
+@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+public static final ConfigOption CLEANER_PARALLEL_MODE =
+ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+.booleanType()
+.defaultValue(true)
+.withDescription(
+"Option whether to discard a checkpoint's states in parallel using"
++ " the ExecutorService passed into the cleaner");

Review Comment:
   It's probably best if you randomize this config value for tests in:
   ```
   org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration
   ```
   similarly as 

Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-19 Thread via GitHub


yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1366104698


##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
 .defaultValue(1)
 .withDescription("The maximum number of completed checkpoints to retain.");
 
+/**
+ * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+ * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+ * This speeds up checkpoints cleaning, but adds load to the IO.
+ */
+@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+public static final ConfigOption CLEANER_PARALLEL_MODE =
+ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+.booleanType()
+.defaultValue(false)

Review Comment:
   Added back the config and set the default value to true. @pnowojski can you 
review again? Thank you for your patient reviews!






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-19 Thread via GitHub


yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1771447069

   @flinkbot run azure





Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-19 Thread via GitHub


pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1365015056


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
 boolean shouldDiscard,
 Runnable postCleanAction,
 Executor executor) {
-Checkpoint.DiscardObject discardObject =
-shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+if (shouldDiscard) {
+incrementNumberOfCheckpointsToClean();
+checkpoint
+.markAsDiscarded()
+.discardAsync(executor)
+.handle(
+(Object outerIgnored, Throwable outerThrowable) -> {
+if (outerThrowable != null) {
+LOG.warn(
+"Could not properly discard completed checkpoint {}.",
+checkpoint.getCheckpointID(),
+outerThrowable);
+}
+decrementNumberOfCheckpointsToClean();

Review Comment:
   Ahhh, yes you are right. Thanks for the explanation :)






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-19 Thread via GitHub


pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1365013893


##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
 .defaultValue(1)
 .withDescription("The maximum number of completed checkpoints to retain.");
 
+/**
+ * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+ * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+ * This speeds up checkpoints cleaning, but adds load to the IO.
+ */
+@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+public static final ConfigOption CLEANER_PARALLEL_MODE =
+ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+.booleanType()
+.defaultValue(false)

Review Comment:
   I would keep the old code path more or less as it is, so that users could 
disable this feature if there is a bug or some unexpected behaviour. Otherwise I 
think it should be enabled by default, and if nobody complains, in the next or 
the following release we could drop the flag that forces sequential cleanup (so 
we would only have parallel cleanup).
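A minimal sketch of the escape hatch described here: the option defaults to `true`, and users can set `state.checkpoint.cleaner.parallel-mode` to `false` to get the old sequential path back. It uses `java.util.Properties` as a stand-in for Flink's `Configuration`; `CleanerModeSketch` and `isParallelMode` are hypothetical names.

```java
import java.util.Properties;

public class CleanerModeSketch {

    static final String CLEANER_PARALLEL_MODE = "state.checkpoint.cleaner.parallel-mode";

    /** Parallel cleanup is on unless the user explicitly opts out (escape hatch for bugs). */
    static boolean isParallelMode(Properties conf) {
        return Boolean.parseBoolean(conf.getProperty(CLEANER_PARALLEL_MODE, "true"));
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        Properties optedOut = new Properties();
        optedOut.setProperty(CLEANER_PARALLEL_MODE, "false");

        System.out.println(isParallelMode(defaults) + " " + isParallelMode(optedOut)); // prints "true false"
    }
}
```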






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-18 Thread via GitHub


yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1364888361


##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
 .defaultValue(1)
 .withDescription("The maximum number of completed checkpoints to retain.");
 
+/**
+ * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+ * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+ * This speeds up checkpoints cleaning, but adds load to the IO.
+ */
+@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+public static final ConfigOption CLEANER_PARALLEL_MODE =
+ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+.booleanType()
+.defaultValue(false)

Review Comment:
   @pnowojski can you advise whether it is better to have an option to keep the 
original behavior? Thank you!






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-18 Thread via GitHub


yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1364762331


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
 boolean shouldDiscard,
 Runnable postCleanAction,
 Executor executor) {
-Checkpoint.DiscardObject discardObject =
-shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+if (shouldDiscard) {
+incrementNumberOfCheckpointsToClean();
+checkpoint
+.markAsDiscarded()
+.discardAsync(executor)
+.handle(
+(Object outerIgnored, Throwable outerThrowable) -> {
+if (outerThrowable != null) {
+LOG.warn(
+"Could not properly discard completed checkpoint {}.",
+checkpoint.getCheckpointID(),
+outerThrowable);
+}
+decrementNumberOfCheckpointsToClean();

Review Comment:
   I think the confusion comes from the fact that I moved 
`incrementNumberOfCheckpointsToClean()` into the `shouldDiscard=true` branch only. 
So when `shouldDiscard=false`, there is no increment, and thus no decrement either. 
Or am I seriously missing something?






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-18 Thread via GitHub


pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1363482855


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
 boolean shouldDiscard,
 Runnable postCleanAction,
 Executor executor) {
-Checkpoint.DiscardObject discardObject =
-shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+if (shouldDiscard) {
+incrementNumberOfCheckpointsToClean();
+checkpoint
+.markAsDiscarded()
+.discardAsync(executor)
+.handle(
+(Object outerIgnored, Throwable outerThrowable) -> {
+if (outerThrowable != null) {
+LOG.warn(
+"Could not properly discard completed checkpoint {}.",
+checkpoint.getCheckpointID(),
+outerThrowable);
+}
+decrementNumberOfCheckpointsToClean();

Review Comment:
   You have just said it yourself. Old code for `shouldDiscard=false` is doing:
   ```
   try {
   cleanupAction.run();
   } catch (...) {
   ...
   } finally {
   decrementNumberOfCheckpointsToClean();
   postCleanupAction.run();
   }
   ```
   With no exception and `cleanupAction` being a `NOOP`, the code simplifies to:
   ```
   decrementNumberOfCheckpointsToClean();
   postCleanupAction.run();
   ```
   
   `decrementNumberOfCheckpointsToClean` is clearly there.
   
   I would even suspect that the [deadlock in the tests](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53769=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8) 
could actually be caused by this issue.
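To make the bookkeeping argument concrete: every `incrementNumberOfCheckpointsToClean()` must be matched by exactly one `decrementNumberOfCheckpointsToClean()` on every path, otherwise anything waiting for the count to reach zero can wait forever, which would be consistent with the suspected deadlock. Below is a minimal sketch of one way to keep the two balanced: increment once up front and decrement in a single completion callback shared by both branches. `BalancedCleaner` and its methods are hypothetical, not the code in this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class BalancedCleaner {

    private final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger();

    int pending() {
        return numberOfCheckpointsToClean.get();
    }

    /** Increments once, then decrements exactly once when the (possibly no-op) discard finishes. */
    CompletableFuture<Void> cleanCheckpoint(
            boolean shouldDiscard, Runnable discardAction, Runnable postCleanAction, Executor executor) {
        numberOfCheckpointsToClean.incrementAndGet();
        CompletableFuture<Void> discard =
                shouldDiscard
                        ? CompletableFuture.runAsync(discardAction, executor)
                        // Nothing to delete, but the counter still has to be released.
                        : CompletableFuture.completedFuture(null);
        return discard.handle(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        System.err.println("Could not properly discard checkpoint: " + throwable);
                    }
                    // Runs on both branches and on failure, mirroring the old finally block.
                    numberOfCheckpointsToClean.decrementAndGet();
                    postCleanAction.run();
                    return null;
                });
    }

    public static void main(String[] args) {
        BalancedCleaner cleaner = new BalancedCleaner();
        Executor direct = Runnable::run;
        cleaner.cleanCheckpoint(true, () -> {}, () -> {}, direct).join();
        cleaner.cleanCheckpoint(false, () -> {}, () -> {}, direct).join();
        cleaner.cleanCheckpoint(true, () -> { throw new IllegalStateException("boom"); }, () -> {}, direct).join();
        System.out.println(cleaner.pending()); // prints "0"
    }
}
```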






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-17 Thread via GitHub


yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1362546056


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
 boolean shouldDiscard,
 Runnable postCleanAction,
 Executor executor) {
-Checkpoint.DiscardObject discardObject =
-shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+if (shouldDiscard) {
+incrementNumberOfCheckpointsToClean();
+checkpoint
+.markAsDiscarded()
+.discardAsync(executor)
+.handle(
+(Object outerIgnored, Throwable outerThrowable) -> {
+if (outerThrowable != null) {
+LOG.warn(
+"Could not properly discard completed checkpoint {}.",
+checkpoint.getCheckpointID(),
+outerThrowable);
+}
+decrementNumberOfCheckpointsToClean();

Review Comment:
   For `shouldDiscard=false` there is no cleanup action. The original code does 
`incrementNumberOfCheckpointsToClean()` -> `{ cleanupAction }` -> 
`finally { decrementNumberOfCheckpointsToClean(); postCleanupAction }`, so it 
looks like we can just run `postCleanupAction`.



##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
 .defaultValue(1)
 .withDescription("The maximum number of completed checkpoints to retain.");
 
+/**
+ * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+ * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+ * This speeds up checkpoints cleaning, but adds load to the IO.
+ */
+@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+public static final ConfigOption CLEANER_PARALLEL_MODE =
+ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+.booleanType()
+.defaultValue(false)

Review Comment:
   No misleading here :) I thought that if the default is set to true, we could 
just treat this as an improvement rather than a feature with a new config. There 
are places (in shutdown) that directly call `checkpoint$discardObject#discard` 
without going through `CheckpointsCleaner`, so those have to do it sequentially. 
For `discardAsync`, do you think it is better to have the option to do it either 
the slower way or the faster way? If so, I will add back the config. I appreciate 
your reviews! @pnowojski
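For context on the "faster way": parallel mode discards each of a checkpoint's state handles as its own task on the executor and completes once all of them finish, instead of running one task that deletes everything in sequence. Below is a minimal sketch with `java.util.concurrent` only, assuming each handle's discard is an independent `Runnable`. `discardInParallel` is a hypothetical helper, not Flink's `discardAsync`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelDiscardSketch {

    /** Submits one task per state handle and completes when all of them have finished. */
    static CompletableFuture<Void> discardInParallel(
            List<Runnable> stateDiscards, ExecutorService ioExecutor) {
        CompletableFuture<?>[] tasks =
                stateDiscards.stream()
                        .map(discard -> CompletableFuture.runAsync(discard, ioExecutor))
                        .toArray(CompletableFuture<?>[]::new);
        // allOf completes only after every task completed; it is exceptional if any one failed.
        return CompletableFuture.allOf(tasks);
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(4);
        AtomicInteger deleted = new AtomicInteger();
        List<Runnable> discards =
                List.of(deleted::incrementAndGet, deleted::incrementAndGet, deleted::incrementAndGet);
        try {
            discardInParallel(discards, ioExecutor).join();
            System.out.println(deleted.get()); // prints "3"
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

The trade-off mentioned in the option's Javadoc shows up here directly: with a larger pool, more deletions hit the file system at once, which is faster but adds IO load.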






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-17 Thread via GitHub


pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1361791271


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
 boolean shouldDiscard,
 Runnable postCleanAction,
 Executor executor) {
-Checkpoint.DiscardObject discardObject =
-shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+if (shouldDiscard) {
+incrementNumberOfCheckpointsToClean();
+checkpoint
+.markAsDiscarded()
+.discardAsync(executor)
+.handle(
+(Object outerIgnored, Throwable outerThrowable) -> {
+if (outerThrowable != null) {
+LOG.warn(
+"Could not properly discard completed checkpoint {}.",
+checkpoint.getCheckpointID(),
+outerThrowable);
+}
+decrementNumberOfCheckpointsToClean();

Review Comment:
   `decrementNumberOfCheckpointsToClean` should also be called if
`shouldDiscard == false`, right?
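
   A minimal sketch of how the counter could stay balanced on both branches,
assuming the counter is incremented for every checkpoint handed to the cleaner.
`BalancedCleaner` and the `discardAsync` function parameter are hypothetical
stand-ins, not the PR's code; the no-discard path uses an already-completed
future so that a single `handle(...)` callback serves both cases.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch: keeps the in-flight cleanup counter balanced on every path. */
public final class BalancedCleaner {

    private final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger();

    public CompletableFuture<Void> cleanCheckpoint(
            boolean shouldDiscard,
            Function<Executor, CompletableFuture<Void>> discardAsync,
            Runnable postCleanAction,
            Executor executor) {
        // Count the checkpoint regardless of whether it will actually be discarded.
        numberOfCheckpointsToClean.incrementAndGet();
        CompletableFuture<Void> discard =
                shouldDiscard
                        ? discardAsync.apply(executor)
                        // Nothing to discard: an already-completed future plays the NOOP role.
                        : CompletableFuture.completedFuture(null);
        return discard.handle(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        System.err.println("Could not properly discard checkpoint: " + throwable);
                    }
                    // Runs on both branches, so every increment is matched by a decrement.
                    numberOfCheckpointsToClean.decrementAndGet();
                    postCleanAction.run();
                    return null;
                });
    }

    public int getNumberOfCheckpointsToClean() {
        return numberOfCheckpointsToClean.get();
    }
}
```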



##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
 .defaultValue(1)
 .withDescription("The maximum number of completed checkpoints to retain.");
 
+/**
+ * Option whether to clean individual checkpoint's operator states in parallel. If enabled,
+ * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+ * This speeds up checkpoint cleaning, but adds load to the IO.
+ */
+@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE =
+ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+.booleanType()
+.defaultValue(false)

Review Comment:
   Ahh, sorry for misleading. I meant to keep the config option, but make the
default value true, as this seems to be a universally positive change, unless
there is a bug or some unforeseen complication.






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-17 Thread via GitHub


yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1361552392


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##
@@ -73,8 +83,12 @@ public void cleanCheckpoint(
 Executor executor) {
 Checkpoint.DiscardObject discardObject =
 shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+LOG.debug("Clean checkpoint {} fast-mode={}", checkpoint.getCheckpointID(), parallelMode);
+if (parallelMode) {
+cleanup(checkpoint, () -> discardObject.discard(executor), postCleanAction, executor);

Review Comment:
   Yes, it did feel awkward to have two levels of async. Updated; please review
again @pnowojski. Thank you very much for the review!






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-17 Thread via GitHub


yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1361551024


##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
 .defaultValue(1)
 .withDescription("The maximum number of completed checkpoints to retain.");
 
+/**
+ * Option whether to clean individual checkpoint's operator states in parallel. If enabled,
+ * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+ * This speeds up checkpoint cleaning, but adds load to the IO.
+ */
+@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE =
+ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+.booleanType()
+.defaultValue(false)

Review Comment:
   So instead of a new configuration, I updated the change with `discardAsync`,
which simply discards the state objects in parallel. WDYT?






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-13 Thread via GitHub


pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1358183466


##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
 .defaultValue(1)
 .withDescription("The maximum number of completed checkpoints to retain.");
 
+/**
+ * Option whether to clean individual checkpoint's operator states in parallel. If enabled,
+ * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+ * This speeds up checkpoint cleaning, but adds load to the IO.
+ */
+@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE =
+ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+.booleanType()
+.defaultValue(false)

Review Comment:
   Why not `defaultValue(true)`?






Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-13 Thread via GitHub


pnowojski commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1761419453

   Thanks for the contribution. It looks like a nice improvement. I've left one 
comment that could simplify the code/architecture a bit.





Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-13 Thread via GitHub


pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1358178168


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##
@@ -73,8 +83,12 @@ public void cleanCheckpoint(
 Executor executor) {
 Checkpoint.DiscardObject discardObject =
 shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+LOG.debug("Clean checkpoint {} fast-mode={}", checkpoint.getCheckpointID(), parallelMode);
+if (parallelMode) {
+cleanup(checkpoint, () -> discardObject.discard(executor), postCleanAction, executor);

Review Comment:
   What's the purpose of introducing this two-level async action in the
`ioExecutor`? The first level is the loop in `Checkpoint#discardOperatorStates`,
which schedules the second-level async actions that actually do the cleanup. It
doesn't sound right. Apart from unnecessarily blocking one IO thread, it
introduces opportunities for deadlocks. What if we have N threads and N
simultaneous checkpoints to clean up? It could happen that the `ioExecutor` is
fully blocked by the no-op `Checkpoint#discardOperatorStates` actions that
cannot complete.
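
   The starvation scenario above can be reproduced with plain
`java.util.concurrent`. This is a hypothetical toy, not Flink code: each
first-level task stands in for a `Checkpoint#discardOperatorStates` loop that
blocks on a second-level cleanup task submitted to the same fixed-size pool. A
latch makes sure all first-level tasks hold a thread before any second-level
task is submitted, so the outcome is deterministic, and a timeout keeps the demo
from hanging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical demo of two-level async starvation on a fixed pool; not Flink code. */
public final class StarvationDemo {

    private StarvationDemo() {}

    /**
     * Runs {@code checkpoints} first-level tasks on a pool of {@code threads} threads. Each
     * first-level task submits a second-level "real cleanup" task to the same pool and blocks on
     * it. Returns true if the first-level tasks did not finish within the timeout.
     */
    public static boolean starves(int threads, int checkpoints, long timeoutMillis) {
        if (checkpoints > threads) {
            // The latch below needs every first-level task running at the same time.
            throw new IllegalArgumentException("checkpoints must not exceed threads");
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch allFirstLevelRunning = new CountDownLatch(checkpoints);
        List<Future<Object>> firstLevel = new ArrayList<>();
        try {
            for (int i = 0; i < checkpoints; i++) {
                firstLevel.add(
                        pool.submit(
                                () -> {
                                    // Wait until all first-level tasks occupy a thread.
                                    allFirstLevelRunning.countDown();
                                    allFirstLevelRunning.await();
                                    // Queued behind the blocked tasks if no thread is free.
                                    Future<?> secondLevel = pool.submit(() -> {});
                                    return secondLevel.get();
                                }));
            }
            for (Future<Object> f : firstLevel) {
                f.get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Interrupts the blocked first-level tasks so the pool can shut down.
            pool.shutdownNow();
        }
    }
}
```

With `threads == checkpoints`, every thread waits for work queued behind
itself; in this toy setup a single spare thread is enough to drain the queue.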
   
   Since you are already adding a `getDiscardables()` method, you could create
and schedule the discarding of the discardables (in the `ioExecutor`) from the
thread that's calling `CheckpointsCleaner#cleanCheckpoint`, with an additional
`CompletableFuture.allOf(...)` that would perform:
   ```
   decrementNumberOfCheckpointsToClean();
   postCleanAction.run();
   ``` 
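
   A minimal sketch of the suggested single-level scheduling, assuming the
discardables are exposed as plain `Runnable`s (a simplification;
`SingleLevelCleaner` is a hypothetical name, and the real `getDiscardables()`
return type is not shown in this thread). Every discard is submitted from the
caller's thread, and `CompletableFuture.allOf(...)` triggers the decrement and
`postCleanAction` once all of them have finished.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of single-level discard scheduling; not the PR's actual code. */
public final class SingleLevelCleaner {

    private final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger();

    /**
     * Runs on the caller's thread and submits each discardable directly to the IO executor,
     * so no executor thread ever blocks waiting for another executor task.
     */
    public CompletableFuture<Void> cleanCheckpoint(
            List<Runnable> discardables, Runnable postCleanAction, Executor ioExecutor) {
        numberOfCheckpointsToClean.incrementAndGet();
        CompletableFuture<?>[] discards =
                discardables.stream()
                        .map(d -> CompletableFuture.runAsync(d, ioExecutor))
                        .toArray(CompletableFuture[]::new);
        // Fires once every discard has finished, whether it succeeded or failed.
        return CompletableFuture.allOf(discards)
                .whenComplete(
                        (ignored, throwable) -> {
                            numberOfCheckpointsToClean.decrementAndGet();
                            postCleanAction.run();
                        });
    }

    public int getNumberOfCheckpointsToClean() {
        return numberOfCheckpointsToClean.get();
    }
}
```

Because no executor task waits on another executor task here, this variant
completes even on a single-threaded `ioExecutor`. Using `whenComplete` rather
than `thenRun` means the counter and `postCleanAction` also run when a discard
fails, while the failure still propagates to the returned future.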


