Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1779823630

Yay! Thank you @pnowojski for your time on the reviews! I really appreciate learning from you!

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1779343032

Thanks @yigress for your contribution! Merged.
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski merged PR #23425:
URL: https://github.com/apache/flink/pull/23425
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1778479885

@pnowojski I rebased, and it kicked off a rerun successfully. I also ran a job for a day without problems. If it looks good, can you help merge it too? Thank you so much!
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1776690248

@yigress, can you force-push something to this branch? For example, rebase your code and force-push. That will trigger the build.
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1775778599

@pnowojski the `@flinkbot run azure` command didn't kick off new runs. I can't start a re-run either.
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1775637374

@flinkbot run azure
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1774965429

@flinkbot run azure
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1774447810

@pnowojski could you review again? I appreciate every constructive review you gave!
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1367568947

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ##
@@ -71,10 +81,19 @@ public void cleanCheckpoint(
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        Checkpoint.DiscardObject discardObject =
-                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        LOG.debug(
+                "Clean checkpoint {} parallel-mode={} shouldDiscard={}",
+                checkpoint.getCheckpointID(),
+                parallelMode,
+                shouldDiscard);
+        if (parallelMode) {
+            cleanupParallel(checkpoint, shouldDiscard, postCleanAction, executor);
+        } else {
+            Checkpoint.DiscardObject discardObject =
+                    shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
+
+            cleanup(checkpoint, discardObject::discard, postCleanAction, executor);

Review Comment:
Looks like `DefaultCompletedCheckpointStore#shutdown` calls `cleanCheckpoint` from the main thread instead of the ioExecutor. Combined into one method.
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1367568908

## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -109,6 +109,19 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
+    /* Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Option whether to discard a checkpoint's states in parallel using"
+                                    + " the ExecutorService passed into the cleaner");

Review Comment:
This is nice! Added the config to the test randomization.
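One way to randomize a boolean option in tests while keeping failures reproducible is to draw the value from a seeded `Random` and fill in only the options a test has not set explicitly. This is a hypothetical sketch: a `Map<String, String>` stands in for Flink's `Configuration`, and the class and method names are illustrative; it does not reproduce what `TestStreamEnvironment#randomizeConfiguration` actually does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch: a plain Map stands in for Flink's Configuration.
final class RandomizedConfigSketch {
    static final String PARALLEL_MODE_KEY = "state.checkpoint.cleaner.parallel-mode";

    // Returns one of the candidates; the same seed always yields the same choice.
    @SafeVarargs
    static <T> T randomize(Random random, T... candidates) {
        return candidates[random.nextInt(candidates.length)];
    }

    // Fills in a randomized value only if the test did not pin the option itself.
    static Map<String, String> randomizeConfiguration(Map<String, String> conf, long seed) {
        Random random = new Random(seed);
        Map<String, String> result = new HashMap<>(conf);
        boolean parallelMode = randomize(random, true, false);
        result.putIfAbsent(PARALLEL_MODE_KEY, Boolean.toString(parallelMode));
        return result;
    }
}
```

Logging the seed alongside a failing test would let the same combination be replayed.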
## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java: ##
@@ -4303,4 +4310,40 @@ private void ackCheckpoint(
                 .build(, "test");
     }
+
+    static class OperatorSubtaskStateMock {
+        OperatorSubtaskState subtaskState;
+        OperatorStateHandle managedOpHandle;
+        OperatorStateHandle rawOpHandle;
+
+        OperatorSubtaskStateMock() throws Exception {
+            this.managedOpHandle = mock(OperatorStreamStateHandle.class);
+            this.rawOpHandle = mock(OperatorStreamStateHandle.class);
+            OperatorSubtaskState subtaskState =
+                    OperatorSubtaskState.builder()
+                            .setManagedOperatorState(managedOpHandle)
+                            .setRawOperatorState(rawOpHandle)
+                            .build();
+            this.subtaskState = spy(subtaskState);
+        }
+
+        public OperatorSubtaskState getSubtaskState() {
+            return this.subtaskState;
+        }
+
+        public void reset() {
+            Mockito.reset(managedOpHandle);
+            Mockito.reset(rawOpHandle);
+        }
+
+        public void verifyDiscard() throws Exception {
+            verify(managedOpHandle, times(1)).discardState();
+            verify(rawOpHandle, times(1)).discardState();
+        }
+
+        public void verifyNotDiscard() throws Exception {
+            verify(managedOpHandle, never()).discardState();
+            verify(rawOpHandle, never()).discardState();
+        }
+    }

Review Comment:
Updated. Great instructions on the mock usage (or rather, no-mock usage)!
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r137562

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ##
@@ -71,10 +81,19 @@ public void cleanCheckpoint(
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        Checkpoint.DiscardObject discardObject =
-                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        LOG.debug(
+                "Clean checkpoint {} parallel-mode={} shouldDiscard={}",
+                checkpoint.getCheckpointID(),
+                parallelMode,
+                shouldDiscard);
+        if (parallelMode) {
+            cleanupParallel(checkpoint, shouldDiscard, postCleanAction, executor);
+        } else {
+            Checkpoint.DiscardObject discardObject =
+                    shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
+
+            cleanup(checkpoint, discardObject::discard, postCleanAction, executor);

Review Comment:
nit: maybe remove this method and replace it with a call:
```
cleanupParallel(checkpoint, shouldDiscard, postCleanAction, Executors#newDirectExecutorService);
```
Wouldn't this behave identically?
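The nit above argues that a direct executor makes a separate sequential method redundant. A minimal sketch of why, assuming each state's discard can be modeled as a `Runnable`; `DirectExecutorSketch` and `discardAll` are hypothetical names, and `Runnable::run` stands in for Flink's `Executors#newDirectExecutorService`. The same fan-out code runs the discards one after another on the calling thread when given a direct executor, and concurrently when given a thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch: one discard routine, parameterized only by the Executor.
final class DirectExecutorSketch {
    // Runs each submitted task synchronously on the thread that calls execute().
    static final Executor DIRECT = Runnable::run;

    // Submits every discard to the executor and returns a future that
    // completes once all of them have finished.
    static CompletableFuture<Void> discardAll(List<Runnable> discards, Executor executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Runnable discard : discards) {
            futures.add(CompletableFuture.runAsync(discard, executor));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }
}
```

With `DIRECT`, the returned future is already complete when `discardAll` returns, which matches the sequential path.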
## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java: ##
@@ -4303,4 +4310,40 @@ private void ackCheckpoint(
                 .build(, "test");
     }
+
+    static class OperatorSubtaskStateMock {
+        OperatorSubtaskState subtaskState;
+        OperatorStateHandle managedOpHandle;
+        OperatorStateHandle rawOpHandle;
+
+        OperatorSubtaskStateMock() throws Exception {
+            this.managedOpHandle = mock(OperatorStreamStateHandle.class);
+            this.rawOpHandle = mock(OperatorStreamStateHandle.class);
+            OperatorSubtaskState subtaskState =
+                    OperatorSubtaskState.builder()
+                            .setManagedOperatorState(managedOpHandle)
+                            .setRawOperatorState(rawOpHandle)
+                            .build();
+            this.subtaskState = spy(subtaskState);
+        }
+
+        public OperatorSubtaskState getSubtaskState() {
+            return this.subtaskState;
+        }
+
+        public void reset() {
+            Mockito.reset(managedOpHandle);
+            Mockito.reset(rawOpHandle);
+        }
+
+        public void verifyDiscard() throws Exception {
+            verify(managedOpHandle, times(1)).discardState();
+            verify(rawOpHandle, times(1)).discardState();
+        }
+
+        public void verifyNotDiscard() throws Exception {
+            verify(managedOpHandle, never()).discardState();
+            verify(rawOpHandle, never()).discardState();
+        }
+    }

Review Comment:
Can you rewrite this class without using `Mockito` (as per our [code style](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations))? I would guess you can replace
```
this.managedOpHandle = mock(OperatorStreamStateHandle.class);
this.rawOpHandle = mock(OperatorStreamStateHandle.class);
```
with proper mocks implementing the `org.apache.flink.runtime.state.OperatorStateHandle` interface that internally track whether `discardState()` has been called (a boolean flag) and expose it via a `boolean isDiscarded()` method or something similar.
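A sketch of the kind of reusable test implementation the reviewer describes. To stay self-contained it implements a hypothetical one-method interface, `DiscardableHandle`, instead of Flink's `OperatorStateHandle`, which declares further methods that a real test double would also have to implement. It counts calls rather than storing a boolean, so a test can also assert "discarded exactly once", and it uses an `AtomicInteger` on the assumption that in parallel mode discards may run on IO threads.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal stand-in for the discard contract of a state handle.
interface DiscardableHandle {
    void discardState() throws Exception;
}

// Reusable test implementation: records discard calls instead of relying on Mockito.
final class DiscardTrackingHandle implements DiscardableHandle {
    // Atomic because discards may be issued from several IO threads at once.
    private final AtomicInteger discardCount = new AtomicInteger();

    @Override
    public void discardState() {
        discardCount.incrementAndGet();
    }

    boolean isDiscarded() {
        return discardCount.get() > 0;
    }

    int getDiscardCount() {
        return discardCount.get();
    }
}
```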
## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -109,6 +109,19 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
+    /* Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Option whether to discard a checkpoint's states in parallel using"
+                                    + " the ExecutorService passed into the cleaner");

Review Comment:
It's probably best if you randomize this config value for tests in:
```
org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration
```
similarly as
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1366104698

## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
+    /**
+     * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
Added back the config and set the default value to true. @pnowojski can you review again? Thank you for your patient reviews!
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1771447069

@flinkbot run azure
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1365015056

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ##
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        Checkpoint.DiscardObject discardObject =
-                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        if (shouldDiscard) {
+            incrementNumberOfCheckpointsToClean();
+            checkpoint
+                    .markAsDiscarded()
+                    .discardAsync(executor)
+                    .handle(
+                            (Object outerIgnored, Throwable outerThrowable) -> {
+                                if (outerThrowable != null) {
+                                    LOG.warn(
+                                            "Could not properly discard completed checkpoint {}.",
+                                            checkpoint.getCheckpointID(),
+                                            outerThrowable);
+                                }
+                                decrementNumberOfCheckpointsToClean();

Review Comment:
Ahhh, yes, you are right. Thanks for the explanation :)
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1365013893

## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
+    /**
+     * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
I would keep the old code path more or less as it is, so that users could disable this feature, for example if there is a bug or some unexpected behaviour. Otherwise, I think it should be enabled by default, and if nobody complains, in the next or the following release we could drop the flag that forces sequential clean up (so we would only have parallel clean up).
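The fallback described above can stay small if the flag only decides which executor runs the per-state discards. A hypothetical sketch; `CleanerModeSketch` and `discardExecutor` are illustrative names, not Flink API. With the flag on (the proposed default), discards fan out over the IO executor; with it off, they run one after another on whichever thread performs the cleanup.

```java
import java.util.concurrent.Executor;

// Hypothetical sketch: the parallel-mode flag only selects the executor for per-state discards.
final class CleanerModeSketch {
    // Executes each task immediately on the thread that submits it.
    private static final Executor DIRECT = Runnable::run;

    static Executor discardExecutor(boolean parallelMode, Executor ioExecutor) {
        // true: spread the discards over the IO executor's threads.
        // false: keep the old sequential behaviour on the cleanup thread.
        return parallelMode ? ioExecutor : DIRECT;
    }
}
```

Dropping the flag later would then amount to always passing the IO executor.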
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1364888361

## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
+    /**
+     * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
@pnowojski can you advise whether it is better to have an option to keep the original behavior? Thank you!
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1364762331

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ##
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        Checkpoint.DiscardObject discardObject =
-                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        if (shouldDiscard) {
+            incrementNumberOfCheckpointsToClean();
+            checkpoint
+                    .markAsDiscarded()
+                    .discardAsync(executor)
+                    .handle(
+                            (Object outerIgnored, Throwable outerThrowable) -> {
+                                if (outerThrowable != null) {
+                                    LOG.warn(
+                                            "Could not properly discard completed checkpoint {}.",
+                                            checkpoint.getCheckpointID(),
+                                            outerThrowable);
+                                }
+                                decrementNumberOfCheckpointsToClean();

Review Comment:
I think the confusion comes from the fact that I moved `incrementNumberOfCheckpointsToClean()` into the `shouldDiscard=true` branch only. So when `shouldDiscard=false`, there is no increment, and thus no decrement either. Or am I seriously missing something?
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1363482855

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ##
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        Checkpoint.DiscardObject discardObject =
-                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        if (shouldDiscard) {
+            incrementNumberOfCheckpointsToClean();
+            checkpoint
+                    .markAsDiscarded()
+                    .discardAsync(executor)
+                    .handle(
+                            (Object outerIgnored, Throwable outerThrowable) -> {
+                                if (outerThrowable != null) {
+                                    LOG.warn(
+                                            "Could not properly discard completed checkpoint {}.",
+                                            checkpoint.getCheckpointID(),
+                                            outerThrowable);
+                                }
+                                decrementNumberOfCheckpointsToClean();

Review Comment:
You have just said it yourself. The old code for `shouldDiscard=false` is doing:
```
try {
    cleanupAction.run();
} catch (...) {
    ...
} finally {
    decrementNumberOfCheckpointsToClean();
    postCleanupAction.run();
}
```
With no exception, and with `cleanupAction` being `NOOP`, the code simplifies to:
```
decrementNumberOfCheckpointsToClean();
postCleanupAction.run();
```
`decrementNumberOfCheckpointsToClean` is clearly there. I would even suspect that the [deadlock in the tests](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53769=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8) could actually be caused by this issue.
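The disagreement above is about keeping `incrementNumberOfCheckpointsToClean()` and `decrementNumberOfCheckpointsToClean()` paired. A hypothetical sketch of one balanced variant: it counts every call (including `shouldDiscard == false`) and decrements in a `handle(...)` callback, which runs whether the discard succeeds or fails. Skipping both calls for `shouldDiscard == false`, as the PR author proposes, balances the counter equally well. `CleanerCounterSketch` is illustrative and is not Flink's `CheckpointsCleaner`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the pending-cleanup bookkeeping; not Flink's CheckpointsCleaner.
final class CleanerCounterSketch {
    private final AtomicInteger pending = new AtomicInteger();

    int pendingCleanups() {
        return pending.get();
    }

    CompletableFuture<Void> clean(
            boolean shouldDiscard, Runnable discard, Runnable postCleanAction, Executor executor) {
        // Counted for every call, so each increment has exactly one decrement below.
        pending.incrementAndGet();
        CompletableFuture<Void> work =
                shouldDiscard
                        ? CompletableFuture.runAsync(discard, executor)
                        : CompletableFuture.completedFuture(null);
        // handle(...) runs on normal and exceptional completion and swallows the error.
        return work.handle(
                (ignored, error) -> {
                    if (error != null) {
                        System.err.println("Could not properly discard checkpoint: " + error);
                    }
                    pending.decrementAndGet();
                    postCleanAction.run();
                    return null;
                });
    }
}
```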
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1362546056

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ##
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        Checkpoint.DiscardObject discardObject =
-                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        if (shouldDiscard) {
+            incrementNumberOfCheckpointsToClean();
+            checkpoint
+                    .markAsDiscarded()
+                    .discardAsync(executor)
+                    .handle(
+                            (Object outerIgnored, Throwable outerThrowable) -> {
+                                if (outerThrowable != null) {
+                                    LOG.warn(
+                                            "Could not properly discard completed checkpoint {}.",
+                                            checkpoint.getCheckpointID(),
+                                            outerThrowable);
+                                }
+                                decrementNumberOfCheckpointsToClean();

Review Comment:
For `shouldDiscard=false` there is no cleanup action. The original code does `incrementNumberOfCheckpointsToClean()` -> `{ cleanupAction }` -> `finally { decrementNumberOfCheckpointsToClean(); postCleanupAction }`, so it looks like it can just run `postCleanupAction`.

## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
+    /**
+     * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
No misleading here :) I thought that if the default is set to true, maybe we could just treat this as an improvement rather than a feature with a new config. There are places (in shutdown) that directly call `checkpoint$discardObject#discard` without going through `CheckpointsCleaner`, so those have to do it sequentially. For `discardAsync`, do you think it is better to have the option to do it the slower way or the faster way? If so, I will add back the config. I appreciate your reviews! @pnowojski
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1361791271

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ##
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        Checkpoint.DiscardObject discardObject =
-                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        if (shouldDiscard) {
+            incrementNumberOfCheckpointsToClean();
+            checkpoint
+                    .markAsDiscarded()
+                    .discardAsync(executor)
+                    .handle(
+                            (Object outerIgnored, Throwable outerThrowable) -> {
+                                if (outerThrowable != null) {
+                                    LOG.warn(
+                                            "Could not properly discard completed checkpoint {}.",
+                                            checkpoint.getCheckpointID(),
+                                            outerThrowable);
+                                }
+                                decrementNumberOfCheckpointsToClean();

Review Comment:
`decrementNumberOfCheckpointsToClean` should also be called if `shouldDiscard == false`, right?

## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
+    /**
+     * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
Ahh, sorry for misleading you. I meant to keep the config option but make the default value true, as this seems to be a universally positive change, unless there is a bug or some unforeseen complication.
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1361552392

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ##
@@ -73,8 +83,12 @@ public void cleanCheckpoint(
             Executor executor) {
         Checkpoint.DiscardObject discardObject =
                 shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        LOG.debug("Clean checkpoint {} fast-mode={}", checkpoint.getCheckpointID(), parallelMode);
+        if (parallelMode) {
+            cleanup(checkpoint, () -> discardObject.discard(executor), postCleanAction, executor);

Review Comment:
Yes, it did feel awkward to have two levels of async. Updated; please review again. @pnowojski, thank you very much for the review!
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1361551024

## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                 .defaultValue(1)
                 .withDescription("The maximum number of completed checkpoints to retain.");
+    /**
+     * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
So instead of a new configuration, I updated the change with `discardAsync`, which simply discards the state objects in parallel. WDYT?
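The `discardAsync` approach described above (discard a checkpoint's state objects in parallel instead of adding a config switch) can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Flink's implementation: `ParallelDiscardSketch` and its `Discardable` interface are hypothetical stand-ins for a checkpoint's state handles, and the `Executor` argument stands in for the cleaner's IO executor. Each state object gets its own task, and the returned future completes once all of them have finished, or exceptionally if any failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of discarding a checkpoint's state objects in parallel; not Flink code. */
final class ParallelDiscardSketch {

    /** Hypothetical stand-in for a discardable state handle. */
    interface Discardable {
        void discard() throws Exception;
    }

    /** Schedules one task per state object; neither the caller nor a pool thread blocks here. */
    static CompletableFuture<Void> discardAsync(List<Discardable> discardables, Executor executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Discardable discardable : discardables) {
            futures.add(
                    CompletableFuture.runAsync(
                            () -> {
                                try {
                                    discardable.discard();
                                } catch (Exception e) {
                                    // runAsync takes a Runnable, so checked exceptions are wrapped.
                                    throw new CompletionException(e);
                                }
                            },
                            executor));
        }
        // Completes when every discard is done; completes exceptionally if any of them failed.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newFixedThreadPool(4);
        List<Discardable> states = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            int id = i;
            states.add(() -> System.out.println("discarded state " + id));
        }
        discardAsync(states, io).join();
        io.shutdown();
    }
}
```

Because the method only schedules leaf tasks and returns a future, the caller decides what to chain afterwards (for example, counter updates or a post-clean action) without tying up an executor thread.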
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1358183466

## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                 .defaultValue(1)
                 .withDescription("The maximum number of completed checkpoints to retain.");
+    /**
+     * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
Why not `defaultValue(true)`?
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on PR #23425: URL: https://github.com/apache/flink/pull/23425#issuecomment-1761419453 Thanks for the contribution. It looks like a nice improvement. I've left one comment that could simplify the code/architecture a bit.
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1358178168

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ##
@@ -73,8 +83,12 @@ public void cleanCheckpoint(
             Executor executor) {
         Checkpoint.DiscardObject discardObject =
                 shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        LOG.debug("Clean checkpoint {} fast-mode={}", checkpoint.getCheckpointID(), parallelMode);
+        if (parallelMode) {
+            cleanup(checkpoint, () -> discardObject.discard(executor), postCleanAction, executor);

Review Comment:
What's the purpose of introducing this two-level async action in the `ioExecutor`? The first level is the loop in `Checkpoint#discardOperatorStates`, which schedules second-level async actions that actually do the cleanup. It doesn't sound right. Apart from unnecessarily blocking one IO thread, it introduces opportunities for deadlocks. What if we have N threads and N simultaneous checkpoints to clean up? It could happen that the `ioExecutor` is fully blocked by the no-op `Checkpoint#discardOperatorStates` actions, which cannot complete.

Since you are already adding a `getDiscardables()` method, you could create and schedule the discarding of the discardables (in the `ioExecutor`) from the thread that's calling `CheckpointsCleaner#cleanCheckpoint`, with an additional `CompletableFuture.allOf(...)` that would perform:
```
decrementNumberOfCheckpointsToClean();
postCleanAction.run();
```
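The deadlock scenario raised in the review (N IO threads, N simultaneous cleanups, each outer task blocking on inner tasks queued in the same pool) can be reproduced in a small, self-contained sketch. Everything here is hypothetical: `NestedAsyncSketch`, `nested`, `flat`, and the empty tasks standing in for state discards are illustrations, and a fixed-size `ExecutorService` models the `ioExecutor`. `nested` mimics the two-level design; the latch only forces all N outer tasks to run at the same time. `flat` follows the suggested shape: the caller schedules the leaf tasks directly and attaches the follow-up action through `CompletableFuture.allOf(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of the two-level async pitfall; not Flink code. */
final class NestedAsyncSketch {

    /** Fixed-size pool with daemon threads, so a deadlocked pool does not keep the JVM alive. */
    static ExecutorService ioPool(int threads) {
        return Executors.newFixedThreadPool(
                threads,
                r -> {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    return t;
                });
    }

    /** Anti-pattern: outer tasks hold pool threads while waiting for inner tasks in the same pool. */
    static CompletableFuture<Void> nested(ExecutorService pool, int checkpoints) {
        // Forces all outer tasks to be running at once: N simultaneous cleanups on N threads.
        CountDownLatch allStarted = new CountDownLatch(checkpoints);
        List<CompletableFuture<Void>> outer = new ArrayList<>();
        for (int c = 0; c < checkpoints; c++) {
            outer.add(
                    CompletableFuture.runAsync(
                            () -> {
                                allStarted.countDown();
                                try {
                                    allStarted.await();
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    return;
                                }
                                // The inner "discard" is queued behind busy threads, so this join never returns.
                                CompletableFuture.runAsync(() -> {}, pool).join();
                            },
                            pool));
        }
        return CompletableFuture.allOf(outer.toArray(new CompletableFuture<?>[0]));
    }

    /** Suggested shape: the caller schedules leaf tasks directly and combines them with allOf. */
    static CompletableFuture<Void> flat(ExecutorService pool, int checkpoints, Runnable onAllDone) {
        List<CompletableFuture<Void>> leaves = new ArrayList<>();
        for (int c = 0; c < checkpoints; c++) {
            leaves.add(CompletableFuture.runAsync(() -> {}, pool));
        }
        return CompletableFuture.allOf(leaves.toArray(new CompletableFuture<?>[0]))
                .whenComplete((ignored, throwable) -> onAllDone.run());
    }

    static boolean completesWithin(CompletableFuture<Void> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        int n = 4;
        System.out.println("nested completed: " + completesWithin(nested(ioPool(n), n), 500));
        AtomicBoolean postCleanupRan = new AtomicBoolean();
        ExecutorService pool = ioPool(n);
        boolean flatDone = completesWithin(flat(pool, n, () -> postCleanupRan.set(true)), 500);
        System.out.println("flat completed: " + flatDone + ", post-cleanup ran: " + postCleanupRan.get());
        pool.shutdown();
    }
}
```

Running `main` should report that the nested variant times out while the flat one completes and runs its follow-up action. In the flat shape no pool thread ever waits on another pool task, so the pool size cannot cause starvation.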