[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-15 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r294067081
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1329,10 +1331,13 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th
 
 		LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
 
-		if (cause == null || cause instanceof CheckpointDeclineException) {
-			pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause);
+		if (cause == null) {
+			failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED);
+		} else if (cause instanceof CheckpointException) {
+			CheckpointException exception = (CheckpointException) cause;
+			failPendingCheckpoint(pendingCheckpoint, exception.getCheckpointFailureReason(), cause);
 		} else {
-			pendingCheckpoint.abort(CheckpointFailureReason.JOB_FAILURE, cause);
+			failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause);
 
 Review comment:
   Actually, `CheckpointFailureManager` will take on more responsibility in the future, but not yet. This is an intermediate step in a series of three PRs. In this PR the most important thing is to stay compatible with `setFailOnCheckpointingErrors`; otherwise we would change the behavior of many user jobs. We did consider counting more failure reasons, but that would require more changes and make this PR more complex. So your suggestion is right, just not for now.
   
   The purpose of this PR is to introduce the `CheckpointFailureManager` and finish the residual refactoring work left over from the first PR #7571.
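
   For context, a minimal sketch of the two options the comment says must stay compatible. How exactly the legacy flag maps onto the new failure budget is an assumption here, not something this PR states.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FailureToleranceCompatSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds

        // legacy option this PR keeps compatible: fail the job on any checkpointing error
        env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

        // new option from this PR series, expressed as a failure budget (0 = tolerate no failure)
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
    }
}
```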


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-15 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r294066885
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 ##
 @@ -252,6 +255,22 @@ public void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors) {
 		this.failOnCheckpointingErrors = failOnCheckpointingErrors;
 	}
 
+	/**
+	 * Get the tolerable checkpoint failure number which is used by the checkpoint failure manager
+	 * to determine when we need to fail the job.
+	 */
+	public int getTolerableCheckpointFailureNumber() {
+		return tolerableCheckpointFailureNumber;
+	}
+
+	/**
+	 * Set the tolerable checkpoint failure number. The default value is 0, which means
+	 * we do not tolerate any checkpoint failure.
+	 */
+	public void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber) {
 
 Review comment:
   IMO, you are wrong. `CheckpointingOptions` defines global config options, almost all of which relate to **state**. `tolerableCheckpointFailureNumber`, however, is a checkpoint-specific config option that belongs to a single job and enhances `setFailOnCheckpointingErrors`, so it should not be defined in `CheckpointingOptions`. Is it unused at the moment? Yes, because we split this feature into three steps and it only takes full effect in the third step. But in this step we already need it to create the `CheckpointFailureManager`. I think you see the reason now.
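
   As a usage note, the option is set on the job's `CheckpointConfig` rather than through a global option. A hedged sketch (the value 3 is an arbitrary example; the default is 0):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerJobToleranceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        // read by the CheckpointFailureManager created in ExecutionGraph#enableCheckpointing;
        // 3 is an arbitrary example, the default of 0 tolerates no checkpoint failure
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}
```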


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293623628
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 ##
 @@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
 			int maxConcurrentCheckpoints,
 			CheckpointRetentionPolicy checkpointRetentionPolicy,
 			boolean isExactlyOnce,
-			boolean isPerfetCheckpointForRecovery) {
+			boolean isPerfetCheckpointForRecovery,
+			int tolerableCpFailureNumber) {
 
 		// sanity checks
-		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-			minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+		if (checkpointInterval < 10 || checkpointTimeout < 10 ||
 
 Review comment:
   Yes, I have a good reason. `ExecutionGraph#enableCheckpointing` (see [here](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L536)) already treats values `< 10` as illegal, and the [test case](https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java#L189) relies on this as well. So we had better use the same criterion.
   
   From a realistic point of view, I also think that 10 is a meaningful value. If the interval were allowed to be 1 ms, the checkpoint frequency would be too high to have any practical significance.
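
   Concretely, the boundary under the unified criterion would look like this (illustration only, mirroring the test changes elsewhere in this PR):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinimumIntervalSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.enableCheckpointing(1);  // would now be rejected: interval must not be below 10 ms
        env.enableCheckpointing(10);    // smallest interval accepted under the unified criterion
    }
}
```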


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293628893
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+   private final Set countedCheckpointIds;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 .");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
+* checkpoint id sequence. In trigger phase, we may 
not get the checkpoint id when the failure
+* happens before the checkpoint id generation. In 
this case, it will be specified a negative
+*  latest generated checkpoint id as a special 
flag.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
+   CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+   switch (reason) {
+   case PERIODIC_SCHEDULER_SHUTDOWN:
+   case ALREADY_QUEUED:
+   case TOO_MANY_CONCURRENT_CHECKPOINTS:
+   case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+   case NOT_ALL_REQUIRED_TASKS_RUNNING:
+   case CHECKPOINT_SUBSUMED:
+   case CHECKPOINT_COORDINATOR_SUSPEND:
+   case CHECKPOINT_COORDINATOR_SHUTDOWN:
+   case JOB_FAILURE:
+   case JOB_FAILOVER_REGION:
+   //for compatibility purposes with user job behavior
+   case CHECKPOINT_DECLINED_TASK_NOT_READY:
+   case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING:
+   case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED:
+   case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER:
+   case CHECKPOINT_DECLINED_SUBSUMED:
+   case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
+
+   case EXCEPTION:
+   case CHECKPOINT_EXPIRED:
+   case TASK_CHECKPOINT_FAILURE:
+   case TRIGGER_CHECKPOINT_FAILURE:
+   case FINALIZE_CHECKPOINT_FAILURE:
+   //ignore
+   break;
+
+   case CHECKPOINT_DECLINED:
+   //we should make sure one checkpoint only be 
counted once
+ 

[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627636
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
 ##
 @@ -618,7 +618,7 @@ public void testWithCheckPointing() throws Exception {
// expected behaviour
}
 
-			env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
+			env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE, true);
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627605
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
 ##
 @@ -609,7 +609,7 @@ public void testWithCheckPointing() throws Exception {
// Test force checkpointing
 
try {
-			env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false);
+			env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE, false);
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627519
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ##
 @@ -583,7 +583,7 @@ private void configureCheckpointing() {
CheckpointConfig cfg = streamGraph.getCheckpointConfig();
 
long interval = cfg.getCheckpointInterval();
-   if (interval > 0) {
+   if (interval >= 10) {
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627478
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 ##
 @@ -125,8 +128,8 @@ public long getCheckpointInterval() {
 * @param checkpointInterval The checkpoint interval, in milliseconds.
 */
public void setCheckpointInterval(long checkpointInterval) {
-		if (checkpointInterval <= 0) {
-			throw new IllegalArgumentException("Checkpoint interval must be larger than zero");
+		if (checkpointInterval < 10) {
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627503
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 ##
 @@ -146,8 +149,8 @@ public long getCheckpointTimeout() {
 * @param checkpointTimeout The checkpoint timeout, in milliseconds.
 */
public void setCheckpointTimeout(long checkpointTimeout) {
-		if (checkpointTimeout <= 0) {
-			throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
+		if (checkpointTimeout < 10) {
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293625337
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##
 @@ -1367,13 +1503,18 @@ public void testMinTimeBetweenCheckpointsInterval() throws Exception {
 
 		final long delay = 50;
 
+		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+			12,   // periodic interval is 12 ms
 
 Review comment:
   Yes, it is required. As in the prior comment, I have changed the sanity-check condition in `CheckpointCoordinatorConfiguration.java`. We should use the same criterion as `ExecutionGraph`: the correct condition is that the interval must be greater than or equal to 10.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293623628
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 ##
 @@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
 			int maxConcurrentCheckpoints,
 			CheckpointRetentionPolicy checkpointRetentionPolicy,
 			boolean isExactlyOnce,
-			boolean isPerfetCheckpointForRecovery) {
+			boolean isPerfetCheckpointForRecovery,
+			int tolerableCpFailureNumber) {
 
 		// sanity checks
-		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-			minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+		if (checkpointInterval < 10 || checkpointTimeout < 10 ||
 
 Review comment:
   Yes, I have a good reason. `ExecutionGraph#enableCheckpointing`(see 
[here](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L536))
 consider the illegal value is (`< 10`) and the [test 
case](https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java#L189)
 also know this. So we'd better have the same criterion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293625337
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##
 @@ -1367,13 +1503,18 @@ public void testMinTimeBetweenCheckpointsInterval() throws Exception {
 
 		final long delay = 50;
 
+		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+			12,   // periodic interval is 12 ms
 
 Review comment:
   Yes, it is required. As in the prior comment, I have changed the sanity-check condition in `CheckpointCoordinatorConfiguration.java`. We should use the same criterion: the correct condition is that the interval must be greater than or equal to 10.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293623628
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 ##
 @@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
 			int maxConcurrentCheckpoints,
 			CheckpointRetentionPolicy checkpointRetentionPolicy,
 			boolean isExactlyOnce,
-			boolean isPerfetCheckpointForRecovery) {
+			boolean isPerfetCheckpointForRecovery,
+			int tolerableCpFailureNumber) {
 
 		// sanity checks
-		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-			minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+		if (checkpointInterval < 10 || checkpointTimeout < 10 ||
 
 Review comment:
   Yes, I have a good reason. `CheckpointConfig#setCheckpointInterval` and 
`CheckpointConfig#setCheckpointTimeout` consider the illegal value is (`< 10`). 
So we'd better have the same criterion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293461498
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+   private final ConcurrentMap countedCheckpointIds;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 .");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   this.countedCheckpointIds = new ConcurrentHashMap<>();
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
+* checkpoint id sequence. In trigger phase, we may 
not get the checkpoint id when the failure
+* happens before the checkpoint id generation. In 
this case, it will be specified a negative
+*  latest generated checkpoint id as a special 
flag.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
+   CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+   switch (reason) {
+   case PERIODIC_SCHEDULER_SHUTDOWN:
+   case ALREADY_QUEUED:
+   case TOO_MANY_CONCURRENT_CHECKPOINTS:
+   case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+   case NOT_ALL_REQUIRED_TASKS_RUNNING:
+   case CHECKPOINT_SUBSUMED:
+   case CHECKPOINT_COORDINATOR_SUSPEND:
+   case CHECKPOINT_COORDINATOR_SHUTDOWN:
+   case JOB_FAILURE:
+   case JOB_FAILOVER_REGION:
+   //for compatibility purposes with user job behavior
+   case CHECKPOINT_DECLINED_TASK_NOT_READY:
+   case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING:
+   case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED:
+   case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER:
+   case CHECKPOINT_DECLINED_SUBSUMED:
+   case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
+
+   case EXCEPTION:
+   case CHECKPOINT_EXPIRED:
+   case TASK_CHECKPOINT_FAILURE:
+   case TRIGGER_CHECKPOINT_FAILURE:
+   case FINALIZE_CHECKPOINT_FAILURE:
+   //ignore
+   break;
+
+   case CHECKPOINT_DECLINED:
+   //we should make sure one checkpoint only be 

[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293450918
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+   private final ConcurrentMap countedCheckpointIds;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 .");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   this.countedCheckpointIds = new ConcurrentHashMap<>();
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
+* checkpoint id sequence. In trigger phase, we may 
not get the checkpoint id when the failure
+* happens before the checkpoint id generation. In 
this case, it will be specified a negative
+*  latest generated checkpoint id as a special 
flag.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
+   CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+   switch (reason) {
+   case PERIODIC_SCHEDULER_SHUTDOWN:
+   case ALREADY_QUEUED:
+   case TOO_MANY_CONCURRENT_CHECKPOINTS:
+   case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+   case NOT_ALL_REQUIRED_TASKS_RUNNING:
+   case CHECKPOINT_SUBSUMED:
+   case CHECKPOINT_COORDINATOR_SUSPEND:
+   case CHECKPOINT_COORDINATOR_SHUTDOWN:
+   case JOB_FAILURE:
+   case JOB_FAILOVER_REGION:
+   //for compatibility purposes with user job behavior
+   case CHECKPOINT_DECLINED_TASK_NOT_READY:
+   case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING:
+   case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED:
+   case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER:
+   case CHECKPOINT_DECLINED_SUBSUMED:
+   case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
+
+   case EXCEPTION:
+   case CHECKPOINT_EXPIRED:
+   case TASK_CHECKPOINT_FAILURE:
+   case TRIGGER_CHECKPOINT_FAILURE:
+   case FINALIZE_CHECKPOINT_FAILURE:
+   //ignore
+   break;
+
+   case CHECKPOINT_DECLINED:
+   //we should make sure one checkpoint only be 

[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-12 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293171972
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0
+   && tolerableCpFailureNumber <= 
UNLIMITED_TOLERABLE_FAILURE_NUMBER,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 and less 
than or equal to " + UNLIMITED_TOLERABLE_FAILURE_NUMBER + ".");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
+* checkpoint id sequence. In trigger phase, we may 
not get the checkpoint id when the failure
+* happens before the checkpoint id generation. In 
this case, it will be specified a negative
+*  latest generated checkpoint id as a special 
flag.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
 
 Review comment:
   I have added guard logic to the counting. Since the failure reasons that can occur in the special case (negative checkpoint ids) are all ignored anyway, it is fine; we do not need to handle it specially.
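
   For readers of the thread, a hedged sketch of the idea discussed here: count a declined checkpoint at most once and ignore the negative trigger-phase flag. Field names loosely follow the diff above, the callback is simplified to a `Runnable`, and this is not the PR's actual code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hedged sketch of the de-duplicated failure counting discussed above; not the PR's code. */
class DeclinedCheckpointCounterSketch {
    private final int tolerableCpFailureNumber;
    private final Runnable failJobCallback; // simplified stand-in for FailJobCallback
    private final AtomicInteger continuousFailureCounter = new AtomicInteger(0);
    private final Set<Long> countedCheckpointIds = ConcurrentHashMap.newKeySet();

    DeclinedCheckpointCounterSketch(int tolerableCpFailureNumber, Runnable failJobCallback) {
        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
        this.failJobCallback = failJobCallback;
    }

    void onDeclinedCheckpoint(long checkpointId) {
        // a trigger-phase failure is reported with a negative placeholder id; it is not counted
        if (checkpointId < 0) {
            return;
        }
        // the concurrent set guarantees each checkpoint id is counted at most once
        if (countedCheckpointIds.add(checkpointId)
                && continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber) {
            failJobCallback.run();
        }
    }
}
```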


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-12 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293006930
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1329,10 +1331,35 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th
 
 		LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
 
-		if (cause == null || cause instanceof CheckpointDeclineException) {
-			pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause);
+		if (cause == null) {
 
 Review comment:
   If all `CheckpointException` objects come from the Flink code, this style is 
OK.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-12 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293003163
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0
+   && tolerableCpFailureNumber <= 
UNLIMITED_TOLERABLE_FAILURE_NUMBER,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 and less 
than or equal to " + UNLIMITED_TOLERABLE_FAILURE_NUMBER + ".");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
+* checkpoint id sequence. In trigger phase, we may 
not get the checkpoint id when the failure
+* happens before the checkpoint id generation. In 
this case, it will be specified a negative
+*  latest generated checkpoint id as a special 
flag.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
 
 Review comment:
   Good catch! You are right, I will consider and fix it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-12 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293002340
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -439,6 +436,10 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
 			triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
 			return true;
 		} catch (CheckpointException e) {
+			long latestGeneratedCheckpointId = getCheckpointIdCounter().get();
 
 Review comment:
   I think getting the latest generated checkpoint id is the simplest and most direct way. It avoids two potential problems:
   
   - Concurrency: even though introducing a field to store the id carries no real risk at the moment, relying on the `CheckpointIDCounter` means we do not have to worry about this problem at all.
   - Introducing a global field would bring potential misuse risks and hidden dangers if other logic starts using it in the future.
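
   A small runnable sketch of the pattern described above, assembled from the quoted diff and the javadoc of `handleCheckpointException`; the `AtomicLong` merely stands in for Flink's `CheckpointIDCounter`, and everything else is illustrative.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Runnable sketch of the "negative latest checkpoint id as a flag" idea; not the PR's code. */
class TriggerPhaseFlagSketch {
    private final AtomicLong checkpointIdCounter = new AtomicLong(7); // pretend 7 ids were generated

    long flagForTriggerPhaseFailure() {
        // the failure happened before a new checkpoint id was generated, so reuse the
        // latest generated id, negated, instead of introducing an extra coordinator field
        long latestGeneratedCheckpointId = checkpointIdCounter.get();
        return -1 * latestGeneratedCheckpointId;
    }

    public static void main(String[] args) {
        System.out.println(new TriggerPhaseFlagSketch().flagForTriggerPhaseFailure()); // prints -7
    }
}
```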


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-27 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r287661884
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -540,14 +531,13 @@ public void enableCheckpointing(
 
 		checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
 
+		CheckpointFailureManager failureManager = new CheckpointFailureManager(chkConfig.getTolerableCheckpointFailureNumber(), () ->
+			failGlobal(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold.")));
 
 Review comment:
   Hi @StefanRRichter and @tillrohrmann ping for help, I suspect that this call 
will cause a deadlock with [this 
check](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1196).
 WDYT? Any opinion?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286778051
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the checkpoint failure manager.
+ */
+public class CheckpointFailureManagerTest extends TestLogger {
+
+   @Test
+   public void testContinuousFailure() {
 
 Review comment:
   I know what you mean; please see this [discussion section](https://github.com/apache/flink/pull/8322#discussion_r280328078). Currently, the counting mechanism is not based on checkpoint order but on execution order (the execution phase). It's a temporary solution; we will provide a better one in the future, tracked under JIRA issue FLINK-12514.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286778051
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the checkpoint failure manager.
+ */
+public class CheckpointFailureManagerTest extends TestLogger {
+
+   @Test
+   public void testContinuousFailure() {
 
 Review comment:
   I know what you mean; please see this [discussion section](https://github.com/apache/flink/pull/8322#discussion_r280328078). Currently, the counting mechanism is not based on checkpoint order. It's a temporary solution; we will provide a better one in the future, tracked under JIRA issue FLINK-12514.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286774079
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the checkpoint failure manager.
+ */
+public class CheckpointFailureManagerTest extends TestLogger {
+
+   @Test
+   public void testContinuousFailure() {
 
 Review comment:
   I think it's not necessary. The counter is an `AtomicInteger`, and we tolerate concurrent checkpoints and concurrent counting.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286773309
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -512,24 +513,19 @@ public boolean isArchived() {
}
 
public void enableCheckpointing(
-   long interval,
-   long checkpointTimeout,
-   long minPauseBetweenCheckpoints,
-   int maxConcurrentCheckpoints,
-   CheckpointRetentionPolicy retentionPolicy,
+   CheckpointCoordinatorConfiguration chkConfig,
List verticesToTrigger,
List verticesToWaitFor,
List verticesToCommitTo,
List> masterHooks,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
StateBackend checkpointStateBackend,
-   CheckpointStatsTracker statsTracker,
-   boolean isPreferCheckpointForRecovery) {
+   CheckpointStatsTracker statsTracker) {
 
// simple sanity checks
-		checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
-		checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms");
+		checkArgument(chkConfig.getCheckpointInterval() >= 10, "checkpoint interval must not be below 10ms");
 
 Review comment:
   Actually, the interval handling is not unified across the code base. `CheckpointConfig` uses -1 as a disabled flag and accepts any value larger than 0, while the sanity check in `CheckpointCoordinatorConfiguration` is based on 1. It's better to unify them: either -1 (disabled) or a value greater than or equal to 10. So I agree with you!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-19 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285428078
 
 

 ##
 File path: flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
 ##
 @@ -45,6 +45,7 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
 		env.getConfig().disableSysoutLogging();
 		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
 		env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+		env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   I have added a new test case to verify the exception behavior.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285325598
 
 

 ##
 File path: flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/test/StatefulStreamingJob.java
 ##
 @@ -150,6 +150,7 @@ public static void main(String[] args) throws Exception {
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.enableCheckpointing(1000L);
 		env.getConfig().disableGenericTypes();
+		env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
 
 Review comment:
   After removing all the settings of the tolerable checkpoint failure number, this case is the first one that causes the Travis failure. Error detail:
   ```java
   2019-05-17 15:43:08,068 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 66 @ 1558107788056 for job dd4f07c1f866b7afff42759d7a35de3f.
   2019-05-17 15:43:08,207 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 66 by task e65ff0e6f0ff8e14a1b53410aa8bb019 of job dd4f07c1f866b7afff42759d7a35de3f.
   2019-05-17 15:43:08,208 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 66 of job dd4f07c1f866b7afff42759d7a35de3f.
   org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source (1/1) was not running
   	at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1150)
   	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   2019-05-17 15:43:08,213 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (dd4f07c1f866b7afff42759d7a35de3f) switched from state RUNNING to FAILING.
   org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
   	at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$enableCheckpointing$0(ExecutionGraph.java:540)
   	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
   	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpoint(CheckpointCoordinator.java:1401)
   	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1333)
   	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:758)
   	at org.apache.flink.runtime.jobmaster.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:556)
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   2019-05-17 15:43:08,224 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (e65ff0e6f0ff8e14a1b53410aa8bb019) switched from RUNNING to CANCELING.
   2019-05-17 15:43:08,238 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/1) (b6eca113b5bb384dd099e20a14250dcb) switched from RUNNING to CANCELING.
   2019-05-17 15:43:08,324 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (e65ff0e6f0ff8e14a1b53410aa8bb019) switched from CANCELING to CANCELED.
   2019-05-17 15:43:08,345 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/1) (b6eca113b5bb384dd099e20a14250dcb) switched from CANCELING to CANCELED.
   2019-05-17 15:43:08,353 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Flink Streaming Job (dd4f07c1f866b7afff42759d7a35de3f) if no longer possible.
   2019-05-17 15:43:08,356 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (dd4f07c1f866b7afff42759d7a35de3f) switched from state FAILING to FAILED.
   org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
   	at
   ```

[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285105148
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
 ##
 @@ -45,6 +45,7 @@ public static StreamExecutionEnvironment 
prepareExecutionEnv(ParameterTool param
env.getConfig().disableSysoutLogging();

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
1));
env.enableCheckpointing(5000); // create a checkpoint every 5 
seconds
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   OK, I will try to refactor the test cases that involve the failure manager.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285095516
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
 ##
 @@ -45,6 +45,7 @@ public static StreamExecutionEnvironment 
prepareExecutionEnv(ParameterTool param
env.getConfig().disableSysoutLogging();

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
1));
env.enableCheckpointing(5000); // create a checkpoint every 5 
seconds
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   OK, I will follow the steps I listed. Let's analyze them case by case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285093338
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
 ##
 @@ -45,6 +45,7 @@ public static StreamExecutionEnvironment 
prepareExecutionEnv(ParameterTool param
env.getConfig().disableSysoutLogging();

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
1));
env.enableCheckpointing(5000); // create a checkpoint every 5 
seconds
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   I agree. So, to see more details, do you agree that I remove the other settings of the tolerance number and let Travis replay all the possible situations?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285064641
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
 ##
 @@ -45,6 +45,7 @@ public static StreamExecutionEnvironment 
prepareExecutionEnv(ParameterTool param
env.getConfig().disableSysoutLogging();

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
1));
env.enableCheckpointing(5000); // create a checkpoint every 5 
seconds
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   I see many "waiting for JobMaster" log prints, but I am not very sure. I think the checkpoint could also be declined before this PR. It was OK because the old tolerance mechanism existed on the TM end. But now it would trigger a restart via `CheckpointFailureManager`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285061845
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
 ##
 @@ -45,6 +45,7 @@ public static StreamExecutionEnvironment 
prepareExecutionEnv(ParameterTool param
env.getConfig().disableSysoutLogging();

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
1));
env.enableCheckpointing(5000); // create a checkpoint every 5 
seconds
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   This one I can confirm: the Kafka end-to-end test script checks that the log file is not empty, and the log reported that the count exceeds the tolerance number.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285060806
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ##
 @@ -295,6 +295,7 @@ public void runAutoOffsetRetrievalAndCommitToKafka() 
throws Exception {

env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(parallelism);
env.enableCheckpointing(200);
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   This one has been reported to Jira : 
https://issues.apache.org/jira/browse/FLINK-12542


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285059373
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/test/StatefulStreamingJob.java
 ##
 @@ -150,6 +150,7 @@ public static void main(String[] args) throws Exception {
env.setRestartStrategy(RestartStrategies.noRestart());
env.enableCheckpointing(1000L);
env.getConfig().disableGenericTypes();
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
 
 Review comment:
   I am trying to find this case; I have triggered so many Travis builds. I remember three checkpoints were declined in this case's log.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285058131
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ##
 @@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() 
throws Exception {
env.setParallelism(1);

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0));
env.enableCheckpointing(10); // Flink doesn't allow lower than 
10 ms
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   Yes, actually, I am not very sure about this case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285050218
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ##
 @@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() 
throws Exception {
env.setParallelism(1);

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0));
env.enableCheckpointing(10); // Flink doesn't allow lower than 
10 ms
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   Log detail for this one: https://travis-ci.org/apache/flink/jobs/533358204


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285046388
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ##
 @@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() 
throws Exception {
env.setParallelism(1);

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0));
env.enableCheckpointing(10); // Flink doesn't allow lower than 
10 ms
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   I understand. My PR changed these test cases' default behavior, so they must now specify a definite tolerance number (not the default). So this may trigger compatibility problems (for users' existing jobs) or cause more test case failures.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285044045
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ##
 @@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() 
throws Exception {
env.setParallelism(1);

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0));
env.enableCheckpointing(10); // Flink doesn't allow lower than 
10 ms
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   OK, let me find the specific old logs showing why it failed, and then we can discuss based on them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-17 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285041637
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ##
 @@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() 
throws Exception {
env.setParallelism(1);

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0));
env.enableCheckpointing(10); // Flink doesn't allow lower than 
10 ms
+   
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   I think the default value should be `0` to keep the same behavior as `failOnCheckpointingErrors` (whose default is `true`). I changed it to fix the test case. Some test cases (integration tests) are not stable and the checkpoint is sometimes declined, but they cannot accept a job recovery triggered by `CheckpointFailureManager`: some of them do not allow a restart, and some of them check the log files for exception messages.
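   For illustration, a minimal sketch of how a user job would opt in to tolerating a few checkpoint failures with the `CheckpointConfig` setter added in this PR; the checkpoint interval and the tolerance value of 3 are example numbers, not recommendations:
   ```java
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   public class TolerableFailureExample {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.enableCheckpointing(5000L); // checkpoint every 5 seconds (example value)
           // Default is 0: any checkpoint failure fails the job, which matches the old
           // setFailOnCheckpointingErrors(true) behavior; 3 tolerates three consecutive failures.
           env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
           // ... define sources/sinks and call env.execute() here
       }
   }
   ```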


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-16 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284740074
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 ##
 @@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce,
-   boolean isPerfetCheckpointForRecovery) {
+   boolean isPerfetCheckpointForRecovery,
+   int tolerableCpFailureNumber) {
 
// sanity checks
if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-   minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1) {
+   minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1 ||
+   tolerableCpFailureNumber < 0 || 
tolerableCpFailureNumber >= Integer.MAX_VALUE) {
 
 Review comment:
   Sorry, that was my negligence. It has been fixed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-16 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284720134
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0
+   && tolerableCpFailureNumber < 
MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 and less 
than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
+* checkpoint id sequence. In trigger phase, we may 
not get the checkpoint id when the failure
+* happens before the checkpoint id generation. In 
this case, it will be specified a negative
+*  latest generated checkpoint id as a special 
flag.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
+   CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+   switch (reason) {
+   case PERIODIC_SCHEDULER_SHUTDOWN:
+   case ALREADY_QUEUED:
+   case TOO_MANY_CONCURRENT_CHECKPOINTS:
+   case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+   case NOT_ALL_REQUIRED_TASKS_RUNNING:
+   case CHECKPOINT_SUBSUMED:
+   case CHECKPOINT_COORDINATOR_SUSPEND:
+   case CHECKPOINT_COORDINATOR_SHUTDOWN:
+   case JOB_FAILURE:
+   case JOB_FAILOVER_REGION:
+   //ignore
+   break;
+
+   case EXCEPTION:
+   case CHECKPOINT_EXPIRED:
+   case CHECKPOINT_DECLINED:
+   case TASK_CHECKPOINT_FAILURE:
+   case TRIGGER_CHECKPOINT_FAILURE:
+   case FINALIZE_CHECKPOINT_FAILURE:
+   continuousFailureCounter.incrementAndGet();
+   break;
+
+   default:
+   throw new FlinkRuntimeException("Unknown 
checkpoint failure reason : " + reason.name());
+   }
+
+   if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
 
 Review comment:
   Accepted! I have updated the code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific 

[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-16 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284711957
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0
+   && tolerableCpFailureNumber < 
MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 and less 
than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
+* checkpoint id sequence. In trigger phase, we may 
not get the checkpoint id when the failure
+* happens before the checkpoint id generation. In 
this case, it will be specified a negative
+*  latest generated checkpoint id as a special 
flag.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
+   CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+   switch (reason) {
+   case PERIODIC_SCHEDULER_SHUTDOWN:
+   case ALREADY_QUEUED:
+   case TOO_MANY_CONCURRENT_CHECKPOINTS:
+   case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+   case NOT_ALL_REQUIRED_TASKS_RUNNING:
+   case CHECKPOINT_SUBSUMED:
+   case CHECKPOINT_COORDINATOR_SUSPEND:
+   case CHECKPOINT_COORDINATOR_SHUTDOWN:
+   case JOB_FAILURE:
+   case JOB_FAILOVER_REGION:
+   //ignore
+   break;
+
+   case EXCEPTION:
+   case CHECKPOINT_EXPIRED:
+   case CHECKPOINT_DECLINED:
+   case TASK_CHECKPOINT_FAILURE:
+   case TRIGGER_CHECKPOINT_FAILURE:
+   case FINALIZE_CHECKPOINT_FAILURE:
+   continuousFailureCounter.incrementAndGet();
+   break;
+
+   default:
+   throw new FlinkRuntimeException("Unknown 
checkpoint failure reason : " + reason.name());
+   }
+
+   if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
 
 Review comment:
   The reason I did not use `equal` is here. So if I accept your suggestion, this should change to `continuousFailureCounter.get() > tolerableCpFailureNumber || continuousFailureCounter.get() < 0`. The `continuousFailureCounter.get() < 0` means
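   For illustration, a minimal, self-contained sketch of the guarded threshold check being discussed, assuming the `< 0` branch is intended to catch the counter wrapping past `Integer.MAX_VALUE`; the callback interface and its `failJob` method below are assumptions for the sketch, not necessarily the PR's exact API:
   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   class ToleranceCheckSketch {
       // Hypothetical stand-in for the failure callback used by the manager.
       interface FailJobCallback { void failJob(Throwable cause); }

       private final AtomicInteger continuousFailureCounter = new AtomicInteger(0);
       private final int tolerableCpFailureNumber = 3; // example value
       private final FailJobCallback failureCallback;

       ToleranceCheckSketch(FailJobCallback failureCallback) {
           this.failureCallback = failureCallback;
       }

       void checkFailureCounter() {
           // "< 0" guards against the AtomicInteger wrapping around after Integer.MAX_VALUE.
           if (continuousFailureCounter.get() > tolerableCpFailureNumber
                   || continuousFailureCounter.get() < 0) {
               failureCallback.failJob(new RuntimeException(
                       "Exceeded checkpoint tolerable failure threshold."));
           }
       }
   }
   ```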

[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-15 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284156148
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0
+   && tolerableCpFailureNumber < 
MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 and less 
than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
 
 Review comment:
   I have added the description of the checkpoint id parameter, and I have created an issue (FLINK-12514) to track the future refactoring of the counting mechanism based on the ordered checkpoint id sequence.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-15 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284141262
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0
+   && tolerableCpFailureNumber < 
MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 and less 
than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
 
 Review comment:
   OK, I will give more description.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-15 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284140921
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean 
isPeriodic) {
triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
return true;
} catch (CheckpointException e) {
+   try {
+   long latestGeneratedCheckpointId = 
getCheckpointIdCounter().getAndIncrement();
 
 Review comment:
   It seems there are still two test failures that hang the JVM process, but I cannot find a definite description; one of them succeeds in my local test. Travis: https://travis-ci.org/apache/flink/builds/532612594


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-15 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284102152
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean 
isPeriodic) {
triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
return true;
} catch (CheckpointException e) {
+   try {
+   long latestGeneratedCheckpointId = 
getCheckpointIdCounter().getAndIncrement();
 
 Review comment:
   > What I meant is removing the `try-catch(CheckpointException)`around the 
`triggerCheckpoint` call completely and exchange the `throw`s inside the method 
by direct calls to the failure handler.
   
   I have accepted all the suggestions except this one, because when I was refactoring I found the complexity may exceed our expectations. Some obstacles:
   
   1)
   ```
   public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
   ```
   
   This method returns a boolean value, so it needs a binary state: when we catch `CheckpointException` we return false, and when we get a `PendingCheckpoint` we return true. If the real triggerCheckpoint method does not throw `CheckpointException`, how do we distinguish the binary state?
   
   2) The triggerCheckpoint's boolean return value has been used in so many test cases to judge whether a checkpoint is successful or not.
   
   3) The `triggerSavepointInternal` method needs to catch the `CheckpointException` to complete the future object exceptionally, which means `triggerSavepointInternal` also needs a binary state for its `CompletableFuture`.
   
   4) I have implemented `CheckpointIDCounter#get`. After providing this function, I think reporting failures in one central place would be better than scattering the reporting code, just like the unified `PendingCheckpoint#abort` we provided before.
   
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-14 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284070033
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean 
isPeriodic) {
triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
return true;
} catch (CheckpointException e) {
+   try {
+   long latestGeneratedCheckpointId = 
getCheckpointIdCounter().getAndIncrement();
 
 Review comment:
   > BTW, I have one more important additional point, which is the 
`numUnsuccessfulCheckpointsTriggers` in checkpoint coordinator, which 
absolutely sounds like something that should now be moved into the failure 
manager, wdyt?
   
   After we provide `CheckpointFailureManager`, IMO `numUnsuccessfulCheckpointsTriggers` is no longer valuable. Currently, it is incremented inside the trigger lock of the `triggerCheckpoint` method, and the scope of the trigger lock is only a subset of the trigger phase that `CheckpointFailureManager` understands, so the counting is not correct. It is also only used for logging purposes. So I suggest we remove it in this PR or in the third step (the next step). If the logging is really necessary, we could add it back after we implement the new counting logic based on the checkpoint id sequence.
   
   What's your opinion?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-14 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r283766909
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean 
isPeriodic) {
triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
return true;
} catch (CheckpointException e) {
+   try {
+   long latestGeneratedCheckpointId = 
getCheckpointIdCounter().getAndIncrement();
 
 Review comment:
   Do you think we should provide a `CheckpointIDCounter#get` API?
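   For illustration, a rough sketch of what such an accessor could look like; this is a hypothetical signature next to the existing `getAndIncrement`, not the actual Flink interface, and other lifecycle methods are omitted:
   ```java
   // Hypothetical sketch of the counter interface with the proposed read-only accessor.
   public interface CheckpointIDCounter {
       long getAndIncrement() throws Exception; // existing: generates the next checkpoint id
       long get();                              // proposed: read the latest id without incrementing
   }
   ```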


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-14 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r283764177
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean 
isPeriodic) {
triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
return true;
} catch (CheckpointException e) {
+   try {
+   long latestGeneratedCheckpointId = 
getCheckpointIdCounter().getAndIncrement();
 
 Review comment:
   > Rest of my comments still holds though, why not remove the 
throwing/catching of an exception that we can also directly report to the 
failure manager? I also think that makes other counts accurate because it 
obtaining the id can then always happen under the lock.
   
   This try/catch block:
   
   ```java
   try {
       long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement();
       failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId);
   } catch (Exception e1) {
       LOG.warn("Get latest generated checkpoint id error : ", e1);
   }
   ```
   exists because `CheckpointIDCounter#getAndIncrement` throws a checked `Exception`.
   
   I think if it can be changed to this code snippet it looks better, right?
   
   ```java
   public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
       try {
           triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
           return true;
       } catch (CheckpointException e) {
           long latestGeneratedCheckpointId = getCheckpointIdCounter().get();
           failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId);
           return false;
       }
   }
   ```
   
   If the reporting behavior lives inside the real `triggerCheckpoint` method, it would require more code to report the exception and to generate a new checkpoint id. In addition, there are two locks in that method.
   
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-14 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r283758545
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean 
isPeriodic) {
triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
return true;
} catch (CheckpointException e) {
+   try {
+   long latestGeneratedCheckpointId = 
getCheckpointIdCounter().getAndIncrement();
 
 Review comment:
   > If you read my first comment, it was not about that you get the value from 
that counter, but I wondered why you do `getAndIncrement` instead of just `get`.
   
   Actually, I would also like to use a `get` API, but `CheckpointIDCounter` does not provide one.
   
   > Rest of my comments still holds though, why not remove the 
throwing/catching of an exception that we can also directly report to the 
failure manager? I also think that makes other counts accurate because it 
obtaining the id can then always happen under the lock.
   
   OK, agree.
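   
   For illustration, here is a minimal sketch of what a read-only accessor could look like. It is hypothetical, shows only the two methods relevant to this discussion, and is not Flink's actual `CheckpointIDCounter` interface:
   
   ```
/**
 * Hypothetical sketch only: a read-only get() next to getAndIncrement(), so
 * callers would not have to consume an id just to learn the latest value.
 */
interface CheckpointIDCounterSketch {

    /** Atomically increments the counter and returns the previous value. */
    long getAndIncrement() throws Exception;

    /** Hypothetical read-only view of the current counter value. */
    long get();
}
   ```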


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-14 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r283755801
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean 
isPeriodic) {
triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
return true;
} catch (CheckpointException e) {
+   try {
+   long latestGeneratedCheckpointId = 
getCheckpointIdCounter().getAndIncrement();
 
 Review comment:
   > I thought about this but can not make sure if we can get the correct 
latest checkpoint id when allow concurrent checkpointing.
   
   Actually, I mean I cannot find a better way to obtain the `latestGeneratedCheckpointId` than using `CheckpointIDCounter`. I think using `pendingCheckpoints` or `recentPendingCheckpoints` to get the latest pending checkpoint may not be accurate when concurrent checkpointing is allowed.
   
   What is your suggestion for getting the value of this variable?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-14 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r283746719
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean 
isPeriodic) {
triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
return true;
} catch (CheckpointException e) {
+   try {
+   long latestGeneratedCheckpointId = 
getCheckpointIdCounter().getAndIncrement();
 
 Review comment:
   I thought about this, but I am not sure we can get the correct latest checkpoint id when concurrent checkpointing is allowed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r283265154
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final static short IGNORE_FLAG = 0;
+   private final static short COUNT_FLAG = 1;
+   private final static short SUCCEED_FLAG = -1;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final TreeMap serialCheckpointResultTable;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0
+   && tolerableCpFailureNumber < 
MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 and less 
than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.failureCallback = checkNotNull(failureCallback);
+   this.serialCheckpointResultTable = new 
TreeMap<>(Collections.reverseOrder());
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
 
 Review comment:
   @StefanRRichter Still the old question: if I include the checkpoint id in this method (your second suggestion), what argument should be passed in [here](https://github.com/apache/flink/pull/8322/files#diff-a38ea0fa799bdaa0b354d80cd8368c60R442)?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r283257385
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final static short IGNORE_FLAG = 0;
+   private final static short COUNT_FLAG = 1;
+   private final static short SUCCEED_FLAG = -1;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final TreeMap serialCheckpointResultTable;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0
+   && tolerableCpFailureNumber < 
MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 and less 
than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.failureCallback = checkNotNull(failureCallback);
+   this.serialCheckpointResultTable = new 
TreeMap<>(Collections.reverseOrder());
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
 
 Review comment:
   @StefanRRichter Thanks for your suggestion. Actually, I have not yet checked and verified the new counting logic; I was implementing it and found that it is blocked by the checkpoint id issue. Regarding the checkpoint id, I agree with you that adding it to the checkpoint exception is not a good idea.
   
   So I will roll back the current implementation to the old one and create an issue to refactor the counting mechanism.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-11 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r283090199
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final static short IGNORE_FLAG = 0;
+   private final static short COUNT_FLAG = 1;
+   private final static short SUCCEED_FLAG = -1;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final TreeMap serialCheckpointResultTable;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0
+   && tolerableCpFailureNumber < 
MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 and less 
than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.failureCallback = checkNotNull(failureCallback);
+   this.serialCheckpointResultTable = new 
TreeMap<>(Collections.reverseOrder());
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
 
 Review comment:
   Hi @StefanRRichter, I have refactored the counting mechanism based on your suggestion so that it follows the checkpoint id sequence. But while implementing it, I met a problem: a `CheckpointException` thrown in the **Trigger** phase may not carry a checkpoint id.
   
   Currently, the method `triggerCheckpoint` has two possible outcomes:
   
   * It produces a pending checkpoint (so the checkpoint id is available)
   * It throws a `CheckpointException` (whether the checkpoint id is available depends on where in the method the exception is thrown)
   
   So I cannot get the checkpoint id [here](https://github.com/apache/flink/pull/8322/files#diff-a38ea0fa799bdaa0b354d80cd8368c60R442).
   
   My thought is that we could inject the checkpoint id into the `CheckpointException` (although the semantics may look strange). If we cannot inject it, we can use a default value (-1). Then, in `CheckpointFailureManager`, if we do not get a normal checkpoint id (we get `-1`, which means no checkpoint was generated in the trigger phase), we would ignore this case. Actually, this case does not seem to be a scenario we want to tolerate.
   
   What do you think?
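   
   A rough, self-contained sketch of this idea (the class and method names below are hypothetical, not the code in this PR):
   
   ```
/**
 * Sketch: trigger-phase failures that happen before a checkpoint id exists are
 * reported with a sentinel id and are not counted against the tolerable number.
 */
public class SentinelIdSketch {

    static final long UNKNOWN_CHECKPOINT_ID = -1L;

    static class SimpleFailureManager {
        private final int tolerableFailures;
        private int continuousFailures;

        SimpleFailureManager(int tolerableFailures) {
            this.tolerableFailures = tolerableFailures;
        }

        void handleCheckpointFailure(long checkpointId) {
            if (checkpointId == UNKNOWN_CHECKPOINT_ID) {
                // Failure before any checkpoint id was assigned: ignore it here.
                return;
            }
            if (++continuousFailures > tolerableFailures) {
                throw new RuntimeException("Too many checkpoint failures, fail the job.");
            }
        }

        void handleCheckpointSuccess() {
            continuousFailures = 0;
        }
    }

    public static void main(String[] args) {
        SimpleFailureManager manager = new SimpleFailureManager(2);
        manager.handleCheckpointFailure(UNKNOWN_CHECKPOINT_ID); // ignored
        manager.handleCheckpointFailure(7L);                    // counted
        manager.handleCheckpointSuccess();                      // resets the count
        System.out.println("sketch finished without failing the job");
    }
}
   ```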


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-09 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r282453955
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 ##
 @@ -296,7 +296,8 @@ public static ExecutionGraph buildGraph(
checkpointIdCounter,
completedCheckpoints,
rootBackend,
-   checkpointStatsTracker);
+   checkpointStatsTracker,
+   
chkConfig.getTolerableCheckpointFailureNumber());
 
 Review comment:
   Agree


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-09 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r282451226
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ##
 @@ -115,6 +115,8 @@
 
private volatile ScheduledFuture cancellerHandle;
 
+   private final CheckpointFailureManager failureManager;
 
 Review comment:
   Yes, `CheckpointFailureManager` was introduced into `PendingCheckpoint` to avoid scattering calls to `failureManager.handleCheckpointException(exception)` across the coordinator. I accept this suggestion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-09 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r282443479
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final AtomicInteger continuousFailureCounter;
+   private final int tolerableCpFailureNumber;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber) {
+   Preconditions.checkArgument(tolerableCpFailureNumber > 0,
+   "The tolerable checkpoint failure number must be larger 
than 0.");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param callback the handler callback which defines the process logic.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
FailureHandlerCallback callback) {
+   CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+   switch (reason) {
+   case PERIODIC_SCHEDULER_SHUTDOWN:
+   case ALREADY_QUEUED:
+   case TOO_MANY_CONCURRENT_CHECKPOINTS:
+   case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+   case NOT_ALL_REQUIRED_TASKS_RUNNING:
+   case CHECKPOINT_SUBSUMED:
+   case CHECKPOINT_COORDINATOR_SUSPEND:
+   case CHECKPOINT_COORDINATOR_SHUTDOWN:
+   case JOB_FAILURE:
+   case JOB_FAILOVER_REGION:
+   //ignore
+   break;
+
+   case EXCEPTION:
+   case CHECKPOINT_EXPIRED:
+   case CHECKPOINT_DECLINED:
+   case TASK_CHECKPOINT_FAILURE:
+   case TRIGGER_CHECKPOINT_FAILURE:
+   case FINALIZE_CHECKPOINT_FAILURE:
+   continuousFailureCounter.incrementAndGet();
+   break;
+
+   default:
+   throw new RuntimeException("Unknown checkpoint 
failure reason : " + reason.name());
+   }
+
+   if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+   callback.process();
+   }
+   }
+
+   /**
+* Handle checkpoint success.
+*/
+   public void handleCheckpointSuccess() {
+   continuousFailureCounter.set(0);
 
 Review comment:
   After thinking about it more deeply, I agree with you. Following the checkpoint sequence is a better choice. Accepted.
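   
   As a rough, self-contained sketch of what sequence-based tracking could look like (hypothetical and simplified, not the code in this PR):
   
   ```
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/**
 * Sketch: count failures by checkpoint id order instead of arrival order.
 * Only checkpoints newer than the latest successful one are counted.
 */
public class SequenceBasedCountingSketch {

    // checkpoint id -> true if it failed, false if it succeeded; newest id first.
    private final TreeMap<Long, Boolean> results = new TreeMap<>(Collections.reverseOrder());

    void record(long checkpointId, boolean failed) {
        results.put(checkpointId, failed);
    }

    /** Consecutive failed checkpoints that are newer than the last successful one. */
    int continuousFailuresBySequence() {
        int failures = 0;
        for (Map.Entry<Long, Boolean> entry : results.entrySet()) {
            if (entry.getValue()) {
                failures++;
            } else {
                break; // reached the newest successful checkpoint
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        SequenceBasedCountingSketch sketch = new SequenceBasedCountingSketch();
        sketch.record(5, false); // checkpoint 5 succeeded
        sketch.record(4, true);  // checkpoint 4 failed, but it is older than 5
        sketch.record(6, true);  // checkpoint 6 failed
        // Only checkpoint 6 counts, because it is newer than the last success (5).
        System.out.println(sketch.continuousFailuresBySequence()); // prints 1
    }
}
   ```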


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-08 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r282047625
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final AtomicInteger continuousFailureCounter;
+   private final int tolerableCpFailureNumber;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber) {
+   Preconditions.checkArgument(tolerableCpFailureNumber > 0,
+   "The tolerable checkpoint failure number must be larger 
than 0.");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param callback the handler callback which defines the process logic.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
FailureHandlerCallback callback) {
+   CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+   switch (reason) {
+   case PERIODIC_SCHEDULER_SHUTDOWN:
+   case ALREADY_QUEUED:
+   case TOO_MANY_CONCURRENT_CHECKPOINTS:
+   case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+   case NOT_ALL_REQUIRED_TASKS_RUNNING:
+   case CHECKPOINT_SUBSUMED:
+   case CHECKPOINT_COORDINATOR_SUSPEND:
+   case CHECKPOINT_COORDINATOR_SHUTDOWN:
+   case JOB_FAILURE:
+   case JOB_FAILOVER_REGION:
+   //ignore
+   break;
+
+   case EXCEPTION:
+   case CHECKPOINT_EXPIRED:
+   case CHECKPOINT_DECLINED:
+   case TASK_CHECKPOINT_FAILURE:
+   case TRIGGER_CHECKPOINT_FAILURE:
+   case FINALIZE_CHECKPOINT_FAILURE:
+   continuousFailureCounter.incrementAndGet();
+   break;
+
+   default:
+   throw new RuntimeException("Unknown checkpoint 
failure reason : " + reason.name());
+   }
+
+   if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+   callback.process();
+   }
+   }
+
+   /**
+* Handle checkpoint success.
+*/
+   public void handleCheckpointSuccess() {
+   continuousFailureCounter.set(0);
 
 Review comment:
   Good question. Actually, I think our focus is different, as the variable name `continuousFailureCounter` I referenced suggests. I am more concerned about "continuous" in the parallel scenario, while I think you are more concerned about a "failure counter" that follows the checkpoint sequence (trigger order).
   
   Tolerating successive checkpoint failures usually serves two purposes:
   
   * Users do not want the latest successful checkpoint to be too far away from the current time (in case no checkpoint can succeed from now on);
   * In many cases, continuous failures are caused by third-party system exceptions when interacting with those systems. If the exceptions are short-term, we are not inclined to restart the job; if they are long-term, we are willing to restart the job.
   
   The first purpose is more in line with your view, while the second is more related to the behavior at execution time. In your example, if the old checkpoint

[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-08 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r281928748
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final AtomicInteger continuousFailureCounter;
+   private final int tolerableCpFailureNumber;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber) {
+   Preconditions.checkArgument(tolerableCpFailureNumber > 0,
+   "The tolerable checkpoint failure number must be larger 
than 0.");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param callback the handler callback which defines the process logic.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
FailureHandlerCallback callback) {
 
 Review comment:
   I am sorry that the change of implementation confused you. The reason I introduced a callback interface is that we must make the "decision" (fail the job or do something else). Although we have not distinguished the `Trigger` and `Execution` phases, there are still two different behaviors: `sync` (a method call in the trigger phase) and `async` (a message in the execution phase). So I think the `handleCheckpoint` in the design doc is not enough; we should track both phases of failure (`triggerCheckpoint` and `abort`). What do you think?
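   
   A minimal, runnable sketch of the callback idea (all names and signatures below are illustrative only, not the interface added by this PR): both the synchronous trigger path and the asynchronous execution path report to the same manager and supply the "decision" as a callback.
   
   ```
/**
 * Sketch: the failure manager counts failures and invokes a caller-supplied
 * callback once the tolerable number is exceeded, so the sync (trigger) and
 * async (execution) paths can each decide how to fail the job.
 */
public class CallbackSketch {

    /** Decision hook: what to do once failures exceed the tolerable number. */
    interface FailureHandlerCallback {
        void process();
    }

    static class FailureManager {
        private final int tolerableFailures;
        private int failures;

        FailureManager(int tolerableFailures) {
            this.tolerableFailures = tolerableFailures;
        }

        void handleCheckpointException(Exception exception, FailureHandlerCallback callback) {
            failures++;
            if (failures > tolerableFailures) {
                callback.process();
            }
        }
    }

    public static void main(String[] args) {
        FailureManager manager = new FailureManager(0);

        // Sync path: a failure detected while triggering the checkpoint.
        manager.handleCheckpointException(new Exception("trigger failed"),
                () -> System.out.println("fail the job from the trigger phase"));

        // Async path: a decline/abort message arriving during execution.
        manager.handleCheckpointException(new Exception("task declined"),
                () -> System.out.println("fail the job from the execution phase"));
    }
}
   ```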


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-08 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r281927266
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final AtomicInteger continuousFailureCounter;
+   private final int tolerableCpFailureNumber;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber) {
+   Preconditions.checkArgument(tolerableCpFailureNumber > 0,
+   "The tolerable checkpoint failure number must be larger 
than 0.");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param callback the handler callback which defines the process logic.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
FailureHandlerCallback callback) {
+   CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+   switch (reason) {
+   case PERIODIC_SCHEDULER_SHUTDOWN:
+   case ALREADY_QUEUED:
+   case TOO_MANY_CONCURRENT_CHECKPOINTS:
+   case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+   case NOT_ALL_REQUIRED_TASKS_RUNNING:
+   case CHECKPOINT_SUBSUMED:
+   case CHECKPOINT_COORDINATOR_SUSPEND:
+   case CHECKPOINT_COORDINATOR_SHUTDOWN:
+   case JOB_FAILURE:
+   case JOB_FAILOVER_REGION:
+   //ignore
+   break;
+
+   case EXCEPTION:
+   case CHECKPOINT_EXPIRED:
+   case CHECKPOINT_DECLINED:
+   case TASK_CHECKPOINT_FAILURE:
+   case TRIGGER_CHECKPOINT_FAILURE:
+   case FINALIZE_CHECKPOINT_FAILURE:
+   continuousFailureCounter.incrementAndGet();
+   break;
+
+   default:
+   throw new RuntimeException("Unknown checkpoint 
failure reason : " + reason.name());
+   }
+
+   if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+   callback.process();
+   }
+   }
+
+   /**
+* Handle checkpoint success.
+*/
+   public void handleCheckpointSuccess() {
+   continuousFailureCounter.set(0);
 
 Review comment:
   Checkpoints can be executed in parallel. I think the `continuousFailureCounter` will just track the invocation order, not the checkpoint sequence number. What do you think?
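   
   A tiny, self-contained illustration of that point (the checkpoint ids and the arrival order below are made up):
   
   ```
import java.util.concurrent.atomic.AtomicInteger;

public class ArrivalOrderCounterDemo {
    public static void main(String[] args) {
        AtomicInteger continuousFailureCounter = new AtomicInteger(0);

        // With concurrent checkpoints, results can arrive out of order.
        continuousFailureCounter.incrementAndGet(); // checkpoint 4 fails        -> 1
        continuousFailureCounter.set(0);            // checkpoint 5 succeeds     -> 0
        continuousFailureCounter.incrementAndGet(); // checkpoint 3 fails (late) -> 1

        // The newest checkpoint (5) already succeeded, yet the counter reports a
        // failure streak, because it only reflects the order in which results arrive.
        System.out.println("counter = " + continuousFailureCounter.get());
    }
}
   ```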


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-08 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r281923773
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final AtomicInteger continuousFailureCounter;
+   private final int tolerableCpFailureNumber;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber) {
+   Preconditions.checkArgument(tolerableCpFailureNumber > 0,
 
 Review comment:
   Agreed, that was an oversight on my part.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services