[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339081895

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ##
@@ -593,6 +599,10 @@ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingT
 }
 );
+ checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(

Review comment:
Done

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards,
Apache Git Services
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339078379

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ##
@@ -316,6 +319,9 @@
 /** The coordinator for checkpoints, if snapshot checkpoints are enabled. */
 private CheckpointCoordinator checkpointCoordinator;
+ /** TODO, replace it with main thread executor. */
+ private ScheduledExecutorService checkpointCoordinatorTimer;

Review comment:
Done
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339075696

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java ##
@@ -55,7 +54,7 @@
 private BiConsumer> releasePartitionsConsumer = (ignore1, ignore2) -> { };
- private Consumer> checkpointConsumer = ignore -> { };
+ private CheckpointConsumer checkpointConsumer = null;

Review comment:
Done
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339072220

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -535,149 +528,142 @@ public boolean isShutdown() {
 // we will actually trigger this checkpoint!
- // we lock with a special lock to make sure that trigger requests do not overtake each other.
- // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
- // may issue blocking operations. Using a different lock than the coordinator-wide lock,
- // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
- synchronized (triggerLock) {

Review comment:
Done.
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339071818

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java ##
@@ -201,7 +201,7 @@ public void testHooksAreCalledOnTrigger() throws Exception {
 cc.addMasterHook(statefulHook2);
 // trigger a checkpoint
- assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), false));
+ assertFalse(cc.triggerCheckpoint(System.currentTimeMillis(), false).isCompletedExceptionally());

Review comment:
I almost missed this message. Yes, I agree: tests built on Mockito are not easy to maintain. Reworking them could be a follow-up issue (there are already a few on my list). I can come back to this after the main part is merged.
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338881552

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ##
@@ -1466,11 +1477,24 @@ public void testSavepointsAreNotSubsumed() throws Exception {
 final JobID jid = new JobID();
 final long timestamp = System.currentTimeMillis();
+ final CountDownLatch checkpointTriggeredLatch1 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch2 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch3 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch4 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch5 = new CountDownLatch(2);
+ final CoundDownLatchCheckpointConsumers consumers =
+ new CoundDownLatchCheckpointConsumers(
+ new ArrayDeque() {{
+ add(checkpointTriggeredLatch1);
+ add(checkpointTriggeredLatch2);
+ add(checkpointTriggeredLatch3);
+ add(checkpointTriggeredLatch4);
+ add(checkpointTriggeredLatch5); }});

Review comment:
That's a really nice suggestion! I thought a manually triggered executor might cause problems in these cases, but after some testing I found that all cases work well with it. This approach also simplifies some of the time-based cases a lot.
BTW, I am keeping some "dead code" for now, for example `TestingScheduledExecutor` and the changes to `SimpleAckingTaskManagerGateway`. They might be useful in a later PR or for others.
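A minimal sketch shows why a manually triggered executor makes these tests deterministic. `ManualExecutor` is a hypothetical helper written for this example and is not the executor the PR uses. Its `execute()` only queues work, and the test runs that work on its own thread by calling `triggerAll()`. Execution order is therefore fixed, and no latch is needed to wait for another thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical test helper (not Flink's API): execute() only queues the task;
// the test decides when queued work runs by calling triggerAll().
class ManualExecutor implements Executor {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        pending.add(command);
    }

    // Runs queued tasks on the calling thread, including tasks that are
    // enqueued while draining; returns the number of tasks that ran.
    int triggerAll() {
        int ran = 0;
        Runnable next;
        while ((next = pending.poll()) != null) {
            next.run();
            ran++;
        }
        return ran;
    }

    int numQueued() {
        return pending.size();
    }
}
```

Because the test thread runs the queued work itself, assertions can follow `triggerAll()` directly, without sleeps or latches.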
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338879934

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -153,8 +155,8 @@
 /** A handle to the current periodic trigger, to cancel it when necessary. */
 private ScheduledFuture currentPeriodicTrigger;
- /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
- private long lastCheckpointCompletionNanos;
+ /** The relative timestamp when the next checkpoint could be triggered. */
+ private long earliestRelativeTimeNextCheckpointBeTriggered;

Review comment:
I think the problem is that this field carries too much meaning, so it's hard to give it a short and clear name. I reverted it to `lastCheckpointCompletionRelativeTime`. The `nextCheckpointTriggerRelativeTime` is now calculated in `checkMinPauseBetweenCheckpoints`, where it is much easier to understand in context.
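The reverted design can be sketched as follows. Only the names `lastCheckpointCompletionRelativeTime`, `nextCheckpointTriggerRelativeTime` and `checkMinPauseBetweenCheckpoints` come from the comment. `MinPauseTracker` and all the signatures are hypothetical. The only stored state is the completion time. The earliest next trigger time is derived at the point where the min-pause rule is checked. Times are passed in explicitly on the `System.nanoTime()` scale, so the logic can be tested without sleeping.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (names are illustrative): store only the relative time of
// the last completed checkpoint and derive the earliest next trigger time where
// the min-pause rule is checked.
class MinPauseTracker {
    private final long minPauseNanos;
    private long lastCheckpointCompletionRelativeTime;

    MinPauseTracker(long minPauseBetweenCheckpointsMillis, long nowNanos) {
        this.minPauseNanos = TimeUnit.MILLISECONDS.toNanos(minPauseBetweenCheckpointsMillis);
        // Pretend the last checkpoint completed one full pause ago,
        // so the very first checkpoint is not delayed.
        this.lastCheckpointCompletionRelativeTime = nowNanos - minPauseNanos;
    }

    void onCheckpointCompleted(long nowNanos) {
        lastCheckpointCompletionRelativeTime = nowNanos;
    }

    // Returns 0 if a checkpoint may be triggered now, otherwise the remaining
    // delay in milliseconds, rounded up so a trigger is never scheduled too early.
    long checkMinPauseBetweenCheckpoints(long nowNanos) {
        long nextCheckpointTriggerRelativeTime = lastCheckpointCompletionRelativeTime + minPauseNanos;
        // Compare via subtraction, as recommended for System.nanoTime() values.
        long remainingNanos = nextCheckpointTriggerRelativeTime - nowNanos;
        if (remainingNanos <= 0) {
            return 0;
        }
        return (remainingNanos + 999_999) / 1_000_000;
    }
}
```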
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338410227

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java ##
@@ -54,6 +55,8 @@
 private BiConsumer> releasePartitionsConsumer = (ignore1, ignore2) -> { };
+ private Consumer> checkpointConsumer = ignore -> { };

Review comment:
Good suggestion.
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335790644

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -535,149 +528,142 @@ public boolean isShutdown() {
 // we will actually trigger this checkpoint!
- // we lock with a special lock to make sure that trigger requests do not overtake each other.
- // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
- // may issue blocking operations. Using a different lock than the coordinator-wide lock,
- // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
- synchronized (triggerLock) {

Review comment:
`triggerSavepointInternal` no longer invokes `triggerCheckpoint` directly. In the commit before this one, I changed `triggerSavepointInternal` so that it executes the trigger in the timer thread. As a result, all trigger operations (checkpoint or savepoint) now run in the timer thread, which will eventually be replaced by the main thread. There is no longer any competition for `triggerLock`. I will point this out in the comment.
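The reasoning above is that all trigger requests run on one thread, so `triggerLock` has nothing left to protect. A plain single-threaded executor is enough to illustrate it. `TriggerSerializer` is hypothetical and stands in for the timer thread. It only shows that requests submitted from any thread run one at a time, in submission order. State touched only from that thread therefore needs no extra lock.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: every trigger request, checkpoint or savepoint, is handed
// to one single-threaded executor (standing in for the timer thread), so requests
// never overtake each other and no dedicated trigger lock is needed.
class TriggerSerializer {
    private final ExecutorService timerThread = Executors.newSingleThreadExecutor();

    // Only read and written on timerThread. Its tasks run one after another on the
    // same thread, so each task sees the previous task's writes without locking.
    private long nextCheckpointId = 1;

    CompletableFuture<Long> triggerCheckpoint() {
        return CompletableFuture.supplyAsync(this::doTrigger, timerThread);
    }

    // Savepoints do not run the trigger logic on the caller's thread; they are
    // submitted to the same thread as periodic checkpoints.
    CompletableFuture<Long> triggerSavepoint() {
        return CompletableFuture.supplyAsync(this::doTrigger, timerThread);
    }

    private long doTrigger() {
        return nextCheckpointId++;
    }

    void shutdown() {
        timerThread.shutdown();
    }
}
```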
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335801615

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -1306,10 +1312,8 @@ private long getRandomInitDelay() {
 return ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints, baseInterval + 1L);
 }
- private ScheduledFuture scheduleTriggerWithDelay(long initDelay) {
- return timer.scheduleAtFixedRate(
- new ScheduledTrigger(),
- initDelay, baseInterval, TimeUnit.MILLISECONDS);
+ private ScheduledFuture scheduleTriggerWithDelay(long delay) {
+ return timer.schedule(new ScheduledTrigger(), delay, TimeUnit.MILLISECONDS);

Review comment:
That's a good question. The direct reason is that the main thread executor does not support periodic scheduling yet. So there are two options:
1. Support periodic scheduling in the main thread executor.
2. Trigger checkpoints manually in `CheckpointCoordinator`.
There was a short discussion of option 1 in https://issues.apache.org/jira/browse/FLINK-13848.
I chose option 2 for now because I'm not a fan of the current approach to periodic scheduling. Periodic schedules get cancelled in many places, and cancellation is not always accurate or reliable.
To be honest, option 2 has a drawback: we have to make sure the next trigger is scheduled in every scenario (success, failure, queued request; handling queued requests is messy right now). It's more complicated than I expected. Do you have any suggestions on this?
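Option 2 could look roughly like the sketch below. It is a hypothetical illustration, not the PR's `ScheduledTrigger`. Each run schedules exactly one follow-up via `ScheduledExecutorService.schedule` once the trigger attempt's future completes. This happens whether the attempt succeeded, failed, or threw synchronously. In this sketch the interval is counted from the end of an attempt, which differs from the fixed-rate behavior of `scheduleAtFixedRate`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of "option 2": schedule a single one-shot trigger and
// schedule the next one once the attempt finishes, whatever the outcome.
class SelfReschedulingTrigger {
    private final ScheduledExecutorService timer;
    private final long baseIntervalMillis;
    private final Supplier<CompletableFuture<?>> triggerAction;
    private boolean stopped;
    private ScheduledFuture<?> currentTrigger;

    SelfReschedulingTrigger(ScheduledExecutorService timer, long baseIntervalMillis,
                            Supplier<CompletableFuture<?>> triggerAction) {
        this.timer = timer;
        this.baseIntervalMillis = baseIntervalMillis;
        this.triggerAction = triggerAction;
    }

    synchronized void start(long initialDelayMillis) {
        scheduleTriggerWithDelay(initialDelayMillis);
    }

    // Same monitor as scheduling, so no reschedule can slip in after stop().
    synchronized void stop() {
        stopped = true;
        if (currentTrigger != null) {
            currentTrigger.cancel(false);
        }
    }

    private synchronized void scheduleTriggerWithDelay(long delayMillis) {
        if (!stopped) {
            currentTrigger = timer.schedule(this::runOnce, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void runOnce() {
        CompletableFuture<?> result;
        try {
            result = triggerAction.get();
        } catch (RuntimeException e) {
            // A synchronous failure is treated like a failed attempt.
            CompletableFuture<Object> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            result = failed;
        }
        // Success or failure: either way exactly one next trigger is scheduled.
        result.whenComplete((ignored, error) -> scheduleTriggerWithDelay(baseIntervalMillis));
    }
}
```

If the returned future never completes (for example, a request that stays queued), no further trigger is scheduled. This is the "every scenario" burden the comment describes.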
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335793012

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java ##
@@ -289,29 +292,29 @@ public void testStopPeriodicScheduler() throws Exception {
 failureManager);
 // Periodic
+ CompletableFuture periodicTriggerResult = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
 try {
- coord.triggerCheckpoint(
- System.currentTimeMillis(),
- CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- null,
- true,
- false);
+ periodicTriggerResult.get();
 fail("The triggerCheckpoint call expected an exception");
- } catch (CheckpointException e) {
- assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof CheckpointException);

Review comment:
Good point!
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335793060

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java ##
@@ -289,29 +292,29 @@ public void testStopPeriodicScheduler() throws Exception {
 failureManager);
 // Periodic
+ CompletableFuture periodicTriggerResult = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
 try {
- coord.triggerCheckpoint(
- System.currentTimeMillis(),
- CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- null,
- true,
- false);
+ periodicTriggerResult.get();
 fail("The triggerCheckpoint call expected an exception");
- } catch (CheckpointException e) {
- assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof CheckpointException);
+ assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+ ((CheckpointException) e.getCause()).getCheckpointFailureReason());
 }
 // Not periodic
- try {
- coord.triggerCheckpoint(
- System.currentTimeMillis(),
- CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- null,
- false,
- false);
- } catch (CheckpointException e) {
- fail("Unexpected exception : " + e.getCheckpointFailureReason().message());
- }
+ CompletableFuture nonPeriodicTriggerResult = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ false,
+ false);
+ assertFalse(nonPeriodicTriggerResult.isCompletedExceptionally());

Review comment:
Ditto.
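The `ExecutionException` unwrapping in the new test could be factored into a small helper. This is a hypothetical JDK-only sketch. In the test below, `IllegalStateException` stands in for Flink's `CheckpointException`. Only use the helper on a future that is expected to fail promptly, because `get()` blocks on a pending future. That is why the non-periodic case checks `isCompletedExceptionally()` instead of waiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical test helper, not part of Flink: blocks on a future that is
// expected to fail and returns the original cause.
final class FutureFailureAssertions {
    private FutureFailureAssertions() {
    }

    static Throwable expectFailure(CompletableFuture<?> future, String message) {
        try {
            future.get();
        } catch (ExecutionException e) {
            // get() wraps the original failure, so assertions must inspect getCause().
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        // The future completed normally, which is the failure case for this check.
        throw new AssertionError(message);
    }
}
```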
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335792916

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -153,8 +155,8 @@
 /** A handle to the current periodic trigger, to cancel it when necessary. */
 private ScheduledFuture currentPeriodicTrigger;
- /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
- private long lastCheckpointCompletionNanos;
+ /** The relative timestamp when the next checkpoint could be triggered. */
+ private long earliestRelativeTimeNextCheckpointBeTriggered;

Review comment:
As mentioned before, "earliest" means it's the earliest time at which the next checkpoint could be triggered. "Relative" means it's a relative time (`System.nanoTime()`). Maybe `earliestNextCheckpointTriggerRelativeTime`?
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335792379

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -153,8 +155,8 @@
 /** A handle to the current periodic trigger, to cancel it when necessary. */
 private ScheduledFuture currentPeriodicTrigger;
- /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
- private long lastCheckpointCompletionNanos;
+ /** The relative timestamp when the next checkpoint could be triggered. */

Review comment:
Actually, it's not the time at which the next checkpoint "should" be triggered. `earliestRelativeTimeNextCheckpointBeTriggered` means the next trigger cannot be launched before this time, because of the `minPauseBetweenCheckpoints` restriction.
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335791667

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -535,149 +528,142 @@ public boolean isShutdown() {

Review comment:
I fully understand... I didn't rework this part because `triggerCheckpoint` will change in later PRs. I was planning to do the non-functional rework in the final PRs.
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335790644

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -535,149 +528,142 @@ public boolean isShutdown() {
 // we will actually trigger this checkpoint!
- // we lock with a special lock to make sure that trigger requests do not overtake each other.
- // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
- // may issue blocking operations. Using a different lock than the coordinator-wide lock,
- // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
- synchronized (triggerLock) {

Review comment:
`triggerSavepointInternal` no longer invokes `triggerCheckpoint` directly. In the commit before this one, I changed `triggerSavepointInternal` so that it executes the trigger in the timer thread. As a result, all trigger operations (checkpoint or savepoint) now run in the timer thread, which will eventually be replaced by the main thread. There is no longer any competition for `triggerLock`. I will point this out in the commit message.
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335789730

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java ##
@@ -70,6 +73,10 @@ public void setReleasePartitionsConsumer(BiConsumer> checkpointConsumer) {

Review comment:
Ditto: it's used to check whether the tasks receive the `triggerCheckpoint` message.
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335789416

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ##
@@ -1347,6 +1356,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
 CompletableFuture savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
 assertFalse(savepointFuture.isDone());
+ checkpointTriggeredLatch1.await();

Review comment:
As mentioned before, the latch is used to check whether the triggering succeeded.
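A latch-backed consumer of the kind used in these tests might look like the sketch below. It is hypothetical, and the PR's `CoundDownLatchCheckpointConsumers` may be structured differently. It assumes that trigger messages for different checkpoints do not interleave. Each latch is created with a count equal to the number of tasks (2 in the test). As a result, `await()` returns only after every task has received the trigger message for that checkpoint.

```java
import java.util.Deque;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: one latch per expected checkpoint, each created with a
// count equal to the number of tasks. Every task-side trigger message counts
// down the latch that belongs to its checkpoint.
class LatchCheckpointConsumer {
    private final Deque<CountDownLatch> remaining;
    private long currentCheckpointId = Long.MIN_VALUE;
    private CountDownLatch currentLatch;

    LatchCheckpointConsumer(Deque<CountDownLatch> latchesInTriggerOrder) {
        this.remaining = latchesInTriggerOrder;
    }

    // Called (possibly from several threads) once per task that receives a trigger message.
    synchronized void onTriggerCheckpoint(long checkpointId) {
        if (checkpointId != currentCheckpointId) {
            // First message of a new checkpoint: switch to the next latch.
            currentCheckpointId = checkpointId;
            currentLatch = remaining.poll();
        }
        if (currentLatch != null) {
            currentLatch.countDown();
        }
    }
}
```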
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335789306

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java ##
@@ -201,7 +201,7 @@ public void testHooksAreCalledOnTrigger() throws Exception {
 cc.addMasterHook(statefulHook2);
 // trigger a checkpoint
- assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), false));
+ assertFalse(cc.triggerCheckpoint(System.currentTimeMillis(), false).isCompletedExceptionally());

Review comment:
If an error occurs immediately, the future is completed exceptionally. If the triggering succeeds, we can check whether the tasks received the `triggerCheckpoint` message.
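The contract described here can be illustrated with a hypothetical, heavily simplified coordinator; it is not Flink's `CheckpointCoordinator`. An error detected immediately fails the returned future at once. A successful trigger returns a future that stays pending until all tasks acknowledge. That is why the test asserts that `isCompletedExceptionally()` is false instead of waiting on the future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the changed contract: an immediate error fails the
// returned future right away; otherwise the future stays pending until the
// checkpoint completes.
class FutureReturningCoordinator {
    private boolean shutdown;
    private long nextCheckpointId = 1;
    private long pendingCheckpointId;
    private CompletableFuture<Long> pendingResult;

    synchronized CompletableFuture<Long> triggerCheckpoint() {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (shutdown) {
            // Error detected immediately: callers see isCompletedExceptionally() == true.
            result.completeExceptionally(new IllegalStateException("coordinator is shut down"));
            return result;
        }
        pendingCheckpointId = nextCheckpointId++;
        pendingResult = result;
        // A real coordinator would now send trigger messages to the tasks.
        return result;
    }

    // Called once all tasks have acknowledged; only then does the future complete.
    synchronized void acknowledgeAllTasks() {
        if (pendingResult != null) {
            pendingResult.complete(pendingCheckpointId);
            pendingResult = null;
        }
    }

    synchronized void shutdown() {
        shutdown = true;
    }
}
```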
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335788255

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java ##
@@ -201,7 +201,7 @@ public void testHooksAreCalledOnTrigger() throws Exception {
 cc.addMasterHook(statefulHook2);
 // trigger a checkpoint
- assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), false));
+ assertFalse(cc.triggerCheckpoint(System.currentTimeMillis(), false).isCompletedExceptionally());

Review comment:
Ah, sorry, that's my fault. I forgot to update the Javadoc. I changed the behavior of `triggerCheckpoint` a bit: the returned `CompletableFuture` does not complete until the checkpoint is finished. It no longer completes immediately after `triggerCheckpoint` returns, unless an error occurs. This is similar to `triggerSavepoint`.
I did this because `triggerCheckpoint` will eventually be non-blocking. It will be split into several stages:
1. pre-checking;
2. initializing the checkpoint id and `checkpointStorageLocation`;
3. triggering the snapshot of the master state;
4. triggering the snapshots of the task state.
With that structure, a boolean return value can no longer tell us whether the triggering succeeded.
So in this case we can't wait for the future to complete, because it will never complete.
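The staged, non-blocking trigger described above could be expressed by chaining `CompletableFuture` stages. This is a hypothetical sketch. The four stage names follow the comment, but the class, the `ioExecutor` parameter and the placeholder stage bodies are assumptions for illustration. For brevity, the returned future completes once the task snapshots have been triggered. In the design described above, the future handed back to callers would complete only when the checkpoint finishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch: the four stages named in the comment, chained with
// CompletableFuture so no stage blocks the thread that started the trigger.
// Stage bodies are placeholders that only record what ran.
class StagedCheckpointTrigger {
    private final Executor ioExecutor; // assumed executor for potentially blocking work
    private final List<String> stages = new ArrayList<>();
    private long nextCheckpointId = 1;

    StagedCheckpointTrigger(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    CompletableFuture<Long> trigger(boolean preCheckPasses) {
        // Stage 1: pre-checking. Failing here fails the future immediately.
        if (!preCheckPasses) {
            CompletableFuture<Long> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("pre-check failed"));
            return failed;
        }
        return CompletableFuture
                // Stage 2: initialize the checkpoint id and storage location (may block).
                .supplyAsync(this::initializeIdAndStorageLocation, ioExecutor)
                // Stage 3: snapshot the master state asynchronously.
                .thenCompose(this::snapshotMasterState)
                // Stage 4: trigger the task snapshots.
                .thenApply(this::triggerTaskSnapshots);
    }

    private synchronized long initializeIdAndStorageLocation() {
        long id = nextCheckpointId++;
        stages.add("init " + id);
        return id;
    }

    private CompletableFuture<Long> snapshotMasterState(long checkpointId) {
        return CompletableFuture.supplyAsync(() -> {
            record("master state " + checkpointId);
            return checkpointId;
        }, ioExecutor);
    }

    private long triggerTaskSnapshots(long checkpointId) {
        record("tasks triggered " + checkpointId);
        return checkpointId;
    }

    private synchronized void record(String stage) {
        stages.add(stage);
    }

    synchronized List<String> stages() {
        return new ArrayList<>(stages);
    }
}
```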