[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-25 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339013035
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -316,6 +319,9 @@
/** The coordinator for checkpoints, if snapshot checkpoints are enabled. */
private CheckpointCoordinator checkpointCoordinator;
 
+   /** TODO, replace it with main thread executor. */
+   private ScheduledExecutorService checkpointCoordinatorTimer;
 
 Review comment:
   `@Nullable`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-25 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r339013082
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -593,6 +599,10 @@ public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingT
}
);
 
+   checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(
 
 Review comment:
   `checkState(checkpointCoordinatorTimer == null)`?
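A minimal sketch of the two suggestions on this field, assuming the timer is created lazily and may legitimately be null before that: the field would carry `@Nullable`, and a `checkState`-style guard rejects a second initialization instead of silently leaking the first executor. `CoordinatorTimerHolder`, `enableTimer` and the local `checkState` helper are hypothetical; in Flink code, `org.apache.flink.util.Preconditions.checkState` would be the natural choice instead of a local helper.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical holder illustrating the review suggestions: the timer stays
// null until enableTimer() is called, and creating it twice is rejected.
final class CoordinatorTimerHolder {

    // In Flink this field would be annotated @Nullable (null until enabled).
    private ScheduledExecutorService checkpointCoordinatorTimer;

    // Local stand-in for a checkState(condition, message) precondition helper.
    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    void enableTimer() {
        // Fail fast rather than overwrite (and leak) an existing executor.
        checkState(checkpointCoordinatorTimer == null, "checkpoint timer already created");
        checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor();
    }

    boolean hasTimer() {
        return checkpointCoordinatorTimer != null;
    }

    void shutdown() {
        if (checkpointCoordinatorTimer != null) {
            checkpointCoordinatorTimer.shutdownNow();
            checkpointCoordinatorTimer = null;
        }
    }
}
```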




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-25 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338983887
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
 ##
 @@ -55,7 +54,7 @@
 
private BiConsumer> releasePartitionsConsumer = (ignore1, ignore2) -> { };
 
-   private Consumer> checkpointConsumer = ignore -> { };
+   private CheckpointConsumer checkpointConsumer = null;
 
 Review comment:
   nit: missing `@Nullable`?
   
   But I think it would be better to leave it as a non-nullable field, with a
default (inlined lambda function) implementation that ignores the call?
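A minimal sketch of the non-nullable alternative, assuming a two-argument callback; the real `CheckpointConsumer` signature in the PR may differ, and `FakeGateway` is a hypothetical stand-in for `SimpleAckingTaskManagerGateway`. The default lambda ignores every call, so callers never need a null check:

```java
import java.util.Objects;

// Hypothetical callback type; the real CheckpointConsumer in the PR may have a different signature.
@FunctionalInterface
interface CheckpointConsumer {
    void accept(long checkpointId, long timestamp);
}

// Hypothetical stand-in for SimpleAckingTaskManagerGateway.
final class FakeGateway {

    // Never null: the default lambda simply ignores the call.
    private CheckpointConsumer checkpointConsumer = (checkpointId, timestamp) -> { };

    void setCheckpointConsumer(CheckpointConsumer consumer) {
        // Keep the non-null invariant when tests install their own consumer.
        this.checkpointConsumer = Objects.requireNonNull(consumer);
    }

    void triggerCheckpoint(long checkpointId, long timestamp) {
        // No null check needed before delegating.
        checkpointConsumer.accept(checkpointId, timestamp);
    }
}
```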




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-25 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338983040
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##
 @@ -1466,11 +1477,24 @@ public void testSavepointsAreNotSubsumed() throws Exception {
final JobID jid = new JobID();
final long timestamp = System.currentTimeMillis();
 
+   final CountDownLatch checkpointTriggeredLatch1 = new CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch2 = new CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch3 = new CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch4 = new CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch5 = new CountDownLatch(2);
+   final CoundDownLatchCheckpointConsumers consumers =
+   new CoundDownLatchCheckpointConsumers(
+   new ArrayDeque() {{
+   add(checkpointTriggeredLatch1);
+   add(checkpointTriggeredLatch2);
+   add(checkpointTriggeredLatch3);
+   add(checkpointTriggeredLatch4);
+   add(checkpointTriggeredLatch5); }});
 
 Review comment:
   I'm glad that it worked out :)
   
   > BTW, I kept some "dead code" for now, for example TestingScheduledExecutor
and the changes to SimpleAckingTaskManagerGateway. It might be useful for a
later PR or for others.
   
   I think I would revisit this at the end of this effort.




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-23 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338097031
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##
 @@ -1466,11 +1477,24 @@ public void testSavepointsAreNotSubsumed() throws Exception {
final JobID jid = new JobID();
final long timestamp = System.currentTimeMillis();
 
+   final CountDownLatch checkpointTriggeredLatch1 = new CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch2 = new CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch3 = new CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch4 = new CountDownLatch(2);
+   final CountDownLatch checkpointTriggeredLatch5 = new CountDownLatch(2);
+   final CoundDownLatchCheckpointConsumers consumers =
+   new CoundDownLatchCheckpointConsumers(
+   new ArrayDeque() {{
+   add(checkpointTriggeredLatch1);
+   add(checkpointTriggeredLatch2);
+   add(checkpointTriggeredLatch3);
+   add(checkpointTriggeredLatch4);
+   add(checkpointTriggeredLatch5); }});
 
 Review comment:
   Maybe instead of latching, we could just wait for an executor to complete
pending work? Generally speaking, with single-threaded, non-blocking,
asynchronous code it is usually easier to debug and maintain such tests
when using the following pattern:
   1. perform some action that enqueues some async action, but do not let the
action be executed in the background
   2. manually trigger execution of the enqueued action, preferably in the main 
thread
   3. repeat until there are no more enqueued actions (applies if an action can 
enqueue more actions)
   4. validate the final state
   
   (in that case everything is executed by the main test thread, so it's easy 
to debug)
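The pattern above could look roughly like the following sketch. `ManualExecutor` is a hypothetical test helper, not an existing Flink class: `execute` only enqueues, and the test thread drains the queue itself, so every action (including ones enqueued by other actions) runs on the main test thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical test executor: nothing runs in the background; the test
// thread decides when queued work is executed.
final class ManualExecutor implements Executor {
    private final Queue<Runnable> queue = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        // Step 1: enqueue only, never run in the background.
        queue.add(command);
    }

    /** Runs queued actions on the calling thread until none remain; returns how many ran. */
    int runAll() {
        int executed = 0;
        Runnable next;
        // Step 3: repeat, because an action may enqueue further actions.
        while ((next = queue.poll()) != null) {
            // Step 2: execute on the caller's (test) thread.
            next.run();
            executed++;
        }
        return executed;
    }

    int pending() {
        return queue.size();
    }
}
```

Step 4 (validating the final state) then happens in plain assertions after `runAll()` returns.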




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-23 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338090326
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 ##
 @@ -201,7 +201,7 @@ public void testHooksAreCalledOnTrigger() throws Exception {
cc.addMasterHook(statefulHook2);
 
// trigger a checkpoint
-   assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), false));
+   assertFalse(cc.triggerCheckpoint(System.currentTimeMillis(), false).isCompletedExceptionally());
 
 Review comment:
   Ok, I get it. Thanks for the explanation!
   
   I'm not a fan of using Mockito like that, and especially not a fan of
checking whether a method `foo(long, long, Executor)` was called and how many
times (such checks can easily break and give false positive errors), but
rewriting this test is probably out of the scope of this already quite big PR.




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-23 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338106044
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -153,8 +155,8 @@
/** A handle to the current periodic trigger, to cancel it when necessary. */
private ScheduledFuture currentPeriodicTrigger;
 
-   /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
-   private long lastCheckpointCompletionNanos;
+   /** The relative timestamp when the next checkpoint could be triggered. */
+   private long earliestRelativeTimeNextCheckpointBeTriggered;
 
 Review comment:
   I would shorten it to `nextCheckpointTriggerRelativeTime`




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-23 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r338102677
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -535,149 +528,142 @@ public boolean isShutdown() {
 
// we will actually trigger this checkpoint!
 
-   // we lock with a special lock to make sure that trigger requests do not overtake each other.
-   // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
-   // may issue blocking operations. Using a different lock than the coordinator-wide lock,
-   // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
-   synchronized (triggerLock) {
 
 Review comment:
   Ok, again thanks for the explanation.
   
   Could you copy/paste this explanation to the commit message? (not as a
comment in the code)




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335432387
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##
 @@ -289,29 +292,29 @@ public void testStopPeriodicScheduler() throws Exception {
failureManager);
 
// Periodic
+   CompletableFuture periodicTriggerResult = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   true,
-   false);
+   periodicTriggerResult.get();
fail("The triggerCheckpoint call expected an exception");
-   } catch (CheckpointException e) {
-   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+   } catch (ExecutionException e) {
+   assertTrue(e.getCause() instanceof CheckpointException);
+   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+   ((CheckpointException) e.getCause()).getCheckpointFailureReason());
}
 
// Not periodic
-   try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   false,
-   false);
-   } catch (CheckpointException e) {
-   fail("Unexpected exception : " + e.getCheckpointFailureReason().message());
-   }
+   CompletableFuture nonPeriodicTriggerResult = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   false,
+   false);
+   assertFalse(nonPeriodicTriggerResult.isCompletedExceptionally());
 
 Review comment:
   ditto about waiting for the future to complete? 
   ```
   assertFalse(nonPeriodicTriggerResult.isCompletedExceptionally());
   ```
->
   ```
   nonPeriodicTriggerResult.get();
   ```
   ?
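A small sketch of why the stronger check matters, using only `java.util.concurrent`: `isCompletedExceptionally()` returns `false` for a future that is still pending, so the weak assertion passes even if the trigger fails later, while `get()` blocks until completion and surfaces the failure as an `ExecutionException`. `FutureChecks` and its method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class FutureChecks {

    /** Weak check: also passes for a future that is still pending and may fail later. */
    static boolean notFailedYet(CompletableFuture<?> future) {
        return !future.isCompletedExceptionally();
    }

    /**
     * Strong check: blocks until the future completes and returns the failure
     * cause, or null if it completed normally.
     */
    static Throwable awaitFailureCause(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            // The exception the future was completed with.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

In a test, calling `nonPeriodicTriggerResult.get()` directly gives the same effect: the test waits for the trigger, and an unexpected failure is reported with its original cause.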




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335030476
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##
 @@ -1347,6 +1356,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
CompletableFuture savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
assertFalse(savepointFuture.isDone());
 
+   checkpointTriggeredLatch1.await();
 
 Review comment:
   Why do we need this latch?




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335020372
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 ##
 @@ -201,7 +201,7 @@ public void testHooksAreCalledOnTrigger() throws Exception {
cc.addMasterHook(statefulHook2);
 
// trigger a checkpoint
-   assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), false));
+   assertFalse(cc.triggerCheckpoint(System.currentTimeMillis(), false).isCompletedExceptionally());
 
 Review comment:
   Shouldn't we wait for the future to complete? (and ditto in other places?)




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335434198
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1306,10 +1312,8 @@ private long getRandomInitDelay() {
return ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints, baseInterval + 1L);
}
 
-   private ScheduledFuture scheduleTriggerWithDelay(long initDelay) {
-   return timer.scheduleAtFixedRate(
-   new ScheduledTrigger(),
-   initDelay, baseInterval, TimeUnit.MILLISECONDS);
+   private ScheduledFuture scheduleTriggerWithDelay(long delay) {
+   return timer.schedule(new ScheduledTrigger(), delay, TimeUnit.MILLISECONDS);
 
 Review comment:
   What is the benefit/purpose of this change and this commit (`Manually 
schedule periodic checkpoint trigger instead of scheduleAtFixedRate`) as a 
whole?
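For context on this question, a hypothetical sketch of the two scheduling styles: `scheduleAtFixedRate` fires on a schedule fixed up front, while a one-shot `schedule` that re-arms itself after each run lets the next delay be chosen only once the previous run has finished. `SelfReschedulingTrigger` and its fields are illustrative assumptions, not the PR's code; whether this is the intended benefit of the commit is exactly what the comment asks.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical periodic trigger that keeps at most one pending run and
// re-arms itself after each run, instead of using scheduleAtFixedRate.
final class SelfReschedulingTrigger {
    private final ScheduledExecutorService timer;
    private final long baseIntervalMillis;
    private final Runnable action;
    private volatile ScheduledFuture<?> currentTrigger;
    private volatile boolean stopped;

    SelfReschedulingTrigger(ScheduledExecutorService timer, long baseIntervalMillis, Runnable action) {
        this.timer = timer;
        this.baseIntervalMillis = baseIntervalMillis;
        this.action = action;
    }

    void start(long initialDelayMillis) {
        scheduleTriggerWithDelay(initialDelayMillis);
    }

    void stop() {
        stopped = true;
        ScheduledFuture<?> trigger = currentTrigger;
        if (trigger != null) {
            trigger.cancel(false);
        }
    }

    private void scheduleTriggerWithDelay(long delayMillis) {
        // One-shot schedule: exactly one future run is pending at a time.
        currentTrigger = timer.schedule(this::runOnce, delayMillis, TimeUnit.MILLISECONDS);
    }

    private void runOnce() {
        if (stopped) {
            return;
        }
        action.run();
        if (!stopped) {
            // The next delay is decided only after this run has finished, so it
            // could be adjusted here (for example, to respect a minimum pause).
            scheduleTriggerWithDelay(baseIntervalMillis);
        }
    }
}
```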


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335030916
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##
 @@ -1502,13 +1526,16 @@ public void testSavepointsAreNotSubsumed() throws Exception {
 
// Trigger savepoint and checkpoint
CompletableFuture savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir);
+   checkpointTriggeredLatch1.await();
 
 Review comment:
   again, do we need it with `coord.triggerCheckpoint(timestamp + 1, 
false).get()`?




[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335429166
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -153,8 +155,8 @@
/** A handle to the current periodic trigger, to cancel it when necessary. */
private ScheduledFuture currentPeriodicTrigger;
 
-   /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
-   private long lastCheckpointCompletionNanos;
+   /** The relative timestamp when the next checkpoint could be triggered. */
+   private long earliestRelativeTimeNextCheckpointBeTriggered;
 
 Review comment:
   nit: 
`%s/earliestRelativeTimeNextCheckpointBeTriggered/nextCheckpointTriggerTime/g`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335426392
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -535,149 +528,142 @@ public boolean isShutdown() {
 
 
 Review comment:
   This method needs to be split into smaller sub-methods so badly
that it actually hurts my brain...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335428138
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -153,8 +155,8 @@
/** A handle to the current periodic trigger, to cancel it when necessary. */
private ScheduledFuture currentPeriodicTrigger;
 
-   /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
-   private long lastCheckpointCompletionNanos;
+   /** The relative timestamp when the next checkpoint could be triggered. */
 
 Review comment:
   nit: `could` -> `should`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335022275
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
 ##
 @@ -301,7 +310,7 @@ private void testRestoreLatestCheckpointIsPreferSavepoint(boolean isPreferCheckp
failureManager);
 
//trigger a checkpoint and wait to become a completed checkpoint
-   assertTrue(coord.triggerCheckpoint(timestamp, false));
+   assertFalse(coord.triggerCheckpoint(timestamp, false).isCompletedExceptionally());
 
 Review comment:
   If you replaced that with `coord.triggerCheckpoint(timestamp, false).get()`, 
would you still need the `checkpointTriggeredLatch`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335425982
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -535,149 +528,142 @@ public boolean isShutdown() {
 
// we will actually trigger this checkpoint!
 
-   // we lock with a special lock to make sure that trigger requests do not overtake each other.
-   // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
-   // may issue blocking operations. Using a different lock than the coordinator-wide lock,
-   // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
-   synchronized (triggerLock) {
 
 Review comment:
   Can we get rid of the `triggerLock` here? With the code as it is, the
`CheckpointCoordinator#triggerCheckpoint(long, CheckpointProperties, java.lang.String, boolean, boolean)`
method can be called from the `timer` thread (it's single-threaded now) and
also from the main thread via `triggerSavepointInternal`, right?
   
   Shouldn't this change happen after we get rid of the timer thread altogether?
   
   Also, this commit deserves a better, more detailed explanation in the commit
message. Could you explain there why we can drop the lock?
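A hypothetical sketch of the end state this question points at: once every trigger request is handed to one single-threaded executor, the requests run one after another and the state they touch is confined to that thread, so no dedicated lock is needed. `ThreadConfinedTrigger` is an illustrative name, not Flink code; as the comment notes, this only holds if no second thread (such as the separate `timer` thread) can still call into the method directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical coordinator whose mutable state is confined to one thread.
// Every request is handed to the same single-threaded executor, so requests
// run strictly one after another without a dedicated trigger lock.
final class ThreadConfinedTrigger implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Only read or written from mainThread, hence no synchronization.
    private long nextCheckpointId = 1L;
    private final List<Long> triggered = new ArrayList<>();

    /** Callable from any thread; the actual work runs on mainThread. */
    CompletableFuture<Long> triggerCheckpoint() {
        return CompletableFuture.supplyAsync(() -> {
            long id = nextCheckpointId++;
            triggered.add(id);
            return id;
        }, mainThread);
    }

    /** Snapshot of the triggered ids, also taken on mainThread. */
    CompletableFuture<List<Long>> triggeredIds() {
        return CompletableFuture.supplyAsync(() -> new ArrayList<>(triggered), mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```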


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335431898
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##
 @@ -289,29 +292,29 @@ public void testStopPeriodicScheduler() throws Exception {
failureManager);
 
// Periodic
+   CompletableFuture periodicTriggerResult = coord.triggerCheckpoint(
+   System.currentTimeMillis(),
+   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+   null,
+   true,
+   false);
try {
-   coord.triggerCheckpoint(
-   System.currentTimeMillis(),
-   CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-   null,
-   true,
-   false);
+   periodicTriggerResult.get();
fail("The triggerCheckpoint call expected an exception");
-   } catch (CheckpointException e) {
-   assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+   } catch (ExecutionException e) {
+   assertTrue(e.getCause() instanceof CheckpointException);
 
 Review comment:
   `ExceptionUtils#findThrowable(java.lang.Throwable, java.lang.Class)`?
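For reference, a self-contained sketch of what a cause-chain lookup like this does. As far as I know, Flink's `ExceptionUtils.findThrowable(Throwable, Class)` returns an `Optional` of the first matching throwable in the cause chain; `CauseChain` below is a hypothetical re-implementation for illustration, not Flink's code. With such a helper, the test's `instanceof` check and cast collapse into a single lookup on the `ExecutionException`.

```java
import java.util.Optional;

final class CauseChain {

    /** Walks the cause chain (starting with the throwable itself) and returns the first match. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> searchType) {
        Throwable current = throwable;
        while (current != null) {
            if (searchType.isInstance(current)) {
                return Optional.of(searchType.cast(current));
            }
            // getCause() returns null at the end of the chain.
            current = current.getCause();
        }
        return Optional.empty();
    }
}
```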


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9853: [FLINK-13904][checkpointing] Avoid competition of checkpoint triggering

2019-10-16 Thread GitBox
pnowojski commented on a change in pull request #9853: 
[FLINK-13904][checkpointing] Avoid competition of checkpoint triggering
URL: https://github.com/apache/flink/pull/9853#discussion_r335409965
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
 ##
 @@ -70,6 +73,10 @@ public void setReleasePartitionsConsumer(BiConsumer> checkpointConsumer) {
 
 Review comment:
   I cannot find any usages of this code (introduced in the
`[FLINK-13904][tests] Support checkpoint consumer of
SimpleAckingTaskManagerGateway` commit). Am I missing something? Is it being
used in a later PR (if so, can you move it to the correct PR)? Or is it an
unused leftover?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services