[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-19 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r368303994
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##
 @@ -283,31 +300,494 @@ public void testStopPeriodicScheduler() throws Exception {
failureManager);
 
// Periodic
+   final CompletableFuture onCompletionPromise1 = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
+   manuallyTriggeredScheduledExecutor.triggerAll();
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   true,
-   false);
-   manuallyTriggeredScheduledExecutor.triggerAll();
+   onCompletionPromise1.get();
fail("The triggerCheckpoint call expected an exception");
-   } catch (CheckpointException e) {
-   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+   } catch (ExecutionException e) {
+   final Optional checkpointExceptionOptional =
+   ExceptionUtils.findThrowable(e, CheckpointException.class);
+   assertTrue(checkpointExceptionOptional.isPresent());
+   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+   checkpointExceptionOptional.get().getCheckpointFailureReason());
}
 
// Not periodic
+   final CompletableFuture onCompletionPromise2 = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   false,
+   false);
+   manuallyTriggeredScheduledExecutor.triggerAll();
+   assertFalse(onCompletionPromise2.isCompletedExceptionally());
+   }
+
+   @Test
+   public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
+   // create some mock Execution vertices that receive the checkpoint trigger messages
+   final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+   ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+   // set up the coordinator and validate the initial state
+   CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+   60,
+   60,
+   0,
+   Integer.MAX_VALUE,
+   CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+   true,
+   false,
+   0);
 
 Review comment:
   Yes, too much code is duplicated everywhere. I'll introduce some utilities for these initializations.
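One shape such a utility could take is a test-data builder that keeps the values repeated in every test (`60, 60, 0, Integer.MAX_VALUE, NEVER_RETAIN_AFTER_TERMINATION, true, false, 0`) as defaults, so each test overrides only what it cares about. This is a sketch under assumptions: `CoordinatorSettingsBuilder`, `CoordinatorSettings` and `RetentionMode` are hypothetical stand-ins, not Flink classes, and the field names are guesses at what the positional `CheckpointCoordinatorConfiguration` arguments mean.

```java
/** Hypothetical stand-in for Flink's CheckpointRetentionPolicy. */
enum RetentionMode { NEVER_RETAIN_AFTER_TERMINATION, RETAIN_ON_FAILURE, RETAIN_ON_CANCELLATION }

/** Immutable settings; field names are assumed meanings of the positional arguments. */
final class CoordinatorSettings {
    final long checkpointInterval;
    final long checkpointTimeout;
    final long minPauseBetweenCheckpoints;
    final int maxConcurrentCheckpoints;
    final RetentionMode retentionMode;
    final boolean exactlyOnce;
    final boolean preferCheckpointForRecovery;
    final int tolerableFailures;

    CoordinatorSettings(CoordinatorSettingsBuilder b) {
        this.checkpointInterval = b.checkpointInterval;
        this.checkpointTimeout = b.checkpointTimeout;
        this.minPauseBetweenCheckpoints = b.minPauseBetweenCheckpoints;
        this.maxConcurrentCheckpoints = b.maxConcurrentCheckpoints;
        this.retentionMode = b.retentionMode;
        this.exactlyOnce = b.exactlyOnce;
        this.preferCheckpointForRecovery = b.preferCheckpointForRecovery;
        this.tolerableFailures = b.tolerableFailures;
    }
}

/** Test-data builder whose defaults mirror the literals copied between the tests. */
final class CoordinatorSettingsBuilder {
    long checkpointInterval = 60;
    long checkpointTimeout = 60;
    long minPauseBetweenCheckpoints = 0;
    int maxConcurrentCheckpoints = Integer.MAX_VALUE;
    RetentionMode retentionMode = RetentionMode.NEVER_RETAIN_AFTER_TERMINATION;
    boolean exactlyOnce = true;
    boolean preferCheckpointForRecovery = false;
    int tolerableFailures = 0;

    CoordinatorSettingsBuilder setCheckpointTimeout(long timeout) {
        this.checkpointTimeout = timeout;
        return this;
    }

    CoordinatorSettingsBuilder setMaxConcurrentCheckpoints(int max) {
        this.maxConcurrentCheckpoints = max;
        return this;
    }

    CoordinatorSettings build() {
        return new CoordinatorSettings(this);
    }
}
```

A test would then write `new CoordinatorSettingsBuilder().setMaxConcurrentCheckpoints(1).build()` and hand the result to a similarly hypothetical coordinator factory, instead of repeating all eight positional arguments.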


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-19 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r368274106
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##
 @@ -283,31 +300,494 @@ public void testStopPeriodicScheduler() throws Exception {
failureManager);
 
// Periodic
+   final CompletableFuture onCompletionPromise1 = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
+   manuallyTriggeredScheduledExecutor.triggerAll();
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   true,
-   false);
-   manuallyTriggeredScheduledExecutor.triggerAll();
+   onCompletionPromise1.get();
fail("The triggerCheckpoint call expected an exception");
-   } catch (CheckpointException e) {
-   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+   } catch (ExecutionException e) {
+   final Optional checkpointExceptionOptional =
+   ExceptionUtils.findThrowable(e, CheckpointException.class);
+   assertTrue(checkpointExceptionOptional.isPresent());
+   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+   checkpointExceptionOptional.get().getCheckpointFailureReason());
}
 
// Not periodic
+   final CompletableFuture onCompletionPromise2 = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   false,
+   false);
+   manuallyTriggeredScheduledExecutor.triggerAll();
+   assertFalse(onCompletionPromise2.isCompletedExceptionally());
+   }
+
+   @Test
+   public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
+   // create some mock Execution vertices that receive the checkpoint trigger messages
+   final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+   ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+   // set up the coordinator and validate the initial state
+   CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+   60,
+   60,
+   0,
+   Integer.MAX_VALUE,
+   CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+   true,
+   false,
+   0);
+   CheckpointCoordinator coord = new CheckpointCoordinator(
+   new JobID(),
+   chkConfig,
+   new ExecutionVertex[] { vertex1 },
+   new ExecutionVertex[] { vertex1 },
+   new ExecutionVertex[] { vertex1 },
+   new StandaloneCheckpointIDCounter(),
+   new StandaloneCompletedCheckpointStore(1),
+   new MemoryStateBackend(),
+   Executors.directExecutor(),
+   manuallyTriggeredScheduledExecutor,
+   SharedStateRegistry.DEFAULT_FACTORY,
+   failureManager);
+
+   coord.startCheckpointScheduler();
+   // Periodic
+   final CompletableFuture onCompletionPromise = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
+
+   coord.shutdown(JobStatus.FAILED);
+   manuallyTriggeredScheduledExecutor.triggerAll();
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-
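In the rewritten test, `triggerCheckpoint` reports failures through the returned future, so `get()` throws an `ExecutionException` and the test searches its cause chain with `ExceptionUtils.findThrowable`. A minimal self-contained sketch of that search, assuming it is a walk over `getCause()` that returns an `Optional`; `CauseChains`, `CheckpointFailure` and `FailureReason` are hypothetical stand-ins for `ExceptionUtils`, `CheckpointException` and `CheckpointFailureReason`, not Flink code.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical stand-in for CheckpointFailureReason. */
enum FailureReason { PERIODIC_SCHEDULER_SHUTDOWN, CHECKPOINT_EXPIRED }

/** Hypothetical stand-in for CheckpointException: carries a failure reason. */
final class CheckpointFailure extends Exception {
    final FailureReason reason;

    CheckpointFailure(FailureReason reason) {
        super(reason.name());
        this.reason = reason;
    }
}

final class CauseChains {
    private CauseChains() {}

    /** Returns the first throwable of the given type in the cause chain, starting with t itself. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        Throwable current = t;
        // The depth bound protects against pathological cause cycles.
        for (int depth = 0; current != null && depth < 64; depth++) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    /** Waits for the future; returns the reason if it failed with a CheckpointFailure anywhere in its cause chain. */
    static Optional<FailureReason> failureReasonOf(CompletableFuture<?> future) {
        try {
            future.get();
            return Optional.empty();
        } catch (ExecutionException e) {
            return findThrowable(e, CheckpointFailure.class).map(f -> f.reason);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the future", e);
        }
    }
}
```

Searching the chain, rather than casting `e.getCause()` directly, keeps the assertion valid even if later stages wrap the failure in further exceptions.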

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-19 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r368273690
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##
 @@ -283,31 +300,494 @@ public void testStopPeriodicScheduler() throws Exception {
failureManager);
 
// Periodic
+   final CompletableFuture onCompletionPromise1 = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
+   manuallyTriggeredScheduledExecutor.triggerAll();
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   true,
-   false);
-   manuallyTriggeredScheduledExecutor.triggerAll();
+   onCompletionPromise1.get();
fail("The triggerCheckpoint call expected an exception");
-   } catch (CheckpointException e) {
-   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+   } catch (ExecutionException e) {
+   final Optional checkpointExceptionOptional =
+   ExceptionUtils.findThrowable(e, CheckpointException.class);
+   assertTrue(checkpointExceptionOptional.isPresent());
+   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+   checkpointExceptionOptional.get().getCheckpointFailureReason());
}
 
// Not periodic
+   final CompletableFuture onCompletionPromise2 = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   false,
+   false);
+   manuallyTriggeredScheduledExecutor.triggerAll();
+   assertFalse(onCompletionPromise2.isCompletedExceptionally());
+   }
+
+   @Test
+   public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
+   // create some mock Execution vertices that receive the checkpoint trigger messages
+   final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+   ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+   // set up the coordinator and validate the initial state
+   CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+   60,
+   60,
+   0,
+   Integer.MAX_VALUE,
+   CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+   true,
+   false,
+   0);
+   CheckpointCoordinator coord = new CheckpointCoordinator(
+   new JobID(),
+   chkConfig,
+   new ExecutionVertex[] { vertex1 },
+   new ExecutionVertex[] { vertex1 },
+   new ExecutionVertex[] { vertex1 },
+   new StandaloneCheckpointIDCounter(),
+   new StandaloneCompletedCheckpointStore(1),
+   new MemoryStateBackend(),
+   Executors.directExecutor(),
+   manuallyTriggeredScheduledExecutor,
+   SharedStateRegistry.DEFAULT_FACTORY,
+   failureManager);
+
+   coord.startCheckpointScheduler();
+   // Periodic
+   final CompletableFuture onCompletionPromise = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
+
+   coord.shutdown(JobStatus.FAILED);
+   manuallyTriggeredScheduledExecutor.triggerAll();
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-
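The first test depends on one rule: after the periodic scheduler is stopped, a periodic trigger (`isPeriodic == true`) fails with `PERIODIC_SCHEDULER_SHUTDOWN`, while a non-periodic one still goes through. A minimal sketch of such a pre-check, assuming a single `periodicScheduling` flag guarded by a lock; `TriggerPreCheck` and `PeriodicSchedulerShutdownException` are hypothetical and model only this check.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical failure type standing in for CheckpointException(PERIODIC_SCHEDULER_SHUTDOWN). */
final class PeriodicSchedulerShutdownException extends RuntimeException {
    PeriodicSchedulerShutdownException() {
        super("PERIODIC_SCHEDULER_SHUTDOWN");
    }
}

/** Hypothetical coordinator fragment: only the periodic-vs-non-periodic pre-check is modeled. */
final class TriggerPreCheck {
    private final Object lock = new Object();
    private boolean periodicScheduling = true;

    void stopCheckpointScheduler() {
        synchronized (lock) {
            periodicScheduling = false;
        }
    }

    CompletableFuture<Long> triggerCheckpoint(long checkpointId, boolean isPeriodic) {
        synchronized (lock) {
            if (isPeriodic && !periodicScheduling) {
                // Periodic requests are rejected once the scheduler is stopped;
                // the rejection travels through the future, not a thrown exception.
                CompletableFuture<Long> rejected = new CompletableFuture<>();
                rejected.completeExceptionally(new PeriodicSchedulerShutdownException());
                return rejected;
            }
        }
        // Non-periodic triggers are still accepted; a real coordinator would
        // start its asynchronous triggering stages here.
        return CompletableFuture.completedFuture(checkpointId);
    }
}
```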

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-18 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r368273421
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##
 @@ -283,31 +300,494 @@ public void testStopPeriodicScheduler() throws Exception {
failureManager);
 
// Periodic
+   final CompletableFuture onCompletionPromise1 = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
+   manuallyTriggeredScheduledExecutor.triggerAll();
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   true,
-   false);
-   manuallyTriggeredScheduledExecutor.triggerAll();
+   onCompletionPromise1.get();
fail("The triggerCheckpoint call expected an exception");
-   } catch (CheckpointException e) {
-   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+   } catch (ExecutionException e) {
+   final Optional checkpointExceptionOptional =
+   ExceptionUtils.findThrowable(e, CheckpointException.class);
+   assertTrue(checkpointExceptionOptional.isPresent());
+   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+   checkpointExceptionOptional.get().getCheckpointFailureReason());
}
 
// Not periodic
+   final CompletableFuture onCompletionPromise2 = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   false,
+   false);
+   manuallyTriggeredScheduledExecutor.triggerAll();
+   assertFalse(onCompletionPromise2.isCompletedExceptionally());
+   }
+
+   @Test
+   public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
+   // create some mock Execution vertices that receive the checkpoint trigger messages
+   final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+   ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+   // set up the coordinator and validate the initial state
+   CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+   60,
+   60,
+   0,
+   Integer.MAX_VALUE,
+   CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+   true,
+   false,
+   0);
+   CheckpointCoordinator coord = new CheckpointCoordinator(
+   new JobID(),
+   chkConfig,
+   new ExecutionVertex[] { vertex1 },
+   new ExecutionVertex[] { vertex1 },
+   new ExecutionVertex[] { vertex1 },
+   new StandaloneCheckpointIDCounter(),
+   new StandaloneCompletedCheckpointStore(1),
+   new MemoryStateBackend(),
+   Executors.directExecutor(),
+   manuallyTriggeredScheduledExecutor,
+   SharedStateRegistry.DEFAULT_FACTORY,
+   failureManager);
+
+   coord.startCheckpointScheduler();
+   // Periodic
+   final CompletableFuture onCompletionPromise = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
+
+   coord.shutdown(JobStatus.FAILED);
+   manuallyTriggeredScheduledExecutor.triggerAll();
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-
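Both tests call `manuallyTriggeredScheduledExecutor.triggerAll()` right after `triggerCheckpoint(...)`: once triggering is split into asynchronous stages, the stages only run when the test drains the executor. A simplified sketch of such an executor, assuming it just queues runnables and runs them on demand; `ManualExecutor` and `twoStagePipeline` are hypothetical, and the sketch ignores delayed scheduling entirely.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical, simplified stand-in for a manually triggered executor (single-threaded, no delays). */
final class ManualExecutor implements Executor {
    private final Queue<Runnable> queue = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        // Nothing runs yet; the task waits until the test calls triggerAll().
        queue.add(command);
    }

    /** Runs queued tasks, including tasks enqueued while running, until none are left. */
    void triggerAll() {
        Runnable next;
        while ((next = queue.poll()) != null) {
            next.run();
        }
    }

    int numQueuedTasks() {
        return queue.size();
    }

    /** Example of a two-stage pipeline whose stages both hop through this executor. */
    static CompletableFuture<Integer> twoStagePipeline(ManualExecutor executor) {
        return CompletableFuture.supplyAsync(() -> 21, executor)
            .thenApplyAsync(x -> x * 2, executor);
    }
}
```

Because `triggerAll()` keeps draining until the queue is empty, a stage that schedules the next stage on the same executor is also run in the same call, which is why a single `triggerAll()` is enough to reach a final result in the tests.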

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-18 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r368273302
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##
 @@ -283,31 +300,494 @@ public void testStopPeriodicScheduler() throws Exception {
failureManager);
 
// Periodic
+   final CompletableFuture onCompletionPromise1 = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
+   manuallyTriggeredScheduledExecutor.triggerAll();
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   true,
-   false);
-   manuallyTriggeredScheduledExecutor.triggerAll();
+   onCompletionPromise1.get();
fail("The triggerCheckpoint call expected an exception");
-   } catch (CheckpointException e) {
-   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+   } catch (ExecutionException e) {
+   final Optional checkpointExceptionOptional =
+   ExceptionUtils.findThrowable(e, CheckpointException.class);
+   assertTrue(checkpointExceptionOptional.isPresent());
+   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+   checkpointExceptionOptional.get().getCheckpointFailureReason());
}
 
// Not periodic
+   final CompletableFuture onCompletionPromise2 = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   false,
+   false);
+   manuallyTriggeredScheduledExecutor.triggerAll();
+   assertFalse(onCompletionPromise2.isCompletedExceptionally());
+   }
+
+   @Test
+   public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
+   // create some mock Execution vertices that receive the checkpoint trigger messages
+   final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+   ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+   // set up the coordinator and validate the initial state
+   CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+   60,
+   60,
+   0,
+   Integer.MAX_VALUE,
+   CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+   true,
+   false,
+   0);
+   CheckpointCoordinator coord = new CheckpointCoordinator(
+   new JobID(),
+   chkConfig,
+   new ExecutionVertex[] { vertex1 },
+   new ExecutionVertex[] { vertex1 },
+   new ExecutionVertex[] { vertex1 },
+   new StandaloneCheckpointIDCounter(),
+   new StandaloneCompletedCheckpointStore(1),
+   new MemoryStateBackend(),
+   Executors.directExecutor(),
+   manuallyTriggeredScheduledExecutor,
+   SharedStateRegistry.DEFAULT_FACTORY,
+   failureManager);
+
+   coord.startCheckpointScheduler();
+   // Periodic
+   final CompletableFuture onCompletionPromise = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
+
+   coord.shutdown(JobStatus.FAILED);
+   manuallyTriggeredScheduledExecutor.triggerAll();
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-
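In `testTriggerCheckpointWithShuttingDownCoordinator`, `coord.shutdown(JobStatus.FAILED)` runs after `triggerCheckpoint(...)` has returned but before `triggerAll()` executes the queued stages, so a stage has to notice the shutdown when it finally runs. A minimal sketch of that re-check, assuming a `shutdown` flag read under the same lock; `ShutdownAwareTrigger` is hypothetical, and because the excerpt is cut off before the test's assertion, the sketch fails with a plain `IllegalStateException` rather than guessing the real failure reason.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical fragment: a trigger stage that re-checks the shutdown flag when it finally runs. */
final class ShutdownAwareTrigger {
    private final Object lock = new Object();
    private final Executor executor;
    private boolean shutdown;

    ShutdownAwareTrigger(Executor executor) {
        this.executor = executor;
    }

    CompletableFuture<Long> triggerCheckpoint(long checkpointId) {
        CompletableFuture<Long> promise = new CompletableFuture<>();
        executor.execute(() -> {
            synchronized (lock) {
                // The request may have been accepted before shutdown; check again now.
                if (shutdown) {
                    promise.completeExceptionally(
                        new IllegalStateException("coordinator is shut down"));
                    return;
                }
            }
            promise.complete(checkpointId);
        });
        return promise;
    }

    void shutdown() {
        synchronized (lock) {
            shutdown = true;
        }
    }
}
```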

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-18 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r368263483
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -485,76 +481,152 @@ public boolean isShutdown() {
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
-   boolean advanceToEndOfTime) throws CheckpointException {
+   boolean advanceToEndOfTime) {
 
if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
-   throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
+   return FutureUtils.completedExceptionally(new IllegalArgumentException(
+   "Only synchronous savepoints are allowed to advance the watermark to MAX."));
}
 
-   // make some eager pre-checks
+   final CompletableFuture onCompletionPromise =
+   new CompletableFuture<>();
synchronized (lock) {
-   preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
-   }
-
-   // check if all tasks that we need to trigger are running.
-   // if not, abort the checkpoint
-   Execution[] executions = new Execution[tasksToTrigger.length];
-   for (int i = 0; i < tasksToTrigger.length; i++) {
-   Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
-   if (ee == null) {
-   LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
-   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job);
-   throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
-   } else if (ee.getState() == ExecutionState.RUNNING) {
-   executions[i] = ee;
-   } else {
-   LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
-   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job,
-   ExecutionState.RUNNING,
-   ee.getState());
-   throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+   if (isTriggering || !triggerRequestQueue.isEmpty()) {
+   // we can't trigger checkpoint directly if there is a trigger request being processed
+   // or queued
+   triggerRequestQueue.add(new CheckpointTriggerRequest(
+   timestamp,
+   props,
+   externalSavepointLocation,
+   isPeriodic,
+   advanceToEndOfTime,
+   onCompletionPromise));
+   return onCompletionPromise;
}
}
+   startTriggeringCheckpoint(
+   timestamp,
+   props,
+   externalSavepointLocation,
+   isPeriodic,
+   advanceToEndOfTime,
+   onCompletionPromise);
+   return onCompletionPromise;
+   }
 
-   // next, check if all tasks that need to acknowledge the checkpoint are running.
-   // if not, abort the checkpoint
-   Map ackTasks = new HashMap<>(tasksToWaitFor.length);
+   private void startTriggeringCheckpoint(
+   long timestamp,
+   CheckpointProperties props,
+   @Nullable String externalSavepointLocation,
+   boolean isPeriodic,
+   boolean advanceToEndOfTime,
+   CompletableFuture onCompletionPromise) {
 
-   for (ExecutionVertex ev : tasksToWaitFor) {
-   Execution ee = ev.getCurrentExecutionAttempt();
-   if (ee != null) {
-   ackTasks.put(ee.getAttemptId(), ev);
-   } else {
-   LOG.info("Checkpoint 
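The new `triggerCheckpoint` does not start a trigger while another request is being processed (`isTriggering`) or is already waiting; it appends a `CheckpointTriggerRequest` carrying the caller's promise to `triggerRequestQueue` and returns that promise. A minimal sketch of this trigger-or-enqueue pattern; `SerialTrigger`, `Request` and `onTriggerFinished` are hypothetical names, and the sketch completes each promise with its timestamp instead of running real triggering stages.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of "trigger now, or enqueue behind the in-flight request". */
final class SerialTrigger {
    /** Stand-in for CheckpointTriggerRequest: parameters plus the caller's promise. */
    private static final class Request {
        final long timestamp;
        final CompletableFuture<Long> promise = new CompletableFuture<>();

        Request(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    private final Object lock = new Object();
    private final Queue<Request> triggerRequestQueue = new ArrayDeque<>();
    private boolean isTriggering;
    private Request inFlight;

    CompletableFuture<Long> triggerCheckpoint(long timestamp) {
        Request request = new Request(timestamp);
        synchronized (lock) {
            if (isTriggering || !triggerRequestQueue.isEmpty()) {
                // Preserve FIFO order: never overtake a request that is already waiting.
                triggerRequestQueue.add(request);
                return request.promise;
            }
            isTriggering = true;
            inFlight = request;
        }
        // A real implementation would start the asynchronous stages here, outside the lock.
        return request.promise;
    }

    /** Hypothetical completion hook: finishes the in-flight request and promotes the next one. */
    void onTriggerFinished() {
        Request finished;
        synchronized (lock) {
            finished = inFlight;
            inFlight = triggerRequestQueue.poll();
            isTriggering = inFlight != null;
        }
        if (finished != null) {
            // Completed outside the lock so dependent callbacks do not run while it is held.
            finished.promise.complete(finished.timestamp);
        }
    }
}
```

Returning the same promise whether the request runs now or later is what lets callers, including the tests, treat queued and immediate triggers identically.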

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-17 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r367845520
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -485,76 +481,151 @@ public boolean isShutdown() {
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
-   boolean advanceToEndOfTime) throws CheckpointException {
+   boolean advanceToEndOfTime) {
 
if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
-   throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
+   return FutureUtils.completedExceptionally(new IllegalArgumentException(
+   "Only synchronous savepoints are allowed to advance the watermark to MAX."));
}
 
-   // make some eager pre-checks
+   final CompletableFuture onCompletionPromise =
+   new CompletableFuture<>();
synchronized (lock) {
-   preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
-   }
-
-   // check if all tasks that we need to trigger are running.
-   // if not, abort the checkpoint
-   Execution[] executions = new Execution[tasksToTrigger.length];
-   for (int i = 0; i < tasksToTrigger.length; i++) {
-   Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
-   if (ee == null) {
-   LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
-   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job);
-   throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
-   } else if (ee.getState() == ExecutionState.RUNNING) {
-   executions[i] = ee;
-   } else {
-   LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
-   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job,
-   ExecutionState.RUNNING,
-   ee.getState());
-   throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+   if (isTriggering || !triggerRequestQueue.isEmpty()) {
+   // we can't trigger checkpoint directly if there is a trigger request being processed
+   // or queued
+   triggerRequestQueue.add(new CheckpointTriggerRequest(
+   timestamp,
+   props,
+   externalSavepointLocation,
+   isPeriodic,
+   advanceToEndOfTime,
+   onCompletionPromise));
+   return onCompletionPromise;
}
}
+   startTriggeringCheckpoint(
+   timestamp,
+   props,
+   externalSavepointLocation,
+   isPeriodic,
+   advanceToEndOfTime,
+   onCompletionPromise);
+   return onCompletionPromise;
+   }
 
-   // next, check if all tasks that need to acknowledge the checkpoint are running.
-   // if not, abort the checkpoint
-   Map ackTasks = new HashMap<>(tasksToWaitFor.length);
+   private void startTriggeringCheckpoint(
+   long timestamp,
+   CheckpointProperties props,
+   @Nullable String externalSavepointLocation,
+   boolean isPeriodic,
+   boolean advanceToEndOfTime,
+   CompletableFuture onCompletionPromise) {
 
-   for (ExecutionVertex ev : tasksToWaitFor) {
-   Execution ee = ev.getCurrentExecutionAttempt();
-   if (ee != null) {
-   ackTasks.put(ee.getAttemptId(), ev);
-   } else {
-   LOG.info("Checkpoint 
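Since `triggerCheckpoint` no longer declares `throws CheckpointException`, even the `advanceToEndOfTime` argument check is reported as an exceptionally completed future via `FutureUtils.completedExceptionally`. A plain-JDK sketch of the same conversion; `SavepointRequests` is hypothetical, its local `completedExceptionally` helper only mirrors the Flink utility's name, and on Java 9+ `CompletableFuture.failedFuture` does the same job.

```java
import java.util.concurrent.CompletableFuture;

final class SavepointRequests {
    private SavepointRequests() {}

    /** JDK 8 compatible equivalent of CompletableFuture.failedFuture (Java 9+). */
    static <T> CompletableFuture<T> completedExceptionally(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    /** Validates the flag combination and reports a violation through the future instead of throwing. */
    static CompletableFuture<String> trigger(boolean synchronous, boolean savepoint, boolean advanceToEndOfTime) {
        if (advanceToEndOfTime && !(synchronous && savepoint)) {
            return completedExceptionally(new IllegalArgumentException(
                "Only synchronous savepoints are allowed to advance the watermark to MAX."));
        }
        // Placeholder for the asynchronous triggering stages.
        return CompletableFuture.completedFuture("triggered");
    }
}
```

Callers then handle argument errors and runtime failures on one path (`exceptionally`, `handle`, or `get()`), which is what allows the tests to treat every trigger failure the same way.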

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r367778946
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -576,85 +648,181 @@ public boolean isShutdown() {
checkpoint.setStatsCallback(callback);
}
 
-   // schedule the timer that will clean up the expired checkpoints
-   final Runnable canceller = () -> {
-   synchronized (lock) {
-   // only do the work if the checkpoint is not discarded anyways
-   // note that checkpoint completion discards the pending checkpoint object
-   if (!checkpoint.isDiscarded()) {
-   LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
-
-   abortPendingCheckpoint(
-   checkpoint,
-   new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
-   }
-   }
-   };
+   synchronized (lock) {
 
-   try {
-   // re-acquire the coordinator-wide lock
-   synchronized (lock) {
-   preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
+   pendingCheckpoints.put(checkpointID, checkpoint);
 
-   LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
+   ScheduledFuture cancellerHandle = timer.schedule(
+   new CheckpointCanceller(checkpoint),
+   checkpointTimeout, TimeUnit.MILLISECONDS);
 
-   pendingCheckpoints.put(checkpointID, checkpoint);
+   if (!checkpoint.setCancellerHandle(cancellerHandle)) {
+   // checkpoint is already disposed!
+   cancellerHandle.cancel(false);
+   }
+   }
 
-   ScheduledFuture cancellerHandle = timer.schedule(
-   canceller,
-   checkpointTimeout, TimeUnit.MILLISECONDS);
+   LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
+   return checkpoint;
+   }
 
-   if (!checkpoint.setCancellerHandle(cancellerHandle)) {
-   // checkpoint is already disposed!
-   cancellerHandle.cancel(false);
-   }
+   /**
+* Snapshot master hook states asynchronously.
+*
+* @param checkpoint the pending checkpoint
+* @return the future represents master hook states are finished or not
+*/
+   private CompletableFuture snapshotMasterState(PendingCheckpoint checkpoint) {
+   if (masterHooks.isEmpty()) {
+   return CompletableFuture.completedFuture(null);
+   }
 
-   // TODO, asynchronously snapshots master hook without waiting here
-   for (MasterTriggerRestoreHook masterHook : masterHooks.values()) {
-   final MasterState masterState =
-   MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
-   .get(checkpointTimeout, TimeUnit.MILLISECONDS);
-   checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
-   }
-   Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
-   }
-   // end of lock scope
+   final long checkpointID = checkpoint.getCheckpointId();
+   final long timestamp = checkpoint.getCheckpointTimestamp();
 
-   final CheckpointOptions checkpointOptions = new CheckpointOptions(
-   props.getCheckpointType(),
-   checkpointStorageLocation.getLocationReference());
+   final CompletableFuture masterStateCompletableFuture = new CompletableFuture<>();
+   for (MasterTriggerRestoreHook masterHook : masterHooks.values()) {
+   MasterHooks
+   
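`snapshotMasterState` replaces the old blocking loop (`MasterHooks.triggerHook(...).get(checkpointTimeout, ...)` per hook) with a future that completes once every master hook has acknowledged its state. The PR's own body is truncated above, so the sketch below is an alternative formulation built on `CompletableFuture.allOf`, not the PR's code; hooks are modeled as hypothetical `Function<Long, CompletableFuture<String>>` values keyed by identifier, and acknowledgements go into a map instead of `PendingCheckpoint.acknowledgeMasterState`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class MasterStateSnapshots {
    private MasterStateSnapshots() {}

    /**
     * Triggers every hook without blocking and acknowledges each state as it arrives.
     * The acknowledged map may be written from several threads, so pass a concurrent map.
     */
    static CompletableFuture<Void> snapshotAll(
            Map<String, Function<Long, CompletableFuture<String>>> hooks,
            long checkpointId,
            Map<String, String> acknowledged) {
        if (hooks.isEmpty()) {
            // Same shortcut as the diff: nothing to wait for.
            return CompletableFuture.completedFuture(null);
        }
        List<CompletableFuture<Void>> acks = new ArrayList<>();
        for (Map.Entry<String, Function<Long, CompletableFuture<String>>> hook : hooks.entrySet()) {
            acks.add(hook.getValue().apply(checkpointId)
                .thenAccept(state -> acknowledged.put(hook.getKey(), state)));
        }
        // Completes when every hook has been acknowledged; fails if any hook fails.
        return CompletableFuture.allOf(acks.toArray(new CompletableFuture<?>[0]));
    }
}
```

Unlike the removed `.get(checkpointTimeout, ...)`, nothing here bounds how long a hook may take; in the new code that role is presumably left to the checkpoint canceller scheduled earlier in the same hunk.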

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r367778123
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -485,76 +481,151 @@ public boolean isShutdown() {
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
-   boolean advanceToEndOfTime) throws CheckpointException {
+   boolean advanceToEndOfTime) {
 
if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
-   throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
+   return FutureUtils.completedExceptionally(new IllegalArgumentException(
+   "Only synchronous savepoints are allowed to advance the watermark to MAX."));
}
 
-   // make some eager pre-checks
+   final CompletableFuture onCompletionPromise =
+   new CompletableFuture<>();
synchronized (lock) {
-   preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
-   }
-
-   // check if all tasks that we need to trigger are running.
-   // if not, abort the checkpoint
-   Execution[] executions = new Execution[tasksToTrigger.length];
-   for (int i = 0; i < tasksToTrigger.length; i++) {
-   Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
-   if (ee == null) {
-   LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
-   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job);
-   throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
-   } else if (ee.getState() == ExecutionState.RUNNING) {
-   executions[i] = ee;
-   } else {
-   LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
-   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job,
-   ExecutionState.RUNNING,
-   ee.getState());
-   throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+   if (isTriggering || !triggerRequestQueue.isEmpty()) {
+   // we can't trigger checkpoint directly if there is a trigger request being processed
+   // or queued
+   triggerRequestQueue.add(new CheckpointTriggerRequest(
+   timestamp,
+   props,
+   externalSavepointLocation,
+   isPeriodic,
+   advanceToEndOfTime,
+   onCompletionPromise));
+   return onCompletionPromise;
}
}
+   startTriggeringCheckpoint(
+   timestamp,
+   props,
+   externalSavepointLocation,
+   isPeriodic,
+   advanceToEndOfTime,
+   onCompletionPromise);
+   return onCompletionPromise;
+   }
 
-   // next, check if all tasks that need to acknowledge the checkpoint are running.
-   // if not, abort the checkpoint
-   Map ackTasks = new HashMap<>(tasksToWaitFor.length);
+   private void startTriggeringCheckpoint(
+   long timestamp,
+   CheckpointProperties props,
+   @Nullable String externalSavepointLocation,
+   boolean isPeriodic,
+   boolean advanceToEndOfTime,
+   CompletableFuture onCompletionPromise) {
 
-   for (ExecutionVertex ev : tasksToWaitFor) {
-   Execution ee = ev.getCurrentExecutionAttempt();
-   if (ee != null) {
-   ackTasks.put(ee.getAttemptId(), ev);
-   } else {
-   LOG.info("Checkpoint 

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r367774356
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -485,76 +481,151 @@ public boolean isShutdown() {
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
-   boolean advanceToEndOfTime) throws CheckpointException {
+   boolean advanceToEndOfTime) {
 
if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
-   throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
+   return FutureUtils.completedExceptionally(new IllegalArgumentException(
+   "Only synchronous savepoints are allowed to advance the watermark to MAX."));
}
 
-   // make some eager pre-checks
+   final CompletableFuture onCompletionPromise =
+   new CompletableFuture<>();
synchronized (lock) {
-   preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
-   }
-
-   // check if all tasks that we need to trigger are running.
-   // if not, abort the checkpoint
-   Execution[] executions = new Execution[tasksToTrigger.length];
-   for (int i = 0; i < tasksToTrigger.length; i++) {
-   Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
-   if (ee == null) {
-   LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
-   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job);
-   throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
-   } else if (ee.getState() == ExecutionState.RUNNING) {
-   executions[i] = ee;
-   } else {
-   LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
-   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job,
-   ExecutionState.RUNNING,
-   ee.getState());
-   throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+   if (isTriggering || !triggerRequestQueue.isEmpty()) {
 
 Review comment:
   `triggerRequestQueue` might be accessed in the main thread, through `shutdown` and `stopCheckpointScheduler`.
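To illustrate the concern, here is a minimal, hypothetical sketch (not Flink's `CheckpointCoordinator`): a queue of trigger requests that is touched both by the triggering path and by a `shutdown()` called from another thread, so every access goes through the same `lock`. The class and method names (`QueuedTriggerSketch`, `finishCurrent`) are made up for illustration, and a non-null `current` future stands in for the `isTriggering` flag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: all access to triggerRequestQueue is guarded by one lock. */
class QueuedTriggerSketch {
    private final Object lock = new Object();
    // Guarded by lock: touched by trigger(), finishCurrent() and shutdown().
    private final Queue<CompletableFuture<Long>> triggerRequestQueue = new ArrayDeque<>();
    private CompletableFuture<Long> current; // in-flight request, guarded by lock
    private boolean shutdown;                // guarded by lock
    private long nextId = 1;                 // guarded by lock

    CompletableFuture<Long> trigger() {
        CompletableFuture<Long> promise = new CompletableFuture<>();
        synchronized (lock) {
            if (shutdown) {
                promise.completeExceptionally(new IllegalStateException("shut down"));
            } else if (current != null || !triggerRequestQueue.isEmpty()) {
                // a request is being processed or already queued: keep FIFO order
                triggerRequestQueue.add(promise);
            } else {
                current = promise;
            }
        }
        return promise;
    }

    /** Completes the in-flight request and promotes the next queued one, if any. */
    void finishCurrent() {
        CompletableFuture<Long> done;
        long id;
        synchronized (lock) {
            if (current == null) {
                return;
            }
            done = current;
            id = nextId++;
            current = triggerRequestQueue.poll(); // null when nothing is queued
        }
        done.complete(id); // completed outside the lock
    }

    /** May be called from another thread, so it takes the same lock as trigger(). */
    void shutdown() {
        List<CompletableFuture<Long>> toFail = new ArrayList<>();
        synchronized (lock) {
            shutdown = true;
            if (current != null) {
                toFail.add(current);
                current = null;
            }
            toFail.addAll(triggerRequestQueue);
            triggerRequestQueue.clear();
        }
        for (CompletableFuture<Long> f : toFail) {
            f.completeExceptionally(new IllegalStateException("shut down"));
        }
    }
}
```

Completing the futures outside the `synchronized` block is a design choice of this sketch, so that dependent callbacks do not run while the lock is held.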


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-14 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r366676607
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -484,77 +484,158 @@ public boolean isShutdown() {
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
-   boolean advanceToEndOfTime) throws CheckpointException {
+   boolean advanceToEndOfTime) {
 
if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
-   throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
+   return FutureUtils.completedExceptionally(new IllegalArgumentException(
+   "Only synchronous savepoints are allowed to advance the watermark to MAX."));
}
 
-   // make some eager pre-checks
+   final CompletableFuture onCompletionPromise =
+   new CompletableFuture<>();
synchronized (lock) {
-   preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
-   }
-
-   // check if all tasks that we need to trigger are running.
-   // if not, abort the checkpoint
-   Execution[] executions = new Execution[tasksToTrigger.length];
-   for (int i = 0; i < tasksToTrigger.length; i++) {
-   Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
-   if (ee == null) {
-   LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
-   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job);
-   throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
-   } else if (ee.getState() == ExecutionState.RUNNING) {
-   executions[i] = ee;
-   } else {
-   LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
-   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job,
-   ExecutionState.RUNNING,
-   ee.getState());
-   throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+   if (isTriggering || !triggerRequestQueue.isEmpty()) {
+   // we can't trigger checkpoint directly if there is a trigger request being processed
+   // or queued
+   triggerRequestQueue.add(new CheckpointTriggerRequest(
+   timestamp,
+   props,
+   externalSavepointLocation,
+   isPeriodic,
+   advanceToEndOfTime,
+   onCompletionPromise));
+   return onCompletionPromise;
}
}
+   startTriggeringCheckpoint(
+   timestamp,
+   props,
+   externalSavepointLocation,
+   isPeriodic,
+   advanceToEndOfTime,
+   onCompletionPromise);
+   return onCompletionPromise;
+   }
 
-   // next, check if all tasks that need to acknowledge the checkpoint are running.
-   // if not, abort the checkpoint
-   Map ackTasks = new HashMap<>(tasksToWaitFor.length);
+   private void startTriggeringCheckpoint(
+   long timestamp,
+   CheckpointProperties props,
+   @Nullable String externalSavepointLocation,
+   boolean isPeriodic,
+   boolean advanceToEndOfTime,
+   CompletableFuture onCompletionPromise) {
 
-   for (ExecutionVertex ev : tasksToWaitFor) {
-   Execution ee = ev.getCurrentExecutionAttempt();
-   if (ee != null) {
-   ackTasks.put(ee.getAttemptId(), ev);
-   } else {
-   LOG.info("Checkpoint 

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-14 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r34152
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -708,22 +689,33 @@ public void receiveDeclineMessage(DeclineCheckpoint message, String taskManagerL
return;
}
 
-   checkpoint = pendingCheckpoints.remove(checkpointId);
+   checkpoint = pendingCheckpoints.get(checkpointId);
 
-   if (checkpoint != null && !checkpoint.isDiscarded()) {
+   if (checkpoint != null) {
+   Preconditions.checkState(
+   !checkpoint.isDiscarded(),
+   "Received message for discarded but non-removed checkpoint " + checkpointId);
LOG.info("Decline checkpoint {} by task {} of job {} at {}.",
checkpointId,
message.getTaskExecutionId(),
job,
taskManagerLocationInfo);
-   discardCheckpoint(checkpoint, message.getReason(), message.getTaskExecutionId());
-   }
-   else if (checkpoint != null) {
-   // this should not happen
-   throw new IllegalStateException(
-   "Received message for discarded but non-removed checkpoint " + checkpointId);
-   }
-   else if (LOG.isDebugEnabled()) {
+   final CheckpointException checkpointException;
+   if (message.getReason() == null) {
+   checkpointException =
+   new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED);
+   } else if (message.getReason() instanceof CheckpointException) {
+   checkpointException = (CheckpointException) message.getReason();
+   } else {
+   checkpointException =
+   new CheckpointException(
+   CheckpointFailureReason.JOB_FAILURE, message.getReason());
+   }
 
 Review comment:
   Ah, I forgot to leave an explanation about this. I didn't squash the original commits, to make it easier to continue reviewing.
   
   Actually, I have extracted the "else if ... else" into a helper method named `getCheckpointException`. But that happened not in a fixup commit but in the original commit "[FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages.". I thought a helper method was enough.
   
   And thanks for the reminder to keep each fixup as a separate commit. I'll try to split up the last big fixup commit :)
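A hypothetical sketch of what a helper like `getCheckpointException` could look like, using minimal stand-in types instead of Flink's `CheckpointException` and `CheckpointFailureReason`. It assumes the unified version searches the cause chain (similar in spirit to `ExceptionUtils.findThrowable`) rather than only checking the top-level `instanceof`, so a wrapped `CheckpointException` is reused as well; the helper actually in the PR may differ.

```java
import java.util.Optional;

/** Hypothetical sketch of mapping a decline reason to a CheckpointException. */
class CheckpointExceptionMapping {

    // Stand-ins for Flink's types, reduced to the reasons used here.
    enum CheckpointFailureReason { CHECKPOINT_DECLINED, JOB_FAILURE }

    static class CheckpointException extends Exception {
        final CheckpointFailureReason failureReason;

        CheckpointException(CheckpointFailureReason failureReason) {
            super(failureReason.name());
            this.failureReason = failureReason;
        }

        CheckpointException(CheckpointFailureReason failureReason, Throwable cause) {
            super(failureReason.name(), cause);
            this.failureReason = failureReason;
        }
    }

    /** Walks the cause chain and returns the first throwable of the given type. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return Optional.of(type.cast(cur));
            }
        }
        return Optional.empty();
    }

    /**
     * null reason: plain decline; a CheckpointException anywhere in the cause chain:
     * reuse it; anything else: wrap it with the given default reason.
     */
    static CheckpointException getCheckpointException(
            CheckpointFailureReason defaultReason, Throwable reason) {
        if (reason == null) {
            return new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED);
        }
        return findThrowable(reason, CheckpointException.class)
                .orElseGet(() -> new CheckpointException(defaultReason, reason));
    }
}
```

With this shape the three branches of `receiveDeclineMessage` collapse into one call, and other failure paths can reuse the same mapping with a different default reason.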




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-10 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r365171167
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -484,77 +484,158 @@ public boolean isShutdown() {

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-08 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r364580406
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -969,25 +969,25 @@ private void dropSubsumedCheckpoints(long checkpointId) {
}
 
/**
-* Triggers the queued request, if there is one.
+* Resumes suspended periodic triggering.
 *
 * NOTE: The caller of this method must hold the lock when invoking the method!
 */
-   private void triggerQueuedRequests() {
-   if (triggerRequestQueued) {
-   triggerRequestQueued = false;
+   private void resumePeriodicTriggering() {
+   assert(Thread.holdsLock(lock));
+
+   if (shutdown || !periodicScheduling) {
+   return;
+   }
+   if (periodicTriggeringSuspended) {
+   periodicTriggeringSuspended = false;
 
// trigger the checkpoint from the trigger timer, to finish the work of this thread before
// starting with the next checkpoint
-   if (periodicScheduling) {
-   if (currentPeriodicTrigger != null) {
-   currentPeriodicTrigger.cancel(false);
-   }
-   currentPeriodicTrigger = scheduleTriggerWithDelay(0L);
-   }
-   else {
-   timer.execute(new ScheduledTrigger());
 
 Review comment:
   I guess your concern is that some necessary trigger might be missed?
   Stopping with a savepoint would cancel all in-flight checkpoints/savepoints, and it does not allow any further periodic triggering. The `ScheduledTrigger` is just a normal one-shot checkpoint, so I think it's fine to remove it; I didn't find any reason to keep it. I believe it only stayed here because of the unclear semantics of `triggerQueuedRequests`: if a trigger request was "queued", something was expected to be "dequeued" afterwards.
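A minimal sketch of the suspend/resume semantics described here, assuming a `ScheduledExecutorService` as the timer. All names are hypothetical; unlike the diff's `resumePeriodicTriggering`, which asserts that the caller already holds `lock`, this sketch takes the (reentrant) lock itself. Resuming only reschedules the periodic trigger, with no separate one-shot trigger, and does nothing once the scheduler has been stopped or shut down.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: periodic triggering that can be suspended and resumed. */
class PeriodicTriggeringSketch {
    private final Object lock = new Object();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Runnable periodicTrigger;
    private final long intervalMillis;

    // All mutable state below is guarded by lock.
    private boolean shutdown;
    private boolean periodicScheduling;
    private boolean periodicTriggeringSuspended;
    private ScheduledFuture<?> currentPeriodicTrigger;
    private int scheduleCount; // how often the periodic trigger was (re)scheduled

    PeriodicTriggeringSketch(Runnable periodicTrigger, long intervalMillis) {
        this.periodicTrigger = periodicTrigger;
        this.intervalMillis = intervalMillis;
    }

    void startScheduler() {
        synchronized (lock) {
            if (shutdown) {
                return;
            }
            periodicScheduling = true;
            periodicTriggeringSuspended = false;
            schedule(intervalMillis);
        }
    }

    /** E.g. a periodic attempt found too many concurrent pending checkpoints. */
    void suspendPeriodicTriggering() {
        synchronized (lock) {
            if (periodicScheduling && !periodicTriggeringSuspended) {
                periodicTriggeringSuspended = true;
                cancelCurrentTrigger();
            }
        }
    }

    /** E.g. a pending checkpoint finished; restarts the timer only if it was suspended. */
    void resumePeriodicTriggering() {
        synchronized (lock) {
            if (shutdown || !periodicScheduling || !periodicTriggeringSuspended) {
                return;
            }
            periodicTriggeringSuspended = false;
            // Fire from the timer thread (delay 0) so the calling thread can finish its work first.
            schedule(0L);
        }
    }

    void stopScheduler() {
        synchronized (lock) {
            periodicScheduling = false;
            periodicTriggeringSuspended = false;
            cancelCurrentTrigger();
        }
    }

    void shutdown() {
        synchronized (lock) {
            shutdown = true;
            stopScheduler(); // reentrant lock
        }
        timer.shutdownNow();
    }

    boolean isSuspended() {
        synchronized (lock) {
            return periodicTriggeringSuspended;
        }
    }

    int scheduleCount() {
        synchronized (lock) {
            return scheduleCount;
        }
    }

    // Caller must hold lock.
    private void schedule(long initialDelayMillis) {
        cancelCurrentTrigger();
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                periodicTrigger, initialDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
        scheduleCount++;
    }

    // Caller must hold lock.
    private void cancelCurrentTrigger() {
        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
            currentPeriodicTrigger = null;
        }
    }
}
```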




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-08 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r364575545
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -957,17 +953,19 @@ private void rememberRecentCheckpointId(long id) {
}
 
private void dropSubsumedCheckpoints(long checkpointId) {
-   Iterator> entries = pendingCheckpoints.entrySet().iterator();
-
-   while (entries.hasNext()) {
-   PendingCheckpoint p = entries.next().getValue();
-   // remove all pending checkpoints that are lesser than the current completed checkpoint
-   if (p.getCheckpointId() < checkpointId && p.canBeSubsumed()) {
-   rememberRecentCheckpointId(p.getCheckpointId());
-   failPendingCheckpoint(p, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
-   entries.remove();
-   }
-   }
+   PendingCheckpoint[] checkpointsToSubsume =
+   pendingCheckpoints
+   .values()
+   .stream()
+   .filter(
+   pendingCheckpoint ->
+   pendingCheckpoint.getCheckpointId() < checkpointId &&
+   pendingCheckpoint.canBeSubsumed())
+   .toArray(PendingCheckpoint[]::new);
+
+   abortPendingCheckpoints(
+   checkpointsToSubsume,
+   new CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED));
 
 Review comment:
   I think I got your point now :)
   Makes sense to me.
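One plausible reading of the change to `dropSubsumedCheckpoints`, sketched with hypothetical stand-in types: the subsumed checkpoints are first collected into an array and only then aborted. That ordering matters if, as assumed here, aborting also removes the checkpoint from `pendingCheckpoints`, because `HashMap` iterators and spliterators are fail-fast and the map must not be structurally modified while a stream is still traversing `values()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: select the subsumed checkpoints first, then abort them. */
class SubsumeSketch {

    static final class Pending {
        final long checkpointId;
        final boolean canBeSubsumed;

        Pending(long checkpointId, boolean canBeSubsumed) {
            this.checkpointId = checkpointId;
            this.canBeSubsumed = canBeSubsumed;
        }
    }

    final Map<Long, Pending> pendingCheckpoints = new HashMap<>();
    final List<String> aborted = new ArrayList<>();

    /** Assumed behavior: aborting also removes the checkpoint from pendingCheckpoints. */
    void abortPendingCheckpoint(Pending checkpoint, String reason) {
        pendingCheckpoints.remove(checkpoint.checkpointId);
        aborted.add(checkpoint.checkpointId + ":" + reason);
    }

    void dropSubsumedCheckpoints(long checkpointId) {
        // toArray() finishes the traversal before any abort mutates the map.
        Pending[] checkpointsToSubsume = pendingCheckpoints.values().stream()
                .filter(p -> p.checkpointId < checkpointId && p.canBeSubsumed)
                .toArray(Pending[]::new);

        for (Pending p : checkpointsToSubsume) {
            abortPendingCheckpoint(p, "CHECKPOINT_SUBSUMED");
        }
    }
}
```

Keeping the selection and the abort in separate steps also lets a single `abortPendingCheckpoints(...)` call handle the whole batch, as the diff does.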




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-08 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r364574694
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -484,77 +484,158 @@ public boolean isShutdown() {

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-08 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r364569676
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -484,77 +484,158 @@ public boolean isShutdown() {

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-08 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r364569315
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -575,86 +657,182 @@ public boolean isShutdown() {
checkpoint.setStatsCallback(callback);
}
 
-   // schedule the timer that will clean up the expired checkpoints
-   final Runnable canceller = () -> {
-   synchronized (lock) {
-   // only do the work if the checkpoint is not discarded anyways
-   // note that checkpoint completion discards the pending checkpoint object
-   if (!checkpoint.isDiscarded()) {
-   LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
+   synchronized (lock) {
 
-   abortPendingCheckpoint(
-   checkpoint,
-   new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
-   }
-   }
-   };
+   pendingCheckpoints.put(checkpointID, checkpoint);
 
-   try {
-   // re-acquire the coordinator-wide lock
-   synchronized (lock) {
-   preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
+   ScheduledFuture cancellerHandle = timer.schedule(
+   new CheckpointCanceller(checkpoint),
+   checkpointTimeout, TimeUnit.MILLISECONDS);
 
-   LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
+   if (!checkpoint.setCancellerHandle(cancellerHandle)) {
+   // checkpoint is already disposed!
+   cancellerHandle.cancel(false);
+   }
+   }
+   return checkpoint;
+   }
 
-   pendingCheckpoints.put(checkpointID, checkpoint);
+   /**
+* Snapshot master hook states asynchronously.
+*
+* @param checkpoint the pending checkpoint
+* @return the future represents master hook states are finished or not
+*/
+   private CompletableFuture snapshotMasterState(PendingCheckpoint checkpoint) {
+   if (masterHooks.isEmpty()) {
+   return CompletableFuture.completedFuture(null);
+   }
 
-   ScheduledFuture cancellerHandle = timer.schedule(
-   canceller,
-   checkpointTimeout, TimeUnit.MILLISECONDS);
+   final long checkpointID = checkpoint.getCheckpointId();
+   final long timestamp = checkpoint.getCheckpointTimestamp();
 
-   if (!checkpoint.setCancellerHandle(cancellerHandle)) {
-   // checkpoint is already disposed!
-   cancellerHandle.cancel(false);
-   }
+   final CompletableFuture masterStateCompletableFuture = new CompletableFuture<>();
+   for (MasterTriggerRestoreHook masterHook : masterHooks.values()) {
+   MasterHooks
+   .triggerHook(masterHook, checkpointID, timestamp, executor)
+   .whenCompleteAsync((masterState, throwable) -> {
+   try {
+   synchronized (lock) {
+   if (masterStateCompletableFuture.isDone()) {
+   return;
+   }
+   if (checkpoint.isDiscarded()) {
+   throw new IllegalStateException(
+   "Checkpoint " + checkpointID + " has been discarded");
+   }
+   if (throwable == null) {
+   checkpoint.acknowledgeMasterState(
+   masterHook.getIdentifier(), masterState);
+

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-08 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r364562946
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -704,12 +882,9 @@ public void receiveDeclineMessage(DeclineCheckpoint message, String taskManagerL
if (message.getReason() == null) {
checkpointException =
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED);
-   } else if (message.getReason() instanceof CheckpointException) {
-   checkpointException = (CheckpointException) message.getReason();
 
 Review comment:
   I unified this with the "else" below through `getCheckpointException`.




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-08 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r364550433
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -484,77 +484,158 @@ public boolean isShutdown() {
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
-   boolean advanceToEndOfTime) throws CheckpointException {
+   boolean advanceToEndOfTime) {
 
if (advanceToEndOfTime && !(props.isSynchronous() && 
props.isSavepoint())) {
-   throw new IllegalArgumentException("Only synchronous 
savepoints are allowed to advance the watermark to MAX.");
+   return FutureUtils.completedExceptionally(new 
IllegalArgumentException(
+   "Only synchronous savepoints are allowed to 
advance the watermark to MAX."));
}
 
-   // make some eager pre-checks
+   final CompletableFuture 
onCompletionPromise =
+   new CompletableFuture<>();
synchronized (lock) {
-   preCheckBeforeTriggeringCheckpoint(isPeriodic, 
props.forceCheckpoint());
-   }
-
-   // check if all tasks that we need to trigger are running.
-   // if not, abort the checkpoint
-   Execution[] executions = new Execution[tasksToTrigger.length];
-   for (int i = 0; i < tasksToTrigger.length; i++) {
-   Execution ee = 
tasksToTrigger[i].getCurrentExecutionAttempt();
-   if (ee == null) {
-   LOG.info("Checkpoint triggering task {} of job 
{} is not being executed at the moment. Aborting checkpoint.",
-   
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job);
-   throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
-   } else if (ee.getState() == ExecutionState.RUNNING) {
-   executions[i] = ee;
-   } else {
-   LOG.info("Checkpoint triggering task {} of job 
{} is not in state {} but {} instead. Aborting checkpoint.",
-   
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
-   job,
-   ExecutionState.RUNNING,
-   ee.getState());
-   throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+   if (isTriggering || !triggerRequestQueue.isEmpty()) {
+   // we can't trigger checkpoint directly if 
there is a trigger request being processed
+   // or queued
+   triggerRequestQueue.add(new 
CheckpointTriggerRequest(
+   timestamp,
+   props,
+   externalSavepointLocation,
+   isPeriodic,
+   advanceToEndOfTime,
+   onCompletionPromise));
+   return onCompletionPromise;
}
}
+   startTriggeringCheckpoint(
+   timestamp,
+   props,
+   externalSavepointLocation,
+   isPeriodic,
+   advanceToEndOfTime,
+   onCompletionPromise);
+   return onCompletionPromise;
+   }
 
-   // next, check if all tasks that need to acknowledge the checkpoint are running.
-   // if not, abort the checkpoint
-   Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
+   private void startTriggeringCheckpoint(
+   long timestamp,
+   CheckpointProperties props,
+   @Nullable String externalSavepointLocation,
+   boolean isPeriodic,
+   boolean advanceToEndOfTime,
+   CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
 
-   for (ExecutionVertex ev : tasksToWaitFor) {
-   Execution ee = ev.getCurrentExecutionAttempt();
-   if (ee != null) {
-   ackTasks.put(ee.getAttemptId(), ev);
-   } else {
-   LOG.info("Checkpoint 
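
To illustrate the queueing rule in the `triggerCheckpoint` diff above (a new request is queued whenever another one is being processed or is already waiting), here is a minimal Java sketch. It is not Flink's implementation: `TriggerQueueSketch`, `MiniCoordinator`, `finishTriggering` and the `Long` checkpoint ids are hypothetical simplifications, and the asynchronous trigger stages are replaced by an explicit `finishTriggering()` call that completes the current promise and starts the next queued request.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "queue while triggering"; not Flink's CheckpointCoordinator.
final class TriggerQueueSketch {

    static final class MiniCoordinator {
        private final Object lock = new Object();
        private final Queue<CompletableFuture<Long>> triggerRequestQueue = new ArrayDeque<>();
        private CompletableFuture<Long> inProgress; // null when nothing is being triggered
        private long nextCheckpointId = 1L;

        /** Returns immediately; the promise completes once this request has been triggered. */
        CompletableFuture<Long> triggerCheckpoint() {
            CompletableFuture<Long> promise = new CompletableFuture<>();
            synchronized (lock) {
                if (inProgress != null || !triggerRequestQueue.isEmpty()) {
                    // another request is being processed or already waiting: keep FIFO order
                    triggerRequestQueue.add(promise);
                } else {
                    inProgress = promise; // start triggering this request right away
                }
            }
            return promise;
        }

        /** Simulates the end of the asynchronous stages for the in-progress request. */
        void finishTriggering() {
            CompletableFuture<Long> done;
            long id;
            synchronized (lock) {
                if (inProgress == null) {
                    return;
                }
                done = inProgress;
                id = nextCheckpointId++;
                inProgress = triggerRequestQueue.poll(); // next queued request, or null
            }
            done.complete(id); // complete outside the lock so callbacks never run under it
        }

        int queuedRequests() {
            synchronized (lock) {
                return triggerRequestQueue.size();
            }
        }
    }

    public static void main(String[] args) {
        MiniCoordinator coordinator = new MiniCoordinator();
        CompletableFuture<Long> first = coordinator.triggerCheckpoint();
        CompletableFuture<Long> second = coordinator.triggerCheckpoint(); // queued behind first
        System.out.println(coordinator.queuedRequests()); // 1
        coordinator.finishTriggering();
        coordinator.finishTriggering();
        System.out.println(first.join() + " " + second.join()); // 1 2
    }
}
```

Completing the promise outside the lock is a deliberate choice in this sketch: callers may chain work onto the returned future, and that work should not run while the coordinator lock is held.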

[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-08 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r364537382
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -162,9 +162,9 @@
 * Non-volatile, because only accessed in synchronized scope */
private boolean periodicScheduling;
 
-   /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
-* accessed in synchronized scope */
-   private boolean triggerRequestQueued;
+   /** Flag whether periodic triggering is suspended (too many concurrent pending checkpoint).
+* Non-volatile, because only accessed in synchronized scope */
+   private boolean periodicTriggeringSuspended;
 
 Review comment:
   Good suggestion. Will do that later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-06 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363407465
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -957,17 +953,19 @@ private void rememberRecentCheckpointId(long id) {
}
 
private void dropSubsumedCheckpoints(long checkpointId) {
-   Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
-
-   while (entries.hasNext()) {
-   PendingCheckpoint p = entries.next().getValue();
-   // remove all pending checkpoints that are lesser than the current completed checkpoint
-   if (p.getCheckpointId() < checkpointId && p.canBeSubsumed()) {
-   rememberRecentCheckpointId(p.getCheckpointId());
-   failPendingCheckpoint(p, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
-   entries.remove();
-   }
-   }
+   PendingCheckpoint[] checkpointsToSubsume =
+   pendingCheckpoints
+   .values()
+   .stream()
+   .filter(
+   pendingCheckpoint ->
+   pendingCheckpoint.getCheckpointId() < checkpointId &&
+   pendingCheckpoint.canBeSubsumed())
+   .toArray(PendingCheckpoint[]::new);
+
+   abortPendingCheckpoints(
+   checkpointsToSubsume,
+   new CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED));
 
 Review comment:
   Do you mean providing an `abortPendingCheckpoint(Predicate checkpointToDeletePredicate)` instead of calling `pendingCheckpoints.remove(pendingCheckpoint.getCheckpointId())` directly in `abortPendingCheckpoint`?
   
   I guess it would work. However, I'm not sure whether we should do that, because the code might become harder to understand. We need to decide how to make this trade-off.
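A minimal sketch of the predicate-based alternative discussed above, assuming a plain `Map<Long, String>` in place of the real `pendingCheckpoints` map; `PredicateAbortSketch` and its methods are hypothetical. Matching ids are collected first and aborted afterwards, so the single abort entry point can keep removing entries from the map without breaking the iteration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical sketch; the map stands in for pendingCheckpoints (id -> checkpoint).
final class PredicateAbortSketch {
    private final Map<Long, String> pendingCheckpoints = new TreeMap<>();
    private final List<Long> aborted = new ArrayList<>();

    void addPending(long checkpointId) {
        pendingCheckpoints.put(checkpointId, "checkpoint-" + checkpointId);
    }

    /** Single abort entry point: it removes the checkpoint from the map itself. */
    private void abortPendingCheckpoint(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
        aborted.add(checkpointId);
    }

    /** Predicate-based variant: select first, then abort, so the map is not modified mid-iteration. */
    void abortPendingCheckpoints(Predicate<Long> shouldAbort) {
        List<Long> toAbort = new ArrayList<>();
        for (Long id : pendingCheckpoints.keySet()) {
            if (shouldAbort.test(id)) {
                toAbort.add(id);
            }
        }
        toAbort.forEach(this::abortPendingCheckpoint);
    }

    List<Long> aborted() {
        return aborted;
    }

    int pendingCount() {
        return pendingCheckpoints.size();
    }

    public static void main(String[] args) {
        PredicateAbortSketch coordinator = new PredicateAbortSketch();
        for (long id = 1; id <= 4; id++) {
            coordinator.addPending(id);
        }
        long completedId = 3L;
        // similar to dropSubsumedCheckpoints: abort everything older than the completed checkpoint
        coordinator.abortPendingCheckpoints(id -> id < completedId);
        System.out.println(coordinator.aborted() + " " + coordinator.pendingCount()); // [1, 2] 2
    }
}
```

Behaviorally this matches passing a pre-filtered array; the difference is only where the filtering lives, which is the readability trade-off raised in the comment.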


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-06 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363393198
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -162,9 +162,9 @@
 * Non-volatile, because only accessed in synchronized scope */
private boolean periodicScheduling;
 
-   /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
-* accessed in synchronized scope */
-   private boolean triggerRequestQueued;
+   /** Flag whether periodic triggering is suspended (too many concurrent pending checkpoint).
+* Non-volatile, because only accessed in synchronized scope */
+   private boolean periodicTriggeringSuspended;
 
 Review comment:
   I have explained a bit more in the inline response. It's not just a pure renaming. To be honest, after so many changes the code is not easy to maintain, so I did a bit of refactoring where I believed it was necessary.


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-06 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363390451
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1441,13 +1441,7 @@ private void preCheckBeforeTriggeringCheckpoint(boolean isPeriodic, boolean forc
}
 
if (!forceCheckpoint) {
-   if (triggerRequestQueued) {
-   LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
-   throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
-   }
-
 
 Review comment:
   It's possible. But since I have refactored `triggerRequestQueued` into `periodicTriggeringSuspended`, it no longer makes sense to check `periodicTriggeringSuspended` here. If the trigger satisfies the other requirements, it can be executed.
   
   Actually, in the current implementation this check only matters for non-periodic checkpoint triggering:
   1. Savepoints skip this check, because `forceCheckpoint` is set to true.
   2. A periodic checkpoint trigger never reaches this point, because it is canceled if `triggerRequestQueued` is set to true. (There should be no in-flight periodic trigger, since triggering is single-threaded.)
   
   So it's also dead code...


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-06 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363367102
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -969,25 +969,25 @@ private void dropSubsumedCheckpoints(long checkpointId) {
}
 
/**
-* Triggers the queued request, if there is one.
+* Resumes suspended periodic triggering.
 *
 * NOTE: The caller of this method must hold the lock when invoking the method!
 */
-   private void triggerQueuedRequests() {
-   if (triggerRequestQueued) {
-   triggerRequestQueued = false;
+   private void resumePeriodicTriggering() {
+   assert(Thread.holdsLock(lock));
+
+   if (shutdown || !periodicScheduling) {
+   return;
+   }
+   if (periodicTriggeringSuspended) {
+   periodicTriggeringSuspended = false;
 
// trigger the checkpoint from the trigger timer, to finish the work of this thread before
// starting with the next checkpoint
-   if (periodicScheduling) {
-   if (currentPeriodicTrigger != null) {
-   currentPeriodicTrigger.cancel(false);
-   }
-   currentPeriodicTrigger = scheduleTriggerWithDelay(0L);
-   }
-   else {
-   timer.execute(new ScheduledTrigger());
 
 Review comment:
   Actually there is a difference between `triggerQueuedRequests` and `resumePeriodicTriggering`. It concerns non-periodic checkpoint triggering (not savepoints). However, there is no such scenario (non-periodic checkpoints) in production code, so this is more or less dead code.
   
   This part is about non-periodic checkpoints. In the `triggerQueuedRequests` version, a one-shot checkpoint trigger is re-scheduled if `periodicScheduling` is false. I guess the reason is that it assumes `triggerRequestQueued` was set by a non-periodic checkpoint trigger, so it recovers a non-periodic trigger. The `resumePeriodicTriggering` version does nothing if periodic triggering is disabled.
   
   Even if there were a non-periodic trigger scenario, re-scheduling a one-shot trigger makes no sense to me, and I don't think this behavior is worth keeping. If the caller wants a one-shot checkpoint to be executed no matter what, `forceCheckpoint` should be set. Otherwise the one-shot checkpoint might fail and will never be re-scheduled. The caller can track its lifecycle through the `CompletableFuture` returned by `triggerCheckpoint`.
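A sketch of what tracking the lifecycle through the returned `CompletableFuture` could look like on the caller side, assuming a hypothetical `Supplier<CompletableFuture<Long>>` that stands in for a one-shot `triggerCheckpoint` call (`OneShotRetrySketch` and `triggerWithRetry` are also hypothetical). The retry policy is purely illustrative and lives with the caller, which matches the comment's point that the coordinator itself should not re-schedule a failed one-shot trigger.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical caller-side sketch: the caller, not the coordinator, decides whether to retry.
final class OneShotRetrySketch {

    /** Triggers once; on failure, retries up to maxRetries more times. */
    static CompletableFuture<Long> triggerWithRetry(
            Supplier<CompletableFuture<Long>> trigger, int maxRetries) {
        CompletableFuture<CompletableFuture<Long>> nested = trigger.get().handle((id, failure) -> {
            if (failure == null) {
                return CompletableFuture.completedFuture(id); // succeeded
            }
            if (maxRetries <= 0) {
                CompletableFuture<Long> failed = new CompletableFuture<>();
                failed.completeExceptionally(failure); // give up and surface the failure
                return failed;
            }
            return triggerWithRetry(trigger, maxRetries - 1); // caller-chosen retry
        });
        return nested.thenCompose(f -> f); // flatten the nested future
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Hypothetical trigger: the first attempt is declined, the second succeeds.
        Supplier<CompletableFuture<Long>> trigger = () -> {
            int attempt = attempts.incrementAndGet();
            CompletableFuture<Long> result = new CompletableFuture<>();
            if (attempt == 1) {
                result.completeExceptionally(new IllegalStateException("declined"));
            } else {
                result.complete((long) attempt);
            }
            return result;
        };
        System.out.println(triggerWithRetry(trigger, 1).join()); // 2
    }
}
```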


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-06 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363191715
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -708,22 +689,33 @@ public void receiveDeclineMessage(DeclineCheckpoint message, String taskManagerL
return;
}
 
-   checkpoint = pendingCheckpoints.remove(checkpointId);
+   checkpoint = pendingCheckpoints.get(checkpointId);
 
-   if (checkpoint != null && !checkpoint.isDiscarded()) {
+   if (checkpoint != null) {
+   Preconditions.checkState(
+   !checkpoint.isDiscarded(),
+   "Received message for discarded but non-removed checkpoint " + checkpointId);
LOG.info("Decline checkpoint {} by task {} of job {} at {}.",
checkpointId,
message.getTaskExecutionId(),
job,
taskManagerLocationInfo);
-   discardCheckpoint(checkpoint, message.getReason(), message.getTaskExecutionId());
-   }
-   else if (checkpoint != null) {
-   // this should not happen
-   throw new IllegalStateException(
-   "Received message for discarded but non-removed checkpoint " + checkpointId);
-   }
-   else if (LOG.isDebugEnabled()) {
+   final CheckpointException checkpointException;
+   if (message.getReason() == null) {
+   checkpointException =
+   new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED);
+   } else if (message.getReason() instanceof CheckpointException) {
+   checkpointException = (CheckpointException) message.getReason();
+   } else {
+   checkpointException =
+   new CheckpointException(
+   CheckpointFailureReason.JOB_FAILURE, message.getReason());
+   }
 
 Review comment:
   OK
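The three branches in the `receiveDeclineMessage` diff above (no reason, a reason that is already a `CheckpointException`, any other throwable) can be isolated as a small mapping function. This sketch uses hypothetical stand-ins for `CheckpointException` and `CheckpointFailureReason` so that it compiles without Flink on the classpath; only the two failure reasons used in the diff are modeled, and `DeclineReasonSketch` / `toCheckpointException` are illustrative names.

```java
// Hypothetical stand-ins; not Flink's actual classes.
final class DeclineReasonSketch {

    enum FailureReason { CHECKPOINT_DECLINED, JOB_FAILURE }

    static final class CheckpointException extends Exception {
        final FailureReason reason;

        CheckpointException(FailureReason reason) {
            this.reason = reason;
        }

        CheckpointException(FailureReason reason, Throwable cause) {
            super(cause);
            this.reason = reason;
        }
    }

    /** Mirrors the three branches in the diff: null, already a CheckpointException, anything else. */
    static CheckpointException toCheckpointException(Throwable declineReason) {
        if (declineReason == null) {
            return new CheckpointException(FailureReason.CHECKPOINT_DECLINED);
        } else if (declineReason instanceof CheckpointException) {
            return (CheckpointException) declineReason; // reuse as-is, keeping its reason
        } else {
            return new CheckpointException(FailureReason.JOB_FAILURE, declineReason); // wrap
        }
    }

    public static void main(String[] args) {
        System.out.println(toCheckpointException(null).reason); // CHECKPOINT_DECLINED
        System.out.println(toCheckpointException(new RuntimeException("boom")).reason); // JOB_FAILURE
    }
}
```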


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-06 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363191628
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -363,10 +361,9 @@ public void shutdown(JobStatus jobStatus) throws Exception {
masterHooks.clear();
 
// clear and discard all pending checkpoints
-   for (PendingCheckpoint pending : pendingCheckpoints.values()) {
-   failPendingCheckpoint(pending, CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
-   }
-   pendingCheckpoints.clear();
+   abortPendingCheckpoints(
+   new CheckpointException(
+   CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN));
 
 Review comment:
   Yes, but `checkAndResetCheckpointScheduler` is called before it. Both of them try to resume periodic triggering.
   Anyway, `triggerQueuedRequests` would have no effect here even if it were called, because it checks `shutdown` and `periodicScheduling` first.


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-05 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363179854
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1352,52 +1359,6 @@ public void run() {
}
}
 
-   /**
-* Discards the given pending checkpoint because of the given cause.
-*
-* @param pendingCheckpoint to discard
-* @param cause for discarding the checkpoint
-* @param executionAttemptID the execution attempt id of the failing task.
-*/
-   private void discardCheckpoint(
-   PendingCheckpoint pendingCheckpoint,
-   @Nullable Throwable cause,
-   ExecutionAttemptID executionAttemptID) {
-   assert(Thread.holdsLock(lock));
-   Preconditions.checkNotNull(pendingCheckpoint);
-
-   final long checkpointId = pendingCheckpoint.getCheckpointId();
-
-   LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
-
-   if (cause == null) {
-   failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED, executionAttemptID);
-   } else if (cause instanceof CheckpointException) {
-   CheckpointException exception = (CheckpointException) cause;
-   failPendingCheckpointDueToTaskFailure(pendingCheckpoint, exception.getCheckpointFailureReason(), cause, executionAttemptID);
-   } else {
-   failPendingCheckpointDueToTaskFailure(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause, executionAttemptID);
-   }
-
-   rememberRecentCheckpointId(checkpointId);
-
-   // we don't have to schedule another "dissolving" checkpoint any more because the
-   // cancellation barriers take care of breaking downstream alignments
-   // we only need to make sure that suspended queued requests are resumed
-
-   boolean haveMoreRecentPending = false;
-   for (PendingCheckpoint p : pendingCheckpoints.values()) {
-   if (!p.isDiscarded() && p.getCheckpointId() >= pendingCheckpoint.getCheckpointId()) {
-   haveMoreRecentPending = true;
-   break;
-   }
-   }
-
-   if (!haveMoreRecentPending) {
-   triggerQueuedRequests();
-   }
 
 Review comment:
   Please see the response below. I changed the behavior of resuming periodic triggering a bit.


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-05 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363179538
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1429,61 +1390,45 @@ public void run() {
}
}
 
-   private void failPendingCheckpoint(
-   final PendingCheckpoint pendingCheckpoint,
-   final CheckpointFailureReason reason) {
-
-   failPendingCheckpoint(pendingCheckpoint, reason, null);
-   }
-
-   private void failPendingCheckpoint(
-   final PendingCheckpoint pendingCheckpoint,
-   final CheckpointFailureReason reason,
-   @Nullable final Throwable cause) {
-
-   CheckpointException exception = new CheckpointException(reason, cause);
-   pendingCheckpoint.abort(reason, cause);
-   failureManager.handleJobLevelCheckpointException(exception, pendingCheckpoint.getCheckpointId());
-
-   checkAndResetCheckpointScheduler();
-   }
-
-   private void failPendingCheckpointDueToTaskFailure(
-   final PendingCheckpoint pendingCheckpoint,
-   final CheckpointFailureReason reason,
-   final ExecutionAttemptID executionAttemptID) {
+   private void abortPendingCheckpoint(
+   PendingCheckpoint pendingCheckpoint,
+   CheckpointException exception) {
 
-   failPendingCheckpointDueToTaskFailure(pendingCheckpoint, reason, null, executionAttemptID);
+   abortPendingCheckpoint(pendingCheckpoint, exception, null);
}
 
-   private void failPendingCheckpointDueToTaskFailure(
-   final PendingCheckpoint pendingCheckpoint,
-   final CheckpointFailureReason reason,
-   @Nullable final Throwable cause,
-   final ExecutionAttemptID executionAttemptID) {
-
-   CheckpointException exception = new CheckpointException(reason, cause);
-   pendingCheckpoint.abort(reason, cause);
-   failureManager.handleTaskLevelCheckpointException(exception, pendingCheckpoint.getCheckpointId(), executionAttemptID);
+   private void abortPendingCheckpoint(
+   PendingCheckpoint pendingCheckpoint,
+   CheckpointException exception,
+   @Nullable final ExecutionAttemptID executionAttemptID) {
 
-   checkAndResetCheckpointScheduler();
-   }
+   assert(Thread.holdsLock(lock));
 
-   private void checkAndResetCheckpointScheduler() {
-   if (!shutdown && periodicScheduling && currentPeriodicTrigger == null) {
-   synchronized (lock) {
-   if (pendingCheckpoints.isEmpty() || allPendingCheckpointsDiscarded()) {
-   triggerRequestQueued = false;
-   currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
 
 Review comment:
   Basically `checkAndResetCheckpointScheduler` does the same thing as `triggerQueuedRequests` (tries to resume periodic triggering), but under a stricter condition (there is no more in-flight checkpoint).
   `checkAndResetCheckpointScheduler` is invoked when an in-flight checkpoint fails. `triggerQueuedRequests` is invoked when an in-flight checkpoint succeeds or times out.
   
   This distinction does not make much sense, but it makes the code more complicated. I think the rule here could be simplified a lot.
   
   Periodic triggering is suspended when there are too many in-flight checkpoints. So it should be resumed once there are fewer in-flight checkpoints than `maxConcurrentCheckpointAttempts`. That means trying to resume periodic triggering whenever an in-flight checkpoint ends its lifecycle, whether it succeeds or fails.
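The simplified rule described above (suspend periodic triggering when the number of in-flight checkpoints reaches `maxConcurrentCheckpointAttempts`, and try to resume whenever any in-flight checkpoint ends, successfully or not) might be sketched as follows. `PeriodicTriggerGate` and its methods are hypothetical; incrementing `resumeCount` stands in for re-scheduling the periodic trigger, and the `shutdown || !periodicScheduling` guard mirrors the one in `resumePeriodicTriggering` from the diff.

```java
// Hypothetical sketch of the suspend/resume rule; not Flink's CheckpointCoordinator.
final class PeriodicTriggerGate {
    private final Object lock = new Object();
    private final int maxConcurrentCheckpointAttempts;
    private int inFlight;
    private boolean periodicTriggeringSuspended;
    private boolean periodicScheduling = true;
    private boolean shutdown;
    private int resumeCount; // stands in for "schedule the next periodic trigger"

    PeriodicTriggerGate(int maxConcurrentCheckpointAttempts) {
        this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
    }

    /** Called by the periodic trigger; returns false if this trigger was skipped. */
    boolean tryStartPeriodicCheckpoint() {
        synchronized (lock) {
            if (shutdown || !periodicScheduling || periodicTriggeringSuspended) {
                return false;
            }
            if (inFlight >= maxConcurrentCheckpointAttempts) {
                periodicTriggeringSuspended = true; // too many in-flight checkpoints
                return false;
            }
            inFlight++;
            return true;
        }
    }

    /** Called when an in-flight checkpoint completes, fails, or times out: same rule for all. */
    void onCheckpointEnded() {
        synchronized (lock) {
            inFlight--;
            resumePeriodicTriggering();
        }
    }

    private void resumePeriodicTriggering() {
        assert Thread.holdsLock(lock);
        if (shutdown || !periodicScheduling) {
            return;
        }
        if (periodicTriggeringSuspended) {
            periodicTriggeringSuspended = false;
            resumeCount++;
        }
    }

    void stopCheckpointScheduler() { synchronized (lock) { periodicScheduling = false; } }
    void shutdown() { synchronized (lock) { shutdown = true; } }
    boolean isSuspended() { synchronized (lock) { return periodicTriggeringSuspended; } }
    int resumeCount() { synchronized (lock) { return resumeCount; } }

    public static void main(String[] args) {
        PeriodicTriggerGate gate = new PeriodicTriggerGate(1);
        System.out.println(gate.tryStartPeriodicCheckpoint()); // true
        System.out.println(gate.tryStartPeriodicCheckpoint()); // false (now suspended)
        gate.onCheckpointEnded(); // success or failure: either way, try to resume
        System.out.println(gate.isSuspended() + " " + gate.resumeCount()); // false 1
    }
}
```

Treating success, failure and timeout identically is what removes the two separate resume paths (`checkAndResetCheckpointScheduler` and `triggerQueuedRequests`) that the comment finds confusing.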


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-05 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363171920
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -648,24 +636,17 @@ public boolean isShutdown() {
return checkpoint.getCompletionFuture();
}
catch (Throwable t) {
-   // guard the map against concurrent modifications
-   synchronized (lock) {
-   pendingCheckpoints.remove(checkpointID);
-   }
-
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
checkpointID, job, numUnsuccessful, t);
 
-   if (!checkpoint.isDiscarded()) {
-   failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
-   }
-
-   try {
-   checkpointStorageLocation.disposeOnFailure();
 
 Review comment:
   It's duplicated: `abortPendingCheckpoint` -> `PendingCheckpoint#dispose` already invokes `CheckpointStorageLocation#disposeOnFailure`. That's the benefit of unifying error handling in `abortPendingCheckpoint`: we don't have to release resources everywhere.
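A minimal sketch of the "single abort entry point owns the cleanup" idea, assuming hypothetical `Pending` and `StorageLocation` classes in place of `PendingCheckpoint` and `CheckpointStorageLocation` (`UnifiedAbortSketch` and its methods are also illustrative). Disposal happens inside the abort path, so a trigger-failure handler only needs to delegate and no longer calls `disposeOnFailure()` itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not Flink's classes.
final class UnifiedAbortSketch {

    /** Stand-in for CheckpointStorageLocation. */
    static final class StorageLocation {
        int disposeCalls;

        void disposeOnFailure() {
            disposeCalls++;
        }
    }

    /** Stand-in for PendingCheckpoint; abort() releases its storage exactly once. */
    static final class Pending {
        final long id;
        final StorageLocation location;
        private boolean discarded;

        Pending(long id, StorageLocation location) {
            this.id = id;
            this.location = location;
        }

        void abort() {
            if (!discarded) {
                discarded = true;
                location.disposeOnFailure(); // cleanup lives here, not in every caller
            }
        }
    }

    private final Object lock = new Object();
    private final Map<Long, Pending> pendingCheckpoints = new HashMap<>();
    int failuresReported; // stands in for notifying the failure manager

    Pending startCheckpoint(long id) {
        synchronized (lock) {
            Pending p = new Pending(id, new StorageLocation());
            pendingCheckpoints.put(id, p);
            return p;
        }
    }

    /** The only way to fail a pending checkpoint: remove, dispose, report. */
    void abortPendingCheckpoint(Pending pending) {
        synchronized (lock) {
            if (pendingCheckpoints.remove(pending.id) == null) {
                return; // already aborted or completed
            }
            pending.abort();
            failuresReported++;
        }
    }

    /** A trigger failure just delegates; no separate disposeOnFailure() call is needed. */
    void onTriggerFailure(Pending pending) {
        abortPendingCheckpoint(pending);
    }

    int pendingCount() {
        synchronized (lock) {
            return pendingCheckpoints.size();
        }
    }

    public static void main(String[] args) {
        UnifiedAbortSketch coordinator = new UnifiedAbortSketch();
        Pending p = coordinator.startCheckpoint(7L);
        coordinator.onTriggerFailure(p);
        System.out.println(p.location.disposeCalls + " " + coordinator.pendingCount()); // 1 0
    }
}
```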


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-05 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363102017
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1262,11 +1261,19 @@ public void stopCheckpointScheduler() {
 */
public void abortPendingCheckpoints(CheckpointException exception) {
synchronized (lock) {
-   for (PendingCheckpoint p : pendingCheckpoints.values()) {
-   failPendingCheckpoint(p, exception.getCheckpointFailureReason());
-   }
+   // do not traverse pendingCheckpoints directly, because it might be changed
 
 Review comment:
   No, it's a reminder, and it's introduced by this PR.
   After the refactoring of error handling, `abortPendingCheckpoint` is the only entry point for failing a pending checkpoint, and it maintains `pendingCheckpoints` itself, so we don't need to maintain `pendingCheckpoints` everywhere. As a side effect, however, we can't invoke `abortPendingCheckpoint` in a for loop over `pendingCheckpoints` as before: that would raise a `ConcurrentModificationException`.
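Why the loop has to go: once `abortPendingCheckpoint` removes entries from `pendingCheckpoints`, iterating over the live map while aborting trips the fail-fast iterator. This sketch uses a `HashMap<Long, String>` as a stand-in (`SnapshotAbortSketch` and its methods are hypothetical) and contrasts that loop with snapshotting the keys into an array first, the same idea as `pendingCheckpoints.values().toArray(new PendingCheckpoint[0])` in the diff. The Javadoc only promises fail-fast behavior on a best-effort basis; in this single-threaded case the exception is raised by the iterator's second `next()` call.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the map stands in for pendingCheckpoints.
final class SnapshotAbortSketch {
    final Map<Long, String> pendingCheckpoints = new HashMap<>();

    void fill(int n) {
        for (long id = 1; id <= n; id++) {
            pendingCheckpoints.put(id, "checkpoint-" + id);
        }
    }

    /** Like the refactored abortPendingCheckpoint: it maintains the map itself. */
    void abortPendingCheckpoint(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
    }

    /** Pre-refactoring loop shape: modifies the map while iterating over its live view. */
    void abortAllUnsafe() {
        for (Long id : pendingCheckpoints.keySet()) {
            abortPendingCheckpoint(id);
        }
    }

    /** Snapshot first, then abort: iteration is over the array, not the live map. */
    void abortAllSafe() {
        Long[] snapshot = pendingCheckpoints.keySet().toArray(new Long[0]);
        for (Long id : snapshot) {
            abortPendingCheckpoint(id);
        }
    }

    public static void main(String[] args) {
        SnapshotAbortSketch unsafe = new SnapshotAbortSketch();
        unsafe.fill(3);
        try {
            unsafe.abortAllUnsafe();
            System.out.println("no exception");
        } catch (ConcurrentModificationException e) {
            System.out.println("ConcurrentModificationException");
        }
        SnapshotAbortSketch safe = new SnapshotAbortSketch();
        safe.fill(3);
        safe.abortAllSafe();
        System.out.println(safe.pendingCheckpoints.size()); // 0
    }
}
```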


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-05 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363102441
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1262,11 +1261,19 @@ public void stopCheckpointScheduler() {
 */
public void abortPendingCheckpoints(CheckpointException exception) {
synchronized (lock) {
-   for (PendingCheckpoint p : pendingCheckpoints.values()) {
-   failPendingCheckpoint(p, exception.getCheckpointFailureReason());
-   }
+   // do not traverse pendingCheckpoints directly, because it might be changed
+   abortPendingCheckpoints(
+   pendingCheckpoints.values().toArray(new PendingCheckpoint[0]), exception);
+   }
+   }
+
+   private void abortPendingCheckpoints(
+   PendingCheckpoint[] checkpoints, CheckpointException exception) {
 
 Review comment:
   Alright, I need to re-check the code style guide :)


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-05 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363102243
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -957,17 +953,19 @@ private void rememberRecentCheckpointId(long id) {
}
 
private void dropSubsumedCheckpoints(long checkpointId) {
-   Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
-
-   while (entries.hasNext()) {
-   PendingCheckpoint p = entries.next().getValue();
-   // remove all pending checkpoints that are lesser than the current completed checkpoint
-   if (p.getCheckpointId() < checkpointId && p.canBeSubsumed()) {
-   rememberRecentCheckpointId(p.getCheckpointId());
-   failPendingCheckpoint(p, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
-   entries.remove();
-   }
-   }
+   PendingCheckpoint[] checkpointsToSubsume =
+   pendingCheckpoints
+   .values()
+   .stream()
+   .filter(
+   pendingCheckpoint ->
+   pendingCheckpoint.getCheckpointId() < checkpointId &&
+   pendingCheckpoint.canBeSubsumed())
+   .toArray(PendingCheckpoint[]::new);
+
+   abortPendingCheckpoints(
+   checkpointsToSubsume,
+   new CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED));
 
 Review comment:
   The purpose of passing an array to `abortPendingCheckpoints` as a parameter is to avoid a potential `ConcurrentModificationException`. There are more details in the response above.


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-05 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363102017
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1262,11 +1261,19 @@ public void stopCheckpointScheduler() {
 */
public void abortPendingCheckpoints(CheckpointException exception) {
synchronized (lock) {
-   for (PendingCheckpoint p : pendingCheckpoints.values()) {
-   failPendingCheckpoint(p, exception.getCheckpointFailureReason());
-   }
+   // do not traverse pendingCheckpoints directly, because it might be changed
 
 Review comment:
   No, it's a reminder comment introduced by this PR.
   After the error-handling refactoring, `failPendingCheckpoint` is the only
entry point for failing a pending checkpoint, and it maintains
`pendingCheckpoints` itself (it removes the failed checkpoint from the map), so
callers no longer need to maintain `pendingCheckpoints` everywhere. As a side
effect, we can no longer invoke `failPendingCheckpoint` inside a for loop over
`pendingCheckpoints` as before: removing entries from the map while iterating
over it raises a `ConcurrentModificationException`.
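A small standalone sketch of why the old loop shape breaks once the fail method removes entries itself. It assumes a hypothetical `HashMap`-backed stand-in for `pendingCheckpoints` and a hypothetical `fail` method; neither is Flink's actual code. Fail-fast iterators are best-effort in general, but with `HashMap` and three entries, removing the current entry and then advancing the iterator does throw.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

final class AbortAllSketch {
    // Hypothetical stand-in for pendingCheckpoints: checkpoint id -> description.
    final Map<Long, String> pending = new HashMap<>();

    // Like failPendingCheckpoint after the refactoring: removes the entry itself.
    void fail(long id) {
        pending.remove(id);
    }

    // The pre-PR loop shape: failing checkpoints while iterating the live key view.
    // Returns true if the iterator detected the modification and threw.
    boolean abortAllUnsafe() {
        try {
            for (Long id : pending.keySet()) {
                fail(id);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Safe shape: iterate over a snapshot copy, so fail() may modify the map freely.
    void abortAllSafe() {
        for (Long id : new ArrayList<>(pending.keySet())) {
            fail(id);
        }
    }
}
```

Copying into a list (or, as in the PR, into an array) decouples the iteration from the map that the fail method mutates.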



[GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages

2020-01-05 Thread GitBox
ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363101375
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -162,9 +162,9 @@
 * Non-volatile, because only accessed in synchronized scope */
private boolean periodicScheduling;
 
-   /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
-* accessed in synchronized scope */
-   private boolean triggerRequestQueued;
+   /** Flag whether periodic triggering is suspended (too many concurrent pending checkpoint).
+* Non-volatile, because only accessed in synchronized scope */
+   private boolean periodicTriggeringSuspended;
 
 Review comment:
   OK, let me explain the background in more detail.
   1. I want to introduce a queue to hold trigger requests that can't be
triggered yet because an asynchronous trigger is still in flight. This is
described in the last commit.
   2. I found that some queuing logic already exists: `triggerRequestQueued`.
   3. I tried to unify these two queuing concepts, but doing so made things much
more complicated; without this background, the unified queuing logic would be
hard to understand or maintain. I believe it's more reasonable to keep the two
concepts separate.
   4. I renamed `triggerRequestQueued` to `periodicTriggeringSuspended` to
distinguish the two concepts. I think the concept implied by the name
`triggerRequestQueued` is somewhat inaccurate: the trigger request is not really
queued; instead, it is failed and periodic triggering is suspended.
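A heavily simplified sketch of the two concepts kept apart, under stated assumptions: `TriggerSketch`, its methods, and the single `maxConcurrent` limit are all hypothetical and do not mirror `CheckpointCoordinator`'s real fields or control flow. Concept 1 is a real queue of requests waiting for an in-flight asynchronous trigger; concept 2 is only a flag: the request is failed and periodic triggering is suspended until a checkpoint completes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the two concepts; not Flink's code.
final class TriggerSketch {
    enum Result { STARTED, QUEUED, FAILED_TOO_MANY }

    private final int maxConcurrent;
    private int pendingCount;                    // triggered but not yet completed
    private boolean triggerInFlight;             // an async trigger stage is still running
    private boolean periodicTriggeringSuspended; // concept 2: a flag, not a queue
    private final Deque<String> queuedRequests = new ArrayDeque<>(); // concept 1: a real queue

    TriggerSketch(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    Result trigger(String request) {
        if (triggerInFlight) {
            // Concept 1: keep the request; it is processed once the in-flight trigger is done.
            queuedRequests.addLast(request);
            return Result.QUEUED;
        }
        if (pendingCount >= maxConcurrent) {
            // Concept 2: the request is failed, not queued; periodic triggering is suspended.
            periodicTriggeringSuspended = true;
            return Result.FAILED_TOO_MANY;
        }
        triggerInFlight = true;
        pendingCount++;
        return Result.STARTED;
    }

    // The async trigger stage finished: start (or fail) queued requests until one is in flight.
    void onTriggerStageFinished() {
        triggerInFlight = false;
        while (!triggerInFlight && !queuedRequests.isEmpty()) {
            trigger(queuedRequests.pollFirst());
        }
    }

    // A pending checkpoint completed: once below the limit, periodic triggering resumes.
    void onCheckpointCompleted() {
        pendingCount--;
        if (periodicTriggeringSuspended && pendingCount < maxConcurrent) {
            periodicTriggeringSuspended = false;
        }
    }

    boolean isPeriodicTriggeringSuspended() { return periodicTriggeringSuspended; }

    int queuedCount() { return queuedRequests.size(); }

    int pendingCount() { return pendingCount; }
}
```

With `maxConcurrent = 1`, a second request arriving while the first trigger is in flight is queued; when the first trigger stage finishes, the queued request is dequeued but then failed because one checkpoint is still pending, which sets the suspension flag until that checkpoint completes.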

