[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339013035
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ##
@@ -316,6 +319,9 @@
  /** The coordinator for checkpoints, if snapshot checkpoints are enabled. */
  private CheckpointCoordinator checkpointCoordinator;
+ /** TODO, replace it with main thread executor. */
+ private ScheduledExecutorService checkpointCoordinatorTimer;
Review comment: `@Nullable`?
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339013082
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ##
@@ -593,6 +599,10 @@ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingT
  } );
+ checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(
Review comment: `checkState(checkpointCoordinatorTimer == null)`?
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338983887
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java ##
@@ -55,7 +54,7 @@
  private BiConsumer> releasePartitionsConsumer = (ignore1, ignore2) -> { };
- private Consumer> checkpointConsumer = ignore -> { };
+ private CheckpointConsumer checkpointConsumer = null;
Review comment: nit: missing `@Nullable`? But I think it would be better to leave it as a non-nullable field, with a default (inlined lambda) implementation that ignores the call?
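A minimal sketch of the non-nullable alternative suggested above. `CheckpointConsumer` here is a hypothetical one-argument stand-in (the real interface's signature is not shown in this thread), and `GatewaySketch` is an illustrative name, not a class from the PR.

```java
import java.util.Objects;

/** Hypothetical stand-in for the PR's CheckpointConsumer; the real signature is not shown here. */
@FunctionalInterface
interface CheckpointConsumer {
    void accept(long checkpointId);
}

/** Illustrative gateway whose consumer field is never null. */
class GatewaySketch {
    // Default implementation ignores the call, so callers never need a null check.
    private CheckpointConsumer checkpointConsumer = ignore -> { };

    void setCheckpointConsumer(CheckpointConsumer consumer) {
        this.checkpointConsumer = Objects.requireNonNull(consumer);
    }

    void triggerCheckpoint(long checkpointId) {
        checkpointConsumer.accept(checkpointId);
    }
}
```

With this shape a test that does not care about checkpoints can use the gateway untouched, and the field needs no `@Nullable` annotation.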
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338983040
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ##
@@ -1466,11 +1477,24 @@ public void testSavepointsAreNotSubsumed() throws Exception {
  final JobID jid = new JobID();
  final long timestamp = System.currentTimeMillis();
+ final CountDownLatch checkpointTriggeredLatch1 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch2 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch3 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch4 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch5 = new CountDownLatch(2);
+ final CoundDownLatchCheckpointConsumers consumers =
+ new CoundDownLatchCheckpointConsumers(
+ new ArrayDeque() {{
+ add(checkpointTriggeredLatch1);
+ add(checkpointTriggeredLatch2);
+ add(checkpointTriggeredLatch3);
+ add(checkpointTriggeredLatch4);
+ add(checkpointTriggeredLatch5); }});
Review comment: I'm glad that it worked out :)
> BTW, I keep some "dead code" for now, for example, TestingScheduledExecutor, and the changes of SimpleAckingTaskManagerGateway. It might be useful for the later PR or others.
I think I would revisit this at the end of this effort.
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338097031
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ##
@@ -1466,11 +1477,24 @@ public void testSavepointsAreNotSubsumed() throws Exception {
  final JobID jid = new JobID();
  final long timestamp = System.currentTimeMillis();
+ final CountDownLatch checkpointTriggeredLatch1 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch2 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch3 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch4 = new CountDownLatch(2);
+ final CountDownLatch checkpointTriggeredLatch5 = new CountDownLatch(2);
+ final CoundDownLatchCheckpointConsumers consumers =
+ new CoundDownLatchCheckpointConsumers(
+ new ArrayDeque() {{
+ add(checkpointTriggeredLatch1);
+ add(checkpointTriggeredLatch2);
+ add(checkpointTriggeredLatch3);
+ add(checkpointTriggeredLatch4);
+ add(checkpointTriggeredLatch5); }});
Review comment: Maybe instead of latching, we could just wait for an executor to complete pending work? Generally speaking, with single-threaded, non-blocking, asynchronous code, such tests are usually easier to debug and maintain when they use the following pattern:
1. perform some action that enqueues some async action, but do not let the action be executed in the background
2. manually trigger execution of the enqueued action, preferably in the main thread
3. repeat until there are no more enqueued actions (applies if an action can enqueue more actions)
4. validate the final state (in that case everything is executed by the main test thread, so it's easy to debug)
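The four steps above can be sketched with a tiny executor that only queues work. `ManualExecutor` and `ManualExecutorExample` are hypothetical names written for this illustration and are not part of the PR; the sketch only assumes the plain `java.util.concurrent.Executor` interface.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical executor that only queues tasks; the test decides when they run. */
class ManualExecutor implements Executor {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    @Override
    public void execute(Runnable task) {
        pending.add(task); // step 1: enqueue only, nothing runs in the background
    }

    /** Steps 2 and 3: run queued tasks in the calling thread until none are left. */
    int triggerAll() {
        int executed = 0;
        Runnable next;
        while ((next = pending.poll()) != null) {
            next.run(); // a task may enqueue follow-up tasks; the loop picks them up
            executed++;
        }
        return executed;
    }
}

class ManualExecutorExample {
    /** Runs a two-stage async action and returns the events in execution order. */
    static List<String> runScenario() {
        ManualExecutor executor = new ManualExecutor();
        List<String> events = new ArrayList<>();
        executor.execute(() -> {
            events.add("trigger");
            executor.execute(() -> events.add("ack")); // enqueued by the first task
        });
        events.add("enqueued"); // nothing has run yet at this point
        executor.triggerAll();
        return events; // step 4: the caller validates the final state
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

Because every task runs on the calling thread, a failing assertion or a breakpoint lands in the test thread itself, and no latch is needed to know when the work is done.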
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338090326
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java ##
@@ -201,7 +201,7 @@ public void testHooksAreCalledOnTrigger() throws Exception {
  cc.addMasterHook(statefulHook2);
  // trigger a checkpoint
- assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), false));
+ assertFalse(cc.triggerCheckpoint(System.currentTimeMillis(), false).isCompletedExceptionally());
Review comment: Ok, I get it. Thanks for the explanation! I'm not a fan of using Mockito like that, and especially not a fan of checking whether a method `foo(long, long, Executor)` was called and how many times (such checks can easily break and give false positive errors), but rewriting this test is probably out of the scope of this already quite big PR.
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338106044
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -153,8 +155,8 @@
  /** A handle to the current periodic trigger, to cancel it when necessary. */
  private ScheduledFuture currentPeriodicTrigger;
- /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
- private long lastCheckpointCompletionNanos;
+ /** The relative timestamp when the next checkpoint could be triggered. */
+ private long earliestRelativeTimeNextCheckpointBeTriggered;
Review comment: I would shorten it to `nextCheckpointTriggerRelativeTime`
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338102677
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -535,149 +528,142 @@ public boolean isShutdown() {
  // we will actually trigger this checkpoint!
- // we lock with a special lock to make sure that trigger requests do not overtake each other.
- // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
- // may issue blocking operations. Using a different lock than the coordinator-wide lock,
- // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
- synchronized (triggerLock) {
Review comment: Ok, again thanks for the explanation. Could you copy/paste this explanation into the commit message? (not as a comment in the code)
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335432387
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java ##
@@ -289,29 +292,29 @@ public void testStopPeriodicScheduler() throws Exception {
  failureManager);
  // Periodic
+ CompletableFuture periodicTriggerResult = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
  try {
- coord.triggerCheckpoint(
- System.currentTimeMillis(),
- CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- null,
- true,
- false);
+ periodicTriggerResult.get();
  fail("The triggerCheckpoint call expected an exception");
- } catch (CheckpointException e) {
- assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof CheckpointException);
+ assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+ ((CheckpointException) e.getCause()).getCheckpointFailureReason());
  }
  // Not periodic
- try {
- coord.triggerCheckpoint(
- System.currentTimeMillis(),
- CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- null,
- false,
- false);
- } catch (CheckpointException e) {
- fail("Unexpected exception : " + e.getCheckpointFailureReason().message());
- }
+ CompletableFuture nonPeriodicTriggerResult = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ false,
+ false);
+ assertFalse(nonPeriodicTriggerResult.isCompletedExceptionally());
Review comment: ditto about waiting for the future to complete?
```
assertFalse(nonPeriodicTriggerResult.isCompletedExceptionally());
```
->
```
nonPeriodicTriggerResult.get();
```
?
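A small sketch of why the two assertions differ, using only `java.util.concurrent.CompletableFuture`. The class and method names are illustrative and not taken from the PR. `isCompletedExceptionally()` returns `false` for a future that has not completed at all, so the `assertFalse` form can pass before the trigger has actually run, whereas `get()` waits and rethrows the failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class FutureAssertionSketch {
    /** Returns true when get() surfaces the failure as an ExecutionException. */
    static boolean getSurfacesFailure(CompletableFuture<?> future) {
        try {
            future.get(); // blocks until the future completes
            return false;
        } catch (ExecutionException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        // The future is still pending, so this check cannot detect a later failure.
        System.out.println(pending.isCompletedExceptionally()); // false
        pending.completeExceptionally(new IllegalStateException("trigger failed"));
        System.out.println(pending.isCompletedExceptionally()); // true
        System.out.println(getSurfacesFailure(pending)); // true
    }
}
```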
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335030476
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ##
@@ -1347,6 +1356,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
  CompletableFuture savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
  assertFalse(savepointFuture.isDone());
+ checkpointTriggeredLatch1.await();
Review comment: Why do we need this latch?
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335020372
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java ##
@@ -201,7 +201,7 @@ public void testHooksAreCalledOnTrigger() throws Exception {
  cc.addMasterHook(statefulHook2);
  // trigger a checkpoint
- assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), false));
+ assertFalse(cc.triggerCheckpoint(System.currentTimeMillis(), false).isCompletedExceptionally());
Review comment: Shouldn't we wait for the future to complete? (and ditto in other places?)
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335434198
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -1306,10 +1312,8 @@ private long getRandomInitDelay() {
  return ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints, baseInterval + 1L);
  }
- private ScheduledFuture scheduleTriggerWithDelay(long initDelay) {
- return timer.scheduleAtFixedRate(
- new ScheduledTrigger(),
- initDelay, baseInterval, TimeUnit.MILLISECONDS);
+ private ScheduledFuture scheduleTriggerWithDelay(long delay) {
+ return timer.schedule(new ScheduledTrigger(), delay, TimeUnit.MILLISECONDS);
Review comment: What is the benefit/purpose of this change and this commit (`Manually schedule periodic checkpoint trigger instead of scheduleAtFixedRate`) as a whole?
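For context on the question above, here is a rough sketch of what manual re-scheduling with a one-shot `schedule` call looks like, as opposed to `scheduleAtFixedRate`. `ReschedulingTrigger` and its fields are hypothetical and are not the coordinator's code; the sketch assumes only the standard `ScheduledExecutorService` API. The point it illustrates is that the task itself decides when, and whether, the next run is scheduled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical periodic trigger that re-schedules itself after each run. */
class ReschedulingTrigger implements Runnable {
    private final ScheduledExecutorService timer;
    private final long delayMillis;
    private final CountDownLatch remainingRuns;

    ReschedulingTrigger(ScheduledExecutorService timer, long delayMillis, int runs) {
        this.timer = timer;
        this.delayMillis = delayMillis;
        this.remainingRuns = new CountDownLatch(runs);
    }

    @Override
    public void run() {
        remainingRuns.countDown();
        if (remainingRuns.getCount() > 0) {
            // One-shot schedule: the next delay is chosen here, after this run has finished.
            timer.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Starts the trigger and waits until all runs happened or the timeout expires. */
    static boolean runAll(long delayMillis, int runs, long timeoutMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            ReschedulingTrigger trigger = new ReschedulingTrigger(timer, delayMillis, runs);
            timer.schedule(trigger, delayMillis, TimeUnit.MILLISECONDS);
            return trigger.remainingRuns.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
        }
    }
}
```

Whether this flexibility is the actual motivation for the commit is exactly what the review comment asks; the sketch only shows the mechanical difference.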
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335030916
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ##
@@ -1502,13 +1526,16 @@ public void testSavepointsAreNotSubsumed() throws Exception {
  // Trigger savepoint and checkpoint
  CompletableFuture savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir);
+ checkpointTriggeredLatch1.await();
Review comment: again, do we need it with `coord.triggerCheckpoint(timestamp + 1, false).get()`?
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335429166
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -153,8 +155,8 @@
  /** A handle to the current periodic trigger, to cancel it when necessary. */
  private ScheduledFuture currentPeriodicTrigger;
- /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
- private long lastCheckpointCompletionNanos;
+ /** The relative timestamp when the next checkpoint could be triggered. */
+ private long earliestRelativeTimeNextCheckpointBeTriggered;
Review comment: nit: `%s/earliestRelativeTimeNextCheckpointBeTriggered/nextCheckpointTriggerTime/g`?
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335426392
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -535,149 +528,142 @@ public boolean isShutdown() {
Review comment: This method needs to be split into smaller sub-methods so baadddlyyy that it actually hurts my brain...
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335428138
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -153,8 +155,8 @@
  /** A handle to the current periodic trigger, to cancel it when necessary. */
  private ScheduledFuture currentPeriodicTrigger;
- /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
- private long lastCheckpointCompletionNanos;
+ /** The relative timestamp when the next checkpoint could be triggered. */
Review comment: nit: `could` -> `should`?
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335022275
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java ##
@@ -301,7 +310,7 @@ private void testRestoreLatestCheckpointIsPreferSavepoint(boolean isPreferCheckp
  failureManager);
  //trigger a checkpoint and wait to become a completed checkpoint
- assertTrue(coord.triggerCheckpoint(timestamp, false));
+ assertFalse(coord.triggerCheckpoint(timestamp, false).isCompletedExceptionally());
Review comment: If you replaced that with `coord.triggerCheckpoint(timestamp, false).get()`, would you still need the `checkpointTriggeredLatch`?
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335425982
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -535,149 +528,142 @@ public boolean isShutdown() {
  // we will actually trigger this checkpoint!
- // we lock with a special lock to make sure that trigger requests do not overtake each other.
- // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
- // may issue blocking operations. Using a different lock than the coordinator-wide lock,
- // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
- synchronized (triggerLock) {
Review comment: Can we get rid of the `triggerLock` here? With the code as it is, the `CheckpointCoordinator#triggerCheckpoint(long, CheckpointProperties, java.lang.String, boolean, boolean)` method can be called from the `timer` thread (it's single-threaded now) and also from the main thread via `triggerSavepointInternal`, right? Shouldn't this change happen after we get rid of the timer thread altogether? Also, this commit deserves a better/more detailed explanation in the commit message. Could you explain there why we can drop the lock?
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335431898
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java ##
@@ -289,29 +292,29 @@ public void testStopPeriodicScheduler() throws Exception {
  failureManager);
  // Periodic
+ CompletableFuture periodicTriggerResult = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
  try {
- coord.triggerCheckpoint(
- System.currentTimeMillis(),
- CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- null,
- true,
- false);
+ periodicTriggerResult.get();
  fail("The triggerCheckpoint call expected an exception");
- } catch (CheckpointException e) {
- assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof CheckpointException);
Review comment: `ExceptionUtils#findThrowable(java.lang.Throwable, java.lang.Class)`?
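A sketch of what the suggested helper buys over `e.getCause() instanceof ...`. The `findThrowable` below is a stand-in written for this illustration, assuming the Flink utility walks the cause chain and returns an `Optional`; it is not Flink's implementation, and `IllegalStateException` stands in for `CheckpointException`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class CauseChainSketch {
    /** Stand-in helper: returns the first throwable of the given type in the cause chain. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> searchType) {
        Throwable current = throwable;
        while (current != null) {
            if (searchType.isInstance(current)) {
                return Optional.of(searchType.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    /** Completes a future exceptionally and looks for the cause behind the ExecutionException. */
    static Optional<IllegalStateException> findInFailedFuture() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        result.completeExceptionally(new IllegalStateException("scheduler shut down"));
        try {
            result.get();
            return Optional.empty();
        } catch (ExecutionException e) {
            // Works regardless of how many wrapper exceptions sit above the real cause.
            return findThrowable(e, IllegalStateException.class);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }
}
```

A test written this way avoids both the `instanceof` check and the explicit cast, and keeps working if another wrapper exception is added around the failure later.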
[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335409965
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java ##
@@ -70,6 +73,10 @@ public void setReleasePartitionsConsumer(BiConsumer> checkpointConsumer) {
Review comment: I cannot find any usages of this code (introduced in the `[FLINK-13904][tests] Support checkpoint consumer of SimpleAckingTaskManagerGateway` commit). Am I missing something? Is it being used in a later PR (if so, can you move it to the correct PR)? Or is it some unused leftover?