[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-25 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339081895
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -593,6 +599,10 @@ public void failJobDueToTaskFailure(Throwable cause, 
ExecutionAttemptID failingT
}
);
 
+   checkpointCoordinatorTimer = 
Executors.newSingleThreadScheduledExecutor(
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-25 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339078379
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -316,6 +319,9 @@
/** The coordinator for checkpoints, if snapshot checkpoints are 
enabled. */
private CheckpointCoordinator checkpointCoordinator;
 
+   /** TODO, replace it with main thread executor. */
+   private ScheduledExecutorService checkpointCoordinatorTimer;
 
 Review comment:
   Done




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-25 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339075696
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
 ##
 @@ -55,7 +54,7 @@
 
private BiConsumer> 
releasePartitionsConsumer = (ignore1, ignore2) -> { };
 
-   private Consumer> checkpointConsumer = ignore -> { };
+   private CheckpointConsumer checkpointConsumer = null;
 
 Review comment:
   Done




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-25 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339072220
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -535,149 +528,142 @@ public boolean isShutdown() {
 
// we will actually trigger this checkpoint!
 
-   // we lock with a special lock to make sure that trigger 
requests do not overtake each other.
-   // this is not done with the coordinator-wide lock, because the 
'checkpointIdCounter'
-   // may issue blocking operations. Using a different lock than 
the coordinator-wide lock,
-   // we avoid blocking the processing of 'acknowledge/decline' 
messages during that time.
-   synchronized (triggerLock) {
 
 Review comment:
   Done.




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-25 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339071818
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 ##
 @@ -201,7 +201,7 @@ public void testHooksAreCalledOnTrigger() throws Exception 
{
cc.addMasterHook(statefulHook2);
 
// trigger a checkpoint
-   assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), 
false));
+   assertFalse(cc.triggerCheckpoint(System.currentTimeMillis(), 
false).isCompletedExceptionally());
 
 Review comment:
   I almost missed this message.
   Yes, I agree: test cases built on Mockito are not easy to maintain. That 
could be a follow-up issue (there are already some on my list). I could come 
back to this after the main part is merged.




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-24 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338881552
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##
 @@ -1466,11 +1477,24 @@ public void testSavepointsAreNotSubsumed() throws 
Exception {
final JobID jid = new JobID();
final long timestamp = System.currentTimeMillis();
 
+   final CountDownLatch checkpointTriggeredLatch1 = new 
CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch2 = new 
CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch3 = new 
CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch4 = new 
CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch5 = new 
CountDownLatch(2);
+   final CoundDownLatchCheckpointConsumers consumers =
+   new CoundDownLatchCheckpointConsumers(
+   new ArrayDeque() {{
+   add(checkpointTriggeredLatch1);
+   add(checkpointTriggeredLatch2);
+   add(checkpointTriggeredLatch3);
+   add(checkpointTriggeredLatch4);
+   add(checkpointTriggeredLatch5); }});
 
 Review comment:
   That's a really nice suggestion!
   I thought there might be problems with using a manually triggered executor 
for these cases. But after testing, I found that all cases work well with it. 
Moreover, some time-based cases can be simplified a lot this way.
   BTW, I am keeping some "dead code" for now, for example 
`TestingScheduledExecutor` and the changes to 
`SimpleAckingTaskManagerGateway`. It might be useful for a later PR or for others.
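
   To illustrate why a manually triggered executor simplifies such tests, here 
is a minimal sketch. `ManualExecutorSketch` is a hypothetical name invented for 
this example; it is not the test utility used in the PR. Tasks are only queued 
on submission and run on the test thread when the test asks for it, so no latch 
or sleep is needed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical sketch of an executor that runs tasks only on demand. */
class ManualExecutorSketch implements Executor {
    private final Queue<Runnable> queued = new ArrayDeque<>();

    /** Queues the task instead of running it. */
    @Override
    public void execute(Runnable command) {
        queued.add(command);
    }

    /** Number of tasks waiting to be triggered. */
    int numQueued() {
        return queued.size();
    }

    /** Runs all queued tasks on the calling thread, including tasks queued while draining. */
    void triggerAll() {
        Runnable next;
        while ((next = queued.poll()) != null) {
            next.run();
        }
    }
}
```

   A test can then submit the trigger, assert that nothing has happened yet, 
call `triggerAll()`, and assert on the result deterministically.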




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-24 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338879934
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -153,8 +155,8 @@
/** A handle to the current periodic trigger, to cancel it when 
necessary. */
private ScheduledFuture currentPeriodicTrigger;
 
-   /** The timestamp (via {@link System#nanoTime()}) when the last 
checkpoint completed. */
-   private long lastCheckpointCompletionNanos;
+   /** The relative timestamp when the next checkpoint could be triggered. 
*/
+   private long earliestRelativeTimeNextCheckpointBeTriggered;
 
 Review comment:
   I think the problem is that this field carries too much meaning, so it's not 
easy to give it a short and clear name. I reverted this field to 
`lastCheckpointCompletionRelativeTime` and now calculate 
`nextCheckpointTriggerRelativeTime` in `checkMinPauseBetweenCheckpoints`, where 
it is much easier to understand in context.
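
   The calculation described above could look roughly like the following 
sketch. Only the two variable names come from the comment; the class 
`MinPauseSketch`, its methods, and the choice to allow the first trigger 
immediately are assumptions made for illustration, not the code in the PR.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a min-pause check based on relative (nanoTime-style) timestamps. */
class MinPauseSketch {
    private final long minPauseNanos;
    private long lastCheckpointCompletionRelativeTime;

    MinPauseSketch(long minPauseMillis, long nowNanos) {
        this.minPauseNanos = TimeUnit.MILLISECONDS.toNanos(minPauseMillis);
        // Assumption: pretend a checkpoint completed one pause ago, so the first trigger is allowed.
        this.lastCheckpointCompletionRelativeTime = nowNanos - minPauseNanos;
    }

    /** Records the relative time at which the latest checkpoint completed. */
    void onCheckpointCompleted(long nowNanos) {
        lastCheckpointCompletionRelativeTime = nowNanos;
    }

    /** Returns the nanoseconds that must still pass before the next trigger, or 0 if it may run now. */
    long remainingPauseNanos(long nowNanos) {
        long nextCheckpointTriggerRelativeTime = lastCheckpointCompletionRelativeTime + minPauseNanos;
        // Compare by subtraction, as recommended for System.nanoTime() values.
        return Math.max(0L, nextCheckpointTriggerRelativeTime - nowNanos);
    }
}
```

   In real code the `nowNanos` arguments would come from `System.nanoTime()`; 
they are passed in explicitly here so the sketch stays deterministic.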




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-24 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338410227
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
 ##
 @@ -54,6 +55,8 @@
 
private BiConsumer> 
releasePartitionsConsumer = (ignore1, ignore2) -> { };
 
+   private Consumer> checkpointConsumer = ignore -> { };
 
 Review comment:
   Good suggestion.




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-17 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335790644
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -535,149 +528,142 @@ public boolean isShutdown() {
 
// we will actually trigger this checkpoint!
 
-   // we lock with a special lock to make sure that trigger 
requests do not overtake each other.
-   // this is not done with the coordinator-wide lock, because the 
'checkpointIdCounter'
-   // may issue blocking operations. Using a different lock than 
the coordinator-wide lock,
-   // we avoid blocking the processing of 'acknowledge/decline' 
messages during that time.
-   synchronized (triggerLock) {
 
 Review comment:
   `triggerSavepointInternal` no longer invokes `triggerCheckpoint` directly. 
In the commit before this one, I changed the behavior of 
`triggerSavepointInternal` so that it executes the trigger in the timer thread. 
So all trigger operations (checkpoint or savepoint) are executed in the timer 
thread (which will eventually be replaced by the main thread), and there is no 
longer any competition for this `triggerLock`. I will note this in the comment.
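
   As a rough illustration of why the lock becomes unnecessary, the sketch 
below funnels every trigger through one single-threaded executor. The class 
`SingleThreadTriggers` and its methods are hypothetical and much simpler than 
`CheckpointCoordinator`; the point is only that state touched exclusively from 
one thread needs no `synchronized` block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: all triggers run on one thread, so they cannot overtake each other. */
class SingleThreadTriggers {
    // Both fields are only touched from the timer thread, so no lock is needed.
    private long nextId = 1;
    private final List<String> log = new ArrayList<>();
    private final ExecutorService timer = Executors.newSingleThreadExecutor();

    /** May be called from any thread; the work itself runs on the timer thread. */
    Future<?> triggerCheckpoint() {
        return timer.submit(() -> { log.add("checkpoint-" + nextId++); });
    }

    /** Savepoints take the same path instead of calling the trigger logic directly. */
    Future<?> triggerSavepoint() {
        return timer.submit(() -> { log.add("savepoint-" + nextId++); });
    }

    /** Waits for all queued triggers, returns a copy of the log, and shuts the timer down. */
    List<String> drainAndShutdown() {
        Callable<List<String>> snapshot = () -> new ArrayList<>(log);
        try {
            return timer.submit(snapshot).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdown();
        }
    }
}
```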




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335801615
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1306,10 +1312,8 @@ private long getRandomInitDelay() {
return 
ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints, baseInterval + 
1L);
}
 
-   private ScheduledFuture scheduleTriggerWithDelay(long initDelay) {
-   return timer.scheduleAtFixedRate(
-   new ScheduledTrigger(),
-   initDelay, baseInterval, TimeUnit.MILLISECONDS);
+   private ScheduledFuture scheduleTriggerWithDelay(long delay) {
+   return timer.schedule(new ScheduledTrigger(), delay, 
TimeUnit.MILLISECONDS);
 
 Review comment:
   That's a good question. The direct reason is that the main thread executor 
does not currently support periodic scheduling; it's not implemented yet. So 
there are two options:
   1. Support periodic scheduling in the main thread executor.
   2. Trigger checkpoints manually in `CheckpointCoordinator`.
   
   There is a short discussion of option 1, see 
https://issues.apache.org/jira/browse/FLINK-13848.
   
   I chose option 2 for now, because I'm not a fan of the current approach to 
periodic scheduling. It involves a lot of cancellation of periodic tasks, and 
cancellation is not entirely accurate or reliable.
   
   But to be honest, option 2 has a shortcoming. We have to make sure the next 
trigger is scheduled in every scenario: success, failure, and queued request 
(the handling of queued requests is messy right now). It's a bit more 
complicated than I expected.
   
   Do you have any suggestions on this?
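
   For reference, option 2 can be sketched as a one-shot task that re-arms 
itself after every run. This is a minimal illustration under assumptions: the 
class `SelfReschedulingTrigger` and its methods are hypothetical names, and the 
real coordinator also has to handle queued requests, which this sketch ignores. 
The `finally` block is what guarantees that a failed trigger still schedules 
the next one.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: emulates periodic scheduling with repeated one-shot schedule() calls. */
class SelfReschedulingTrigger implements Runnable {
    private final ScheduledExecutorService timer;
    private final long intervalMillis;
    private final Runnable action;
    private volatile boolean stopped;

    SelfReschedulingTrigger(ScheduledExecutorService timer, long intervalMillis, Runnable action) {
        this.timer = timer;
        this.intervalMillis = intervalMillis;
        this.action = action;
    }

    /** Schedules the first run after the given delay. */
    void start(long initialDelayMillis) {
        scheduleNext(initialDelayMillis);
    }

    /** Stops the chain; no periodic future has to be cancelled. */
    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (stopped) {
            return;
        }
        try {
            action.run();
        } catch (RuntimeException e) {
            // A failed trigger is ignored in this sketch; real code would report it.
        } finally {
            // Re-arm in every scenario, successful or failed.
            scheduleNext(intervalMillis);
        }
    }

    private void scheduleNext(long delayMillis) {
        if (stopped) {
            return;
        }
        try {
            timer.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The timer was shut down concurrently; nothing left to schedule.
        }
    }
}
```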




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335793012
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##
 @@ -289,29 +292,29 @@ public void testStopPeriodicScheduler() throws Exception 
{
failureManager);
 
// Periodic
+   CompletableFuture periodicTriggerResult = 
coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   true,
-   false);
+   periodicTriggerResult.get();
fail("The triggerCheckpoint call expected an 
exception");
-   } catch (CheckpointException e) {
-   
assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, 
e.getCheckpointFailureReason());
+   } catch (ExecutionException e) {
+   assertTrue(e.getCause() instanceof CheckpointException);
 
 Review comment:
   Good point!




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335793060
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##
 @@ -289,29 +292,29 @@ public void testStopPeriodicScheduler() throws Exception 
{
failureManager);
 
// Periodic
+   CompletableFuture periodicTriggerResult = 
coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   true,
-   false);
+   periodicTriggerResult.get();
fail("The triggerCheckpoint call expected an 
exception");
-   } catch (CheckpointException e) {
-   
assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, 
e.getCheckpointFailureReason());
+   } catch (ExecutionException e) {
+   assertTrue(e.getCause() instanceof CheckpointException);
+   
assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+   ((CheckpointException) 
e.getCause()).getCheckpointFailureReason());
}
 
// Not periodic
-   try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   false,
-   false);
-   } catch (CheckpointException e) {
-   fail("Unexpected exception : " + 
e.getCheckpointFailureReason().message());
-   }
+   CompletableFuture nonPeriodicTriggerResult 
= coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   false,
+   false);
+   
assertFalse(nonPeriodicTriggerResult.isCompletedExceptionally());
 
 Review comment:
   Ditto.




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335792916
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -153,8 +155,8 @@
/** A handle to the current periodic trigger, to cancel it when 
necessary. */
private ScheduledFuture currentPeriodicTrigger;
 
-   /** The timestamp (via {@link System#nanoTime()}) when the last 
checkpoint completed. */
-   private long lastCheckpointCompletionNanos;
+   /** The relative timestamp when the next checkpoint could be triggered. 
*/
+   private long earliestRelativeTimeNextCheckpointBeTriggered;
 
 Review comment:
   As mentioned before, "earliest" means it's the earliest time at which the 
next checkpoint could be triggered.
   "relative" means it's a relative time (`System.nanoTime()`).
   Maybe `earliestNextCheckpointTriggerRelativeTime`? 




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335792379
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -153,8 +155,8 @@
/** A handle to the current periodic trigger, to cancel it when 
necessary. */
private ScheduledFuture currentPeriodicTrigger;
 
-   /** The timestamp (via {@link System#nanoTime()}) when the last 
checkpoint completed. */
-   private long lastCheckpointCompletionNanos;
+   /** The relative timestamp when the next checkpoint could be triggered. 
*/
 
 Review comment:
   Actually it's not the time at which the next checkpoint "should" be 
triggered. `earliestRelativeTimeNextCheckpointBeTriggered` means the next 
trigger cannot be launched before this time because of the 
`minPauseBetweenCheckpoints` restriction.




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335791667
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -535,149 +528,142 @@ public boolean isShutdown() {
 
 
 Review comment:
   I fully understand...
   The reason I didn't rework this part is that `triggerCheckpoint` will be 
changed in later PRs. I was planning to do the non-functional rework in the 
final PRs.




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335790644
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -535,149 +528,142 @@ public boolean isShutdown() {
 
// we will actually trigger this checkpoint!
 
-   // we lock with a special lock to make sure that trigger 
requests do not overtake each other.
-   // this is not done with the coordinator-wide lock, because the 
'checkpointIdCounter'
-   // may issue blocking operations. Using a different lock than 
the coordinator-wide lock,
-   // we avoid blocking the processing of 'acknowledge/decline' 
messages during that time.
-   synchronized (triggerLock) {
 
 Review comment:
   `triggerSavepointInternal` no longer invokes `triggerCheckpoint` directly. 
In the commit before this one, I changed the behavior of 
`triggerSavepointInternal` so that it executes the trigger in the timer thread. 
So all trigger operations (checkpoint or savepoint) are executed in the timer 
thread (which will eventually be replaced by the main thread), and there is no 
longer any competition for this `triggerLock`. I will note this in the commit 
message.




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335789730
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
 ##
 @@ -70,6 +73,10 @@ public void setReleasePartitionsConsumer(BiConsumer> checkpointConsumer) {
 
 Review comment:
   Ditto, it's used to check whether the tasks receive the `triggerCheckpoint` 
message.




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335789416
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##
 @@ -1347,6 +1356,7 @@ public void testTriggerAndConfirmSimpleSavepoint() 
throws Exception {
CompletableFuture savepointFuture = 
coord.triggerSavepoint(timestamp, savepointDir);
assertFalse(savepointFuture.isDone());
 
+   checkpointTriggeredLatch1.await();
 
 Review comment:
   As mentioned before, the latch is used to check whether the triggering 
succeeded. 




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335789306
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 ##
 @@ -201,7 +201,7 @@ public void testHooksAreCalledOnTrigger() throws Exception 
{
cc.addMasterHook(statefulHook2);
 
// trigger a checkpoint
-   assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), 
false));
+   assertFalse(cc.triggerCheckpoint(System.currentTimeMillis(), 
false).isCompletedExceptionally());
 
 Review comment:
   If an error occurs immediately, the future will be completed with an 
exception.
   If the triggering is successful, we can check whether the tasks received 
the `triggerCheckpoint` message.




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
ifndef-SleePy commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335788255
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 ##
 @@ -201,7 +201,7 @@ public void testHooksAreCalledOnTrigger() throws Exception 
{
cc.addMasterHook(statefulHook2);
 
// trigger a checkpoint
-   assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), 
false));
+   assertFalse(cc.triggerCheckpoint(System.currentTimeMillis(), 
false).isCompletedExceptionally());
 
 Review comment:
   Ah, sorry, that's my fault. I forgot to update the Javadoc. I changed the 
behavior of `triggerCheckpoint` a bit. The return value, 
`CompletableFuture`, will not complete until the 
checkpoint is finished. It will not complete immediately after 
`triggerCheckpoint` unless an error occurs. This is similar to 
`triggerSavepoint`.
   I did this because `triggerCheckpoint` will eventually be non-blocking. 
It will be split into several stages: pre-checking, initializing the id and 
checkpointStorageLocation, triggering the snapshot of the master state, and 
then triggering the snapshot of the task state. We can no longer judge whether 
the triggering succeeded by a boolean return value as before.
   
   So in this case, we can't wait for the future to complete, because it will 
never complete.
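
   The contract described above can be sketched as follows. `SketchCoordinator` 
and its methods are hypothetical stand-ins, not Flink's `CheckpointCoordinator`, 
and the result type is simplified to a `String`. The future fails right away if 
a pre-check fails, and otherwise stays pending until the checkpoint is 
acknowledged, which is why a test can only assert 
`isCompletedExceptionally()` is false instead of waiting on it.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of a trigger method that returns a future instead of a boolean. */
class SketchCoordinator {
    private boolean shutdown;
    private CompletableFuture<String> pending;

    void shutdown() {
        shutdown = true;
    }

    /** Fails fast when a pre-check fails; otherwise the future stays pending until completion. */
    CompletableFuture<String> triggerCheckpoint() {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (shutdown) {
            result.completeExceptionally(new IllegalStateException("coordinator is shut down"));
            return result;
        }
        pending = result;
        return result;
    }

    /** Stands in for the point at which every task has acknowledged the checkpoint. */
    void acknowledgeAll(String checkpointName) {
        if (pending != null) {
            pending.complete(checkpointName);
            pending = null;
        }
    }
}
```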

