[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r294067081

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -1329,10 +1331,13 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th
 LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
- if (cause == null || cause instanceof CheckpointDeclineException) {
- pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause);
+ if (cause == null) {
+ failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED);
+ } else if (cause instanceof CheckpointException) {
+ CheckpointException exception = (CheckpointException) cause;
+ failPendingCheckpoint(pendingCheckpoint, exception.getCheckpointFailureReason(), cause);
 } else {
- pendingCheckpoint.abort(CheckpointFailureReason.JOB_FAILURE, cause);
+ failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause);

Review comment:
Actually, `CheckpointFailureManager` will play a bigger role in the future, but not yet. This PR is an intermediate step in a series of three PRs. In this PR, the most important thing is to stay compatible with `setFailOnCheckpointingErrors`; otherwise, many user-visible behaviors would change. We did consider counting more failure reasons, but that would require more changes and make this PR more complex. So your idea is right, just not for now. The purpose of this PR is to introduce the `CheckpointFailureManager` and to finish the residual refactoring left over from the first PR, #7571.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
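The three-way branch in the new `discardCheckpoint` code can be illustrated with a minimal, self-contained sketch. The enum, the exception class and `reasonFor` below are hypothetical, simplified stand-ins rather than Flink's real `CheckpointFailureReason` and `CheckpointException`; only the branching mirrors the diff.

```java
// Hypothetical, simplified stand-ins for CheckpointFailureReason and CheckpointException;
// only the branching in reasonFor() mirrors the diff.
public class DiscardReasonSketch {

    enum FailureReason { CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED, JOB_FAILURE }

    static class CheckpointException extends Exception {
        private final FailureReason reason;

        CheckpointException(FailureReason reason) {
            super(reason.name());
            this.reason = reason;
        }

        FailureReason getCheckpointFailureReason() {
            return reason;
        }
    }

    /** Picks the failure reason used when a pending checkpoint is discarded. */
    static FailureReason reasonFor(Throwable cause) {
        if (cause == null) {
            // No cause at all: treated as a plain decline.
            return FailureReason.CHECKPOINT_DECLINED;
        } else if (cause instanceof CheckpointException) {
            // The exception already carries a precise reason, so propagate it unchanged.
            return ((CheckpointException) cause).getCheckpointFailureReason();
        } else {
            // Any other throwable is attributed to a job failure.
            return FailureReason.JOB_FAILURE;
        }
    }

    public static void main(String[] args) {
        System.out.println(reasonFor(null));
        System.out.println(reasonFor(new CheckpointException(FailureReason.CHECKPOINT_EXPIRED)));
        System.out.println(reasonFor(new RuntimeException("task crashed")));
    }
}
```

Running `main` prints `CHECKPOINT_DECLINED`, `CHECKPOINT_EXPIRED` and `JOB_FAILURE`, one per line. The point of the change is that a `CheckpointException` keeps its own reason instead of being collapsed into `CHECKPOINT_DECLINED`, which is what later lets the failure manager tell reasons apart.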
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r294066885

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java ##
@@ -252,6 +255,22 @@ public void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors) {
 this.failOnCheckpointingErrors = failOnCheckpointingErrors;
 }
+ /**
+ * Get the tolerable checkpoint failure number which used by the checkpoint failure manager
+ * to determine when we need to fail the job.
+ */
+ public int getTolerableCheckpointFailureNumber() {
+ return tolerableCheckpointFailureNumber;
+ }
+
+ /**
+ * Set the tolerable checkpoint failure number, the default value is 0 that means
+ * we do not tolerance any checkpoint failure.
+ */
+ public void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber) {

Review comment:
IMO, that's not correct. `CheckpointingOptions` defines global config options, most of which relate to **state**. `tolerableCheckpointFailureNumber`, in contrast, is a checkpoint-specific config option that belongs to each job, and it extends the `setFailOnCheckpointingErrors` option. So it should not be defined in `CheckpointingOptions`. Is it unused at the moment? Yes, because we split this feature into three steps, and it will only be used in the third step. In this step, however, we need it to create the `CheckpointFailureManager`. I hope the reasoning is clear now.
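To make the per-job relationship between `tolerableCheckpointFailureNumber` and `setFailOnCheckpointingErrors` more concrete, here is a rough sketch of how the old boolean could be folded into the new count for compatibility. The class, its methods and the exact mapping (flag off means unlimited tolerance, reusing the `Integer.MAX_VALUE` value of `UNLIMITED_TOLERABLE_FAILURE_NUMBER`) are assumptions for illustration, not what this PR implements.

```java
// Hypothetical per-job settings holder; the mapping in effectiveTolerableFailures()
// is an illustrative assumption, not this PR's implementation.
public class TolerableFailureSketch {

    /** Same value as UNLIMITED_TOLERABLE_FAILURE_NUMBER in CheckpointFailureManager. */
    static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;

    /** Legacy per-job flag; when false, checkpoint errors never fail the job. */
    private boolean failOnCheckpointingErrors = true;

    /** Default 0: no checkpoint failure is tolerated. */
    private int tolerableCheckpointFailureNumber = 0;

    public void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors) {
        this.failOnCheckpointingErrors = failOnCheckpointingErrors;
    }

    public void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber) {
        // Same lower bound that the CheckpointFailureManager constructor enforces.
        if (tolerableCheckpointFailureNumber < 0) {
            throw new IllegalArgumentException("The tolerable checkpoint failure number must be >= 0.");
        }
        this.tolerableCheckpointFailureNumber = tolerableCheckpointFailureNumber;
    }

    /** The value that would be handed to the failure manager for this job. */
    public int effectiveTolerableFailures() {
        // Assumption: turning the legacy flag off means "tolerate everything",
        // which keeps the old behavior of jobs that opted out of failing.
        return failOnCheckpointingErrors
            ? tolerableCheckpointFailureNumber
            : UNLIMITED_TOLERABLE_FAILURE_NUMBER;
    }

    public static void main(String[] args) {
        TolerableFailureSketch jobA = new TolerableFailureSketch();
        jobA.setTolerableCheckpointFailureNumber(3);

        TolerableFailureSketch jobB = new TolerableFailureSketch();
        jobB.setFailOnCheckpointingErrors(false);

        // Each job carries its own setting, unlike a cluster-wide option.
        System.out.println(jobA.effectiveTolerableFailures()); // 3
        System.out.println(jobB.effectiveTolerableFailures()); // 2147483647
    }
}
```

Keeping the value on a per-job object, rather than in a global options class, is what lets two jobs in the same cluster tolerate different numbers of failures.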
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293623628

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java ##
@@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
 int maxConcurrentCheckpoints,
 CheckpointRetentionPolicy checkpointRetentionPolicy,
 boolean isExactlyOnce,
- boolean isPerfetCheckpointForRecovery) {
+ boolean isPerfetCheckpointForRecovery,
+ int tolerableCpFailureNumber) {
 // sanity checks
- if (checkpointInterval < 1 || checkpointTimeout < 1 ||
- minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+ if (checkpointInterval < 10 || checkpointTimeout < 10 ||

Review comment:
Yes, I have a good reason. `ExecutionGraph#enableCheckpointing` (see [here](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L536)) treats values `< 10` as illegal, and the [test case](https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java#L189) relies on this as well. So we had better use the same criterion. From a practical point of view, I also think 10 ms is a meaningful lower bound: a 1 ms interval would trigger checkpoints so frequently that it has no practical value.
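A minimal sketch of the tightened sanity check, using a hypothetical `validate` helper. The interval and timeout bounds come from the diff; the `minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1` part is an assumption carried over from the removed line, because the diff cuts off before the new continuation line.

```java
// Hypothetical helper mirroring the tightened sanity check; the minPause/maxConcurrent
// conditions are assumed unchanged because the diff cuts off before the new line.
public class CoordinatorConfigCheckSketch {

    static void validate(long checkpointInterval, long checkpointTimeout,
                         long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints) {
        if (checkpointInterval < 10 || checkpointTimeout < 10
                || minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
            throw new IllegalArgumentException("Invalid checkpoint coordinator configuration.");
        }
    }

    /** Convenience wrapper that reports whether validate() accepts the values. */
    static boolean isValid(long interval, long timeout, long minPause, int maxConcurrent) {
        try {
            validate(interval, timeout, minPause, maxConcurrent);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(1, 600_000, 0, 1));  // false: 1 ms passed the old check, not the new one
        System.out.println(isValid(10, 600_000, 0, 1)); // true: exactly the new lower bound
        System.out.println(isValid(12, 600_000, 0, 1)); // true: a 12 ms interval
    }
}
```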
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r293628893 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The checkpoint failure manager which centralized manage checkpoint failure processing logic. 
+ */ +public class CheckpointFailureManager { + + private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; + + private final int tolerableCpFailureNumber; + private final FailJobCallback failureCallback; + private final AtomicInteger continuousFailureCounter; + private final Set countedCheckpointIds; + + public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) { + checkArgument(tolerableCpFailureNumber >= 0, + "The tolerable checkpoint failure number is illegal, " + + "it must be greater than or equal to 0 ."); + this.tolerableCpFailureNumber = tolerableCpFailureNumber; + this.continuousFailureCounter = new AtomicInteger(0); + this.failureCallback = checkNotNull(failureCallback); + this.countedCheckpointIds = ConcurrentHashMap.newKeySet(); + } + + /** +* Handle checkpoint exception with a handler callback. +* +* @param exception the checkpoint exception. +* @param checkpointId the failed checkpoint id used to count the continuous failure number based on +* checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure +* happens before the checkpoint id generation. In this case, it will be specified a negative +* latest generated checkpoint id as a special flag. 
+*/ + public void handleCheckpointException(CheckpointException exception, long checkpointId) { + CheckpointFailureReason reason = exception.getCheckpointFailureReason(); + switch (reason) { + case PERIODIC_SCHEDULER_SHUTDOWN: + case ALREADY_QUEUED: + case TOO_MANY_CONCURRENT_CHECKPOINTS: + case MINIMUM_TIME_BETWEEN_CHECKPOINTS: + case NOT_ALL_REQUIRED_TASKS_RUNNING: + case CHECKPOINT_SUBSUMED: + case CHECKPOINT_COORDINATOR_SUSPEND: + case CHECKPOINT_COORDINATOR_SHUTDOWN: + case JOB_FAILURE: + case JOB_FAILOVER_REGION: + //for compatibility purposes with user job behavior + case CHECKPOINT_DECLINED_TASK_NOT_READY: + case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING: + case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED: + case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER: + case CHECKPOINT_DECLINED_SUBSUMED: + case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM: + + case EXCEPTION: + case CHECKPOINT_EXPIRED: + case TASK_CHECKPOINT_FAILURE: + case TRIGGER_CHECKPOINT_FAILURE: + case FINALIZE_CHECKPOINT_FAILURE: + //ignore + break; + + case CHECKPOINT_DECLINED: + //we should make sure one checkpoint only be counted once +
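The diff is cut off right where `CHECKPOINT_DECLINED` gets counted. The following is a hedged sketch of what such counting could look like, under these assumptions: only a plain decline is counted; each checkpoint ID is counted at most once via a concurrent key set (as the `countedCheckpointIds` field suggests); the job fails once the count exceeds the tolerable number, so the default of 0 tolerates nothing; and a successful checkpoint resets the count. `Runnable` stands in for `FailJobCallback`, and all names here are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplification of the counting branch; Runnable stands in for FailJobCallback.
public class FailureCounterSketch {

    enum Reason { CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED, TOO_MANY_CONCURRENT_CHECKPOINTS }

    private final int tolerableCpFailureNumber;
    private final Runnable failJob;
    private final AtomicInteger continuousFailureCounter = new AtomicInteger(0);
    private final Set<Long> countedCheckpointIds = ConcurrentHashMap.newKeySet();

    FailureCounterSketch(int tolerableCpFailureNumber, Runnable failJob) {
        if (tolerableCpFailureNumber < 0) {
            throw new IllegalArgumentException("The tolerable checkpoint failure number must be >= 0.");
        }
        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
        this.failJob = failJob;
    }

    void handleCheckpointException(Reason reason, long checkpointId) {
        switch (reason) {
            case CHECKPOINT_DECLINED:
                // Set#add returns false for an ID that was already counted, so a checkpoint
                // declined by several tasks increments the counter only once.
                if (countedCheckpointIds.add(checkpointId)
                        && continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber) {
                    failJob.run();
                }
                break;
            default:
                // Everything else is ignored, like the compatibility-driven cases in the diff.
                break;
        }
    }

    /** Assumed behavior: a completed checkpoint ends the run of continuous failures. */
    void handleCheckpointSuccess() {
        continuousFailureCounter.set(0);
        countedCheckpointIds.clear();
    }

    public static void main(String[] args) {
        AtomicInteger jobFailures = new AtomicInteger();
        FailureCounterSketch manager = new FailureCounterSketch(1, jobFailures::incrementAndGet);

        manager.handleCheckpointException(Reason.CHECKPOINT_DECLINED, 1); // counter = 1
        manager.handleCheckpointException(Reason.CHECKPOINT_DECLINED, 1); // same ID, not recounted
        manager.handleCheckpointException(Reason.CHECKPOINT_EXPIRED, 2);  // ignored
        manager.handleCheckpointException(Reason.CHECKPOINT_DECLINED, 3); // counter = 2 > 1: fail job
        System.out.println(jobFailures.get()); // 1
    }
}
```

The concurrent key set matters because declines for the same checkpoint can arrive from several tasks on different threads; `add` is atomic, so exactly one of them wins and increments the counter.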
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627636

## File path: flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java ##
@@ -618,7 +618,7 @@ public void testWithCheckPointing() throws Exception {
 // expected behaviour
 }
- env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
+ env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE, true);

Review comment:
ditto
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627605

## File path: flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java ##
@@ -609,7 +609,7 @@ public void testWithCheckPointing() throws Exception {
 // Test force checkpointing
 try {
- env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false);
+ env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE, false);

Review comment:
ditto
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627519

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ##
@@ -583,7 +583,7 @@ private void configureCheckpointing() {
 CheckpointConfig cfg = streamGraph.getCheckpointConfig();
 long interval = cfg.getCheckpointInterval();
- if (interval > 0) {
+ if (interval >= 10) {

Review comment:
ditto
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627478

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java ##
@@ -125,8 +128,8 @@ public long getCheckpointInterval() {
 * @param checkpointInterval The checkpoint interval, in milliseconds.
 */
 public void setCheckpointInterval(long checkpointInterval) {
- if (checkpointInterval <= 0) {
- throw new IllegalArgumentException("Checkpoint interval must be larger than zero");
+ if (checkpointInterval < 10) {

Review comment:
ditto
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627503

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java ##
@@ -146,8 +149,8 @@ public long getCheckpointTimeout() {
 * @param checkpointTimeout The checkpoint timeout, in milliseconds.
 */
 public void setCheckpointTimeout(long checkpointTimeout) {
- if (checkpointTimeout <= 0) {
- throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
+ if (checkpointTimeout < 10) {

Review comment:
ditto
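The 10 ms criterion now appears in the `CheckpointConfig` setters and in the `interval >= 10` gate of `StreamingJobGraphGenerator#configureCheckpointing`. One way to keep these from drifting apart is a single named constant. The sketch below assumes a hypothetical `MINIMAL_CHECKPOINT_TIME` constant and a simplified config class; neither is Flink's actual API.

```java
// Hypothetical: one named constant shared by the setters and the job-graph gate.
// MINIMAL_CHECKPOINT_TIME and this class are illustrative, not Flink's API.
public class CheckpointTimingSketch {

    static final long MINIMAL_CHECKPOINT_TIME = 10; // milliseconds

    private long checkpointInterval = -1;     // -1: periodic checkpointing not configured
    private long checkpointTimeout = 600_000; // illustrative default of 10 minutes

    public void setCheckpointInterval(long checkpointInterval) {
        if (checkpointInterval < MINIMAL_CHECKPOINT_TIME) {
            throw new IllegalArgumentException(
                "Checkpoint interval must be at least " + MINIMAL_CHECKPOINT_TIME + " ms");
        }
        this.checkpointInterval = checkpointInterval;
    }

    public void setCheckpointTimeout(long checkpointTimeout) {
        if (checkpointTimeout < MINIMAL_CHECKPOINT_TIME) {
            throw new IllegalArgumentException(
                "Checkpoint timeout must be at least " + MINIMAL_CHECKPOINT_TIME + " ms");
        }
        this.checkpointTimeout = checkpointTimeout;
    }

    public long getCheckpointTimeout() {
        return checkpointTimeout;
    }

    /** Same test as the `interval >= 10` gate, expressed through the shared constant. */
    public boolean isPeriodicCheckpointingEnabled() {
        return checkpointInterval >= MINIMAL_CHECKPOINT_TIME;
    }

    public static void main(String[] args) {
        CheckpointTimingSketch cfg = new CheckpointTimingSketch();
        System.out.println(cfg.isPeriodicCheckpointingEnabled()); // false
        try {
            cfg.setCheckpointInterval(1); // accepted by the old "> 0" check, rejected now
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        cfg.setCheckpointInterval(10);
        System.out.println(cfg.isPeriodicCheckpointingEnabled()); // true
    }
}
```

With the bound defined in one place, changing it later touches a single line instead of the setter, the generator and the coordinator configuration separately.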
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293625337

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ##
@@ -1367,13 +1503,18 @@ public void testMinTimeBetweenCheckpointsInterval() throws Exception {
 final long delay = 50;
+ CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+ 12, // periodic interval is 12 ms

Review comment:
Yes, it is required. As mentioned in my previous comment, I changed the sanity-check condition in `CheckpointCoordinatorConfiguration.java`. We should use the same criterion as `ExecutionGraph`: the interval must be greater than or equal to 10.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293623628

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java ##
@@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
 int maxConcurrentCheckpoints,
 CheckpointRetentionPolicy checkpointRetentionPolicy,
 boolean isExactlyOnce,
- boolean isPerfetCheckpointForRecovery) {
+ boolean isPerfetCheckpointForRecovery,
+ int tolerableCpFailureNumber) {
 // sanity checks
- if (checkpointInterval < 1 || checkpointTimeout < 1 ||
- minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+ if (checkpointInterval < 10 || checkpointTimeout < 10 ||

Review comment:
Yes, I have a good reason. `ExecutionGraph#enableCheckpointing` (see [here](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L536)) treats values `< 10` as illegal, and the [test case](https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java#L189) relies on this as well. So we had better use the same criterion.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293625337

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ##
@@ -1367,13 +1503,18 @@ public void testMinTimeBetweenCheckpointsInterval() throws Exception {
 final long delay = 50;
+ CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+ 12, // periodic interval is 12 ms

Review comment:
Yes, it is required. As mentioned in my previous comment, I changed the sanity-check condition in `CheckpointCoordinatorConfiguration.java`. We should apply the same criterion consistently: the interval must be greater than or equal to 10.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293623628

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java ##
@@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
 int maxConcurrentCheckpoints,
 CheckpointRetentionPolicy checkpointRetentionPolicy,
 boolean isExactlyOnce,
- boolean isPerfetCheckpointForRecovery) {
+ boolean isPerfetCheckpointForRecovery,
+ int tolerableCpFailureNumber) {
 // sanity checks
- if (checkpointInterval < 1 || checkpointTimeout < 1 ||
- minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+ if (checkpointInterval < 10 || checkpointTimeout < 10 ||

Review comment:
Yes, I have a good reason. `CheckpointConfig#setCheckpointInterval` and `CheckpointConfig#setCheckpointTimeout` treat values `< 10` as illegal, so we had better use the same criterion.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r293461498 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The checkpoint failure manager which centralized manage checkpoint failure processing logic. 
+ */ +public class CheckpointFailureManager { + + private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; + + private final int tolerableCpFailureNumber; + private final FailJobCallback failureCallback; + private final AtomicInteger continuousFailureCounter; + private final ConcurrentMap countedCheckpointIds; + + public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) { + checkArgument(tolerableCpFailureNumber >= 0, + "The tolerable checkpoint failure number is illegal, " + + "it must be greater than or equal to 0 ."); + this.tolerableCpFailureNumber = tolerableCpFailureNumber; + this.continuousFailureCounter = new AtomicInteger(0); + this.failureCallback = checkNotNull(failureCallback); + this.countedCheckpointIds = new ConcurrentHashMap<>(); + } + + /** +* Handle checkpoint exception with a handler callback. +* +* @param exception the checkpoint exception. +* @param checkpointId the failed checkpoint id used to count the continuous failure number based on +* checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure +* happens before the checkpoint id generation. In this case, it will be specified a negative +* latest generated checkpoint id as a special flag. 
+*/ + public void handleCheckpointException(CheckpointException exception, long checkpointId) { + CheckpointFailureReason reason = exception.getCheckpointFailureReason(); + switch (reason) { + case PERIODIC_SCHEDULER_SHUTDOWN: + case ALREADY_QUEUED: + case TOO_MANY_CONCURRENT_CHECKPOINTS: + case MINIMUM_TIME_BETWEEN_CHECKPOINTS: + case NOT_ALL_REQUIRED_TASKS_RUNNING: + case CHECKPOINT_SUBSUMED: + case CHECKPOINT_COORDINATOR_SUSPEND: + case CHECKPOINT_COORDINATOR_SHUTDOWN: + case JOB_FAILURE: + case JOB_FAILOVER_REGION: + //for compatibility purposes with user job behavior + case CHECKPOINT_DECLINED_TASK_NOT_READY: + case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING: + case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED: + case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER: + case CHECKPOINT_DECLINED_SUBSUMED: + case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM: + + case EXCEPTION: + case CHECKPOINT_EXPIRED: + case TASK_CHECKPOINT_FAILURE: + case TRIGGER_CHECKPOINT_FAILURE: + case FINALIZE_CHECKPOINT_FAILURE: + //ignore + break; + + case CHECKPOINT_DECLINED: + //we should make sure one checkpoint only be
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r293450918 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The checkpoint failure manager which centralized manage checkpoint failure processing logic. 
+ */ +public class CheckpointFailureManager { + + private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; + + private final int tolerableCpFailureNumber; + private final FailJobCallback failureCallback; + private final AtomicInteger continuousFailureCounter; + private final ConcurrentMap countedCheckpointIds; + + public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) { + checkArgument(tolerableCpFailureNumber >= 0, + "The tolerable checkpoint failure number is illegal, " + + "it must be greater than or equal to 0 ."); + this.tolerableCpFailureNumber = tolerableCpFailureNumber; + this.continuousFailureCounter = new AtomicInteger(0); + this.failureCallback = checkNotNull(failureCallback); + this.countedCheckpointIds = new ConcurrentHashMap<>(); + } + + /** +* Handle checkpoint exception with a handler callback. +* +* @param exception the checkpoint exception. +* @param checkpointId the failed checkpoint id used to count the continuous failure number based on +* checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure +* happens before the checkpoint id generation. In this case, it will be specified a negative +* latest generated checkpoint id as a special flag. 
+*/ + public void handleCheckpointException(CheckpointException exception, long checkpointId) { + CheckpointFailureReason reason = exception.getCheckpointFailureReason(); + switch (reason) { + case PERIODIC_SCHEDULER_SHUTDOWN: + case ALREADY_QUEUED: + case TOO_MANY_CONCURRENT_CHECKPOINTS: + case MINIMUM_TIME_BETWEEN_CHECKPOINTS: + case NOT_ALL_REQUIRED_TASKS_RUNNING: + case CHECKPOINT_SUBSUMED: + case CHECKPOINT_COORDINATOR_SUSPEND: + case CHECKPOINT_COORDINATOR_SHUTDOWN: + case JOB_FAILURE: + case JOB_FAILOVER_REGION: + //for compatibility purposes with user job behavior + case CHECKPOINT_DECLINED_TASK_NOT_READY: + case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING: + case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED: + case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER: + case CHECKPOINT_DECLINED_SUBSUMED: + case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM: + + case EXCEPTION: + case CHECKPOINT_EXPIRED: + case TASK_CHECKPOINT_FAILURE: + case TRIGGER_CHECKPOINT_FAILURE: + case FINALIZE_CHECKPOINT_FAILURE: + //ignore + break; + + case CHECKPOINT_DECLINED: + //we should make sure one checkpoint only be
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r293171972 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The checkpoint failure manager which centralized manage checkpoint failure processing logic. 
+ */ +public class CheckpointFailureManager { + + private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; + + private final int tolerableCpFailureNumber; + private final FailJobCallback failureCallback; + private final AtomicInteger continuousFailureCounter; + + public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) { + checkArgument(tolerableCpFailureNumber >= 0 + && tolerableCpFailureNumber <= UNLIMITED_TOLERABLE_FAILURE_NUMBER, + "The tolerable checkpoint failure number is illegal, " + + "it must be greater than or equal to 0 and less than or equal to " + UNLIMITED_TOLERABLE_FAILURE_NUMBER + "."); + this.tolerableCpFailureNumber = tolerableCpFailureNumber; + this.continuousFailureCounter = new AtomicInteger(0); + this.failureCallback = checkNotNull(failureCallback); + } + + /** +* Handle checkpoint exception with a handler callback. +* +* @param exception the checkpoint exception. +* @param checkpointId the failed checkpoint id used to count the continuous failure number based on +* checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure +* happens before the checkpoint id generation. In this case, it will be specified a negative +* latest generated checkpoint id as a special flag. +*/ + public void handleCheckpointException(CheckpointException exception, long checkpointId) {

Review comment:
I have added a check to the counting logic. Since every failure reason that can occur in the special case (negative checkpoint IDs) is ignored, we do not need to handle that case separately.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293006930

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -1329,10 +1331,35 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th
         LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
-        if (cause == null || cause instanceof CheckpointDeclineException) {
-            pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause);
+        if (cause == null) {

Review comment:
If all `CheckpointException` objects are created by Flink's own code, this style is OK.
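The dispatch style under discussion can be shown on its own. In this sketch the enum `Reason`, the exception type `ReasonedCheckpointException`, and the method `reasonFor` are hypothetical stand-ins (the diff's real types are `CheckpointFailureReason` and `CheckpointException`). The point is that the `instanceof` branch trusts whatever reason the exception carries, which is only safe if every such exception is created by code you control.

```java
/** Hypothetical sketch of mapping a discard cause to a failure reason. */
final class DiscardReasonSketch {

    /** Illustrative subset of reasons; not Flink's full CheckpointFailureReason enum. */
    enum Reason { CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED, JOB_FAILURE }

    /** Stand-in for a checkpoint exception that carries its own failure reason. */
    static final class ReasonedCheckpointException extends Exception {
        private static final long serialVersionUID = 1L;

        private final Reason reason;

        ReasonedCheckpointException(Reason reason) {
            super(reason.name());
            this.reason = reason;
        }

        Reason getReason() {
            return reason;
        }
    }

    static Reason reasonFor(Throwable cause) {
        if (cause == null) {
            // No cause at all: treat it as a plain decline.
            return Reason.CHECKPOINT_DECLINED;
        } else if (cause instanceof ReasonedCheckpointException) {
            // Trust the carried reason; this assumes every such exception is built by our own code.
            return ((ReasonedCheckpointException) cause).getReason();
        } else {
            // Any other throwable is classified as a job failure.
            return Reason.JOB_FAILURE;
        }
    }
}
```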
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293003163

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure processing logic.
+ */
+public class CheckpointFailureManager {
+
+    private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
+
+    private final int tolerableCpFailureNumber;
+    private final FailJobCallback failureCallback;
+    private final AtomicInteger continuousFailureCounter;
+
+    public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) {
+        checkArgument(tolerableCpFailureNumber >= 0
+            && tolerableCpFailureNumber <= UNLIMITED_TOLERABLE_FAILURE_NUMBER,
+            "The tolerable checkpoint failure number is illegal, " +
+                "it must be greater than or equal to 0 and less than or equal to " + UNLIMITED_TOLERABLE_FAILURE_NUMBER + ".");
+        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+        this.continuousFailureCounter = new AtomicInteger(0);
+        this.failureCallback = checkNotNull(failureCallback);
+    }
+
+    /**
+     * Handle checkpoint exception with a handler callback.
+     *
+     * @param exception the checkpoint exception.
+     * @param checkpointId the failed checkpoint id used to count the continuous failure number based on
+     * checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure
+     * happens before the checkpoint id generation. In this case, it will be specified a negative
+     * latest generated checkpoint id as a special flag.
+     */
+    public void handleCheckpointException(CheckpointException exception, long checkpointId) {

Review comment:
Good catch! You are right; I will look into it and fix it.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293002340

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -439,6 +436,10 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
             triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
             return true;
         } catch (CheckpointException e) {
+            long latestGeneratedCheckpointId = getCheckpointIdCounter().get();

Review comment:
I think getting the latest generated checkpoint ID from the counter is the simplest and most direct way. This way, we avoid two potential problems:
- Concurrency: introducing a field to store the ID carries no risk at present, but by relying on `CheckpointIDCounter` we do not need to worry about this problem at all.
- A new global field could be misused, creating hidden risks if other logic starts relying on it in the future.
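A small sketch of the approach: instead of keeping a separate field, the failure path reads the shared ID counter and negates its value to build the special flag. Here an `AtomicLong` stands in for Flink's `CheckpointIDCounter`, the class name is hypothetical, and the assumption that the counter starts at 1 (so the negated value is always negative) is ours, not taken from the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: derive the trigger-failure flag from a shared ID counter, not a new field. */
final class TriggerFailureFlagSketch {

    // Stand-in for a checkpoint ID counter; assumed to start at 1.
    private final AtomicLong idCounter = new AtomicLong(1);

    /** Hands out the next checkpoint ID, as a successful trigger would. */
    long nextCheckpointId() {
        return idCounter.getAndIncrement();
    }

    /**
     * Value reported when triggering fails before an ID was assigned. It reads the
     * counter atomically (mirroring the diff's getCheckpointIdCounter().get()) and
     * negates it, so it cannot collide with a real, positive checkpoint ID.
     */
    long flagForTriggerFailure() {
        return -idCounter.get();
    }
}
```

Because the counter is the single source of truth and reads are atomic, there is no extra mutable state for other code paths to misuse.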
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r287661884

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ##
@@ -540,14 +531,13 @@ public void enableCheckpointing(
         checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
+        CheckpointFailureManager failureManager = new CheckpointFailureManager(chkConfig.getTolerableCheckpointFailureNumber(), () ->
+            failGlobal(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold.")));

Review comment:
Hi @StefanRRichter and @tillrohrmann, pinging you for help: I suspect that this call will cause a deadlock with [this check](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1196). WDYT? Any opinions?
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286778051

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java ##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the checkpoint failure manager.
+ */
+public class CheckpointFailureManagerTest extends TestLogger {
+
+    @Test
+    public void testContinuousFailure() {

Review comment:
I know what you mean; please see this [discussion section](https://github.com/apache/flink/pull/8322#discussion_r280328078). Currently, the counting mechanism is based on execution order (the execution phase) rather than on checkpoint order. It's a temporary solution, and we will provide a better one in the future. The JIRA issue is FLINK-12514.
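To see why counting in execution order differs from counting in checkpoint order, consider checkpoint outcomes that arrive out of order. The sketch below uses hypothetical types and the same assumed reset-on-success rule as a simplification; it counts the same outcomes once in arrival order and once sorted by checkpoint ID.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: the same outcomes counted in arrival order vs. checkpoint-ID order. */
final class CountingOrderSketch {

    /** A checkpoint outcome as reported to the coordinator (illustrative type). */
    static final class Outcome {
        final long checkpointId;
        final boolean success;

        Outcome(long checkpointId, boolean success) {
            this.checkpointId = checkpointId;
            this.success = success;
        }
    }

    /** Continuous-failure count after processing outcomes in the order given. */
    static int continuousFailures(List<Outcome> outcomes) {
        int count = 0;
        for (Outcome o : outcomes) {
            count = o.success ? 0 : count + 1; // assumed rule: success resets, failure increments
        }
        return count;
    }

    /** Same outcomes, re-sorted by checkpoint ID before counting. */
    static int continuousFailuresByCheckpointOrder(List<Outcome> outcomes) {
        List<Outcome> sorted = outcomes.stream()
                .sorted(Comparator.comparingLong(o -> o.checkpointId))
                .collect(Collectors.toList());
        return continuousFailures(sorted);
    }
}
```

If the outcomes arrive as failure of #2, failure of #3, then success of #1, counting in arrival order ends at 0, while counting by checkpoint ID ends at 2.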
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286774079

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java ##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the checkpoint failure manager.
+ */
+public class CheckpointFailureManagerTest extends TestLogger {
+
+    @Test
+    public void testContinuousFailure() {

Review comment:
I think it's not necessary. The counter is an `AtomicInteger`, so we tolerate concurrent checkpoints and concurrent counting.
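The claim that an `AtomicInteger` tolerates concurrent counting can be checked with a small sketch in which several threads increment one shared counter and no increments are lost. This only demonstrates atomic updates; it says nothing about the order in which failures are counted. The class name and the thread and iteration counts are arbitrary choices for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: many threads reporting failures into one AtomicInteger lose no updates. */
final class ConcurrentCountingSketch {

    static int countConcurrently(int threads, int failuresPerThread) {
        AtomicInteger counter = new AtomicInteger(0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < failuresPerThread; i++) {
                    counter.incrementAndGet(); // atomic read-modify-write, so no lost updates
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counter.get();
    }
}
```

With 8 threads each recording 10,000 failures, the final value is exactly 80,000; a plain `int` field incremented the same way could lose updates.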
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286773309

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ##
@@ -512,24 +513,19 @@ public boolean isArchived() {
     }
     public void enableCheckpointing(
-            long interval,
-            long checkpointTimeout,
-            long minPauseBetweenCheckpoints,
-            int maxConcurrentCheckpoints,
-            CheckpointRetentionPolicy retentionPolicy,
+            CheckpointCoordinatorConfiguration chkConfig,
             List verticesToTrigger,
             List verticesToWaitFor,
             List verticesToCommitTo,
             List> masterHooks,
             CheckpointIDCounter checkpointIDCounter,
             CompletedCheckpointStore checkpointStore,
             StateBackend checkpointStateBackend,
-            CheckpointStatsTracker statsTracker,
-            boolean isPreferCheckpointForRecovery) {
+            CheckpointStatsTracker statsTracker) {
         // simple sanity checks
-        checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
-        checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms");
+        checkArgument(chkConfig.getCheckpointInterval() >= 10, "checkpoint interval must not be below 10ms");

Review comment:
Actually, the interval checks are not consistent across places. `CheckpointConfig` uses -1 as the "disabled" flag and treats any value larger than 0 as legal, so the sanity check in `CheckpointCoordinatorConfiguration` is based on 1. It's better to unify them: the interval should be either -1 or larger than or equal to 10. So I agree with you!
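One way to express the unified rule proposed in this comment (an interval is either the -1 "disabled" flag or at least 10 ms) is a single validation helper shared by all call sites. The class, constant, and method names below are hypothetical; the existing checks mentioned in the comment are spread across `CheckpointConfig`, `CheckpointCoordinatorConfiguration`, and `ExecutionGraph`.

```java
/** Hypothetical sketch of one shared interval check: -1 (disabled) or >= 10 ms. */
final class IntervalCheckSketch {

    /** Assumed marker for "periodic checkpointing disabled", following the -1 convention. */
    static final long DISABLED = -1L;

    /** Assumed lower bound, taken from the existing "must not be below 10ms" message. */
    static final long MIN_INTERVAL_MS = 10L;

    static boolean isValidInterval(long intervalMs) {
        return intervalMs == DISABLED || intervalMs >= MIN_INTERVAL_MS;
    }

    /** Returns the interval unchanged, or throws if it violates the unified rule. */
    static long checkInterval(long intervalMs) {
        if (!isValidInterval(intervalMs)) {
            throw new IllegalArgumentException("checkpoint interval must be " + DISABLED
                    + " (disabled) or at least " + MIN_INTERVAL_MS + "ms, but was " + intervalMs);
        }
        return intervalMs;
    }
}
```

Under this rule, values such as 1 or 5, which a "larger than 0" check would accept, are rejected, which removes the mismatch the comment describes.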
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285428078

## File path: flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java ##
@@ -45,6 +45,7 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
         env.getConfig().disableSysoutLogging();
         env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
         env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
I have added a new test case to verify the exception behavior.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285325598

## File path: flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/test/StatefulStreamingJob.java ##
@@ -150,6 +150,7 @@ public static void main(String[] args) throws Exception {
         env.setRestartStrategy(RestartStrategies.noRestart());
         env.enableCheckpointing(1000L);
         env.getConfig().disableGenericTypes();
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

Review comment:
After removing all the settings of the tolerable checkpoint failure number, this case is the first one that causes a Travis failure.

Error detail:
```java
2019-05-17 15:43:08,068 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 66 @ 1558107788056 for job dd4f07c1f866b7afff42759d7a35de3f.
2019-05-17 15:43:08,207 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 66 by task e65ff0e6f0ff8e14a1b53410aa8bb019 of job dd4f07c1f866b7afff42759d7a35de3f.
2019-05-17 15:43:08,208 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 66 of job dd4f07c1f866b7afff42759d7a35de3f.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source (1/1) was not running
    at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1150)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-05-17 15:43:08,213 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (dd4f07c1f866b7afff42759d7a35de3f) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$enableCheckpointing$0(ExecutionGraph.java:540)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpoint(CheckpointCoordinator.java:1401)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1333)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:758)
    at org.apache.flink.runtime.jobmaster.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:556)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-05-17 15:43:08,224 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (e65ff0e6f0ff8e14a1b53410aa8bb019) switched from RUNNING to CANCELING.
2019-05-17 15:43:08,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/1) (b6eca113b5bb384dd099e20a14250dcb) switched from RUNNING to CANCELING.
2019-05-17 15:43:08,324 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (e65ff0e6f0ff8e14a1b53410aa8bb019) switched from CANCELING to CANCELED.
2019-05-17 15:43:08,345 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/1) (b6eca113b5bb384dd099e20a14250dcb) switched from CANCELING to CANCELED.
2019-05-17 15:43:08,353 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Flink Streaming Job (dd4f07c1f866b7afff42759d7a35de3f) if no longer possible.
2019-05-17 15:43:08,356 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (dd4f07c1f866b7afff42759d7a35de3f) switched from state FAILING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at
```
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285105148

## File path: flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java ##
@@ -45,6 +45,7 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
         env.getConfig().disableSysoutLogging();
         env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
         env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
OK, I will try to refactor the test cases that involve the failure manager.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285095516

## File path: flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java ##
@@ -45,6 +45,7 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
         env.getConfig().disableSysoutLogging();
         env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
         env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
OK, I will follow the steps I listed. Let's analyze them case by case.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285093338

## File path: flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java ##
@@ -45,6 +45,7 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
         env.getConfig().disableSysoutLogging();
         env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
         env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
I agree. So, to see more details, do you agree that I remove the other tolerance number settings and let Travis reproduce all the possible situations?
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285064641

## File path: flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java ##
@@ -45,6 +45,7 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
         env.getConfig().disableSysoutLogging();
         env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
         env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
I see many JobMaster "waiting" messages printed, but I'm not very sure. I think checkpoints could be declined before this PR as well. It was OK then because the old tolerance mechanism lived on the TM side. Now, however, a decline triggers a restart through `CheckpointFailureManager`.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285061845

## File path: flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java ##
@@ -45,6 +45,7 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
         env.getConfig().disableSysoutLogging();
         env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
         env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
This one I can confirm: the Kafka end-to-end test script checks that the log file is not empty, and the log reported that the count exceeded the tolerance number.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285060806

## File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ##
@@ -295,6 +295,7 @@ public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
         env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
         env.setParallelism(parallelism);
         env.enableCheckpointing(200);
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
This one has been reported to Jira: https://issues.apache.org/jira/browse/FLINK-12542
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285059373

## File path: flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/test/StatefulStreamingJob.java ##
@@ -150,6 +150,7 @@ public static void main(String[] args) throws Exception {
         env.setRestartStrategy(RestartStrategies.noRestart());
         env.enableCheckpointing(1000L);
         env.getConfig().disableGenericTypes();
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

Review comment:
I am trying to track down this case and have triggered many Travis builds. I remember that three checkpoints were declined in this case's log.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285058131

## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ##
@@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
         env.setParallelism(1);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
         env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
Yes, to be honest, I am not entirely sure about this case.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285050218

## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ##
@@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
         env.setParallelism(1);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
         env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
Log details for this one: https://travis-ci.org/apache/flink/jobs/533358204
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285046388

## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ##
@@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
         env.setParallelism(1);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
         env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
I understand. My PR changed the default behavior of these test cases, so they now have to specify an explicit tolerance number instead of relying on the default. This may cause compatibility problems or make more test cases fail.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285046388

## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ##
@@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
         env.setParallelism(1);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
         env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
I understand. My PR changed the default behavior of these test cases, so they now must specify an explicit tolerance number instead of relying on the default. This may cause compatibility problems (for users' existing jobs) or make more test cases fail.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285044045

## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ##
@@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
         env.setParallelism(1);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
         env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
OK, let me find the old logs that show why it failed, and then we can discuss based on them.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285041637

## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ##
@@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
         env.setParallelism(1);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
         env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
I think the default value should be `0` to keep the same behavior as `failOnCheckpointingErrors` (whose default is `true`). I changed it to fix the test case. Some test cases (integration tests) are not stable: sometimes a checkpoint is declined, and these tests cannot accept a job recovery triggered by `CheckpointFailureManager`, because they do not allow restarts and they check the log files for exception messages.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285041637

## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ##
@@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
         env.setParallelism(1);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
         env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
I think the default value should be `0` to keep the same behavior as `failOnCheckpointingErrors` (whose default is `true`). I changed it to fix the test case. Some test cases (integration tests) are not stable: sometimes a checkpoint is declined, and these tests cannot accept a job recovery triggered by `CheckpointFailureManager`; some of them do not allow restarts and check the log files.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285041637

## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ##
@@ -187,6 +187,7 @@ public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
         env.setParallelism(1);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
         env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment:
I think the default value should be `0` to keep the same behavior as `failOnCheckpointingErrors` (whose default is `true`). I changed it to fix the test case. Some test cases (integration tests) are not stable: sometimes a checkpoint is declined, and these tests cannot accept a job recovery triggered by `CheckpointFailureManager`. Some of them do not allow restarts, and some of them check the log files.
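The compatibility argument above (a default tolerance of `0` behaves like `failOnCheckpointingErrors = true`) can be checked with a small sketch. It assumes the PR's rule "fail the job once the consecutive failure count is greater than the tolerance" and assumes the legacy flag meant "fail on the first checkpointing error"; the class and method names are hypothetical and do not exist in Flink.

```java
// Hypothetical illustration; none of these names exist in Flink.
final class LegacyToleranceSketch {

    /** The comparison used in the PR: fail once the consecutive failures exceed the tolerance. */
    static boolean exceedsTolerance(int consecutiveFailures, int tolerance) {
        return consecutiveFailures > tolerance;
    }

    /** Assumed legacy rule: with the flag set, the first checkpointing error fails the job. */
    static boolean legacyFailsJob(boolean failOnCheckpointingErrors, int consecutiveFailures) {
        return failOnCheckpointingErrors && consecutiveFailures >= 1;
    }

    public static void main(String[] args) {
        // With the default tolerance of 0, both rules give the same answer for every count.
        for (int failures = 0; failures <= 3; failures++) {
            System.out.println(failures + " failure(s): new=" + exceedsTolerance(failures, 0)
                + ", legacy=" + legacyFailsJob(true, failures));
        }
    }
}
```

Under the same `>` rule, a tolerance of `N` lets `N` consecutive counted failures pass and fails the job on failure `N + 1`, which is why tests that hit a few declined checkpoints need a non-default value.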
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284740074

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java ##
@@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
             int maxConcurrentCheckpoints,
             CheckpointRetentionPolicy checkpointRetentionPolicy,
             boolean isExactlyOnce,
-            boolean isPerfetCheckpointForRecovery) {
+            boolean isPerfetCheckpointForRecovery,
+            int tolerableCpFailureNumber) {
         // sanity checks
         if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-            minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+            minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1 ||
+            tolerableCpFailureNumber < 0 || tolerableCpFailureNumber >= Integer.MAX_VALUE) {

Review comment:
Sorry, that was an oversight on my part. I have fixed it.
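Because `tolerableCpFailureNumber` is an `int`, it can never be greater than `Integer.MAX_VALUE`, so the quoted condition `tolerableCpFailureNumber >= Integer.MAX_VALUE` rejects exactly one value: `Integer.MAX_VALUE` itself, which is the value several tests in this PR pass to `setTolerableCheckpointFailureNumber`. The sketch below only verifies that equivalence; the method names are hypothetical, and it is not the fix that was actually applied.

```java
// Hypothetical helper names; this only mirrors the quoted sanity check.
final class ToleranceRangeCheck {

    /** The check as quoted in the diff: true means "reject this value". */
    static boolean rejectedByDiffCheck(int tolerableCpFailureNumber) {
        return tolerableCpFailureNumber < 0 || tolerableCpFailureNumber >= Integer.MAX_VALUE;
    }

    /** Equivalent form: for an int, ">= Integer.MAX_VALUE" can only mean "== Integer.MAX_VALUE". */
    static boolean rejectedEquivalent(int tolerableCpFailureNumber) {
        return tolerableCpFailureNumber < 0 || tolerableCpFailureNumber == Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        int[] samples = {Integer.MIN_VALUE, -1, 0, 3, Integer.MAX_VALUE - 1, Integer.MAX_VALUE};
        for (int n : samples) {
            System.out.println(n + " rejected=" + rejectedByDiffCheck(n)
                + " (equivalent form agrees: " + (rejectedByDiffCheck(n) == rejectedEquivalent(n)) + ")");
        }
    }
}
```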
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284720134

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure processing logic.
+ */
+public class CheckpointFailureManager {
+
+    private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
+
+    private final int tolerableCpFailureNumber;
+    private final FailJobCallback failureCallback;
+    private final AtomicInteger continuousFailureCounter;
+
+    public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) {
+        checkArgument(tolerableCpFailureNumber >= 0
+            && tolerableCpFailureNumber < MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+            "The tolerable checkpoint failure number is illegal, " +
+            "it must be greater than or equal to 0 and less than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+        this.continuousFailureCounter = new AtomicInteger(0);
+        this.failureCallback = checkNotNull(failureCallback);
+    }
+
+    /**
+     * Handle checkpoint exception with a handler callback.
+     *
+     * @param exception the checkpoint exception.
+     * @param checkpointId the failed checkpoint id used to count the continuous failure number based on
+     *     checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure
+     *     happens before the checkpoint id generation. In this case, it will be specified a negative
+     *     latest generated checkpoint id as a special flag.
+     */
+    public void handleCheckpointException(CheckpointException exception, long checkpointId) {
+        CheckpointFailureReason reason = exception.getCheckpointFailureReason();
+        switch (reason) {
+            case PERIODIC_SCHEDULER_SHUTDOWN:
+            case ALREADY_QUEUED:
+            case TOO_MANY_CONCURRENT_CHECKPOINTS:
+            case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+            case NOT_ALL_REQUIRED_TASKS_RUNNING:
+            case CHECKPOINT_SUBSUMED:
+            case CHECKPOINT_COORDINATOR_SUSPEND:
+            case CHECKPOINT_COORDINATOR_SHUTDOWN:
+            case JOB_FAILURE:
+            case JOB_FAILOVER_REGION:
+            //ignore
+                break;
+
+            case EXCEPTION:
+            case CHECKPOINT_EXPIRED:
+            case CHECKPOINT_DECLINED:
+            case TASK_CHECKPOINT_FAILURE:
+            case TRIGGER_CHECKPOINT_FAILURE:
+            case FINALIZE_CHECKPOINT_FAILURE:
+                continuousFailureCounter.incrementAndGet();
+                break;
+
+            default:
+                throw new FlinkRuntimeException("Unknown checkpoint failure reason : " + reason.name());
+        }
+
+        if (continuousFailureCounter.get() > tolerableCpFailureNumber) {

Review comment:
Accepted! I have updated the code.
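The counting logic quoted above can be reduced to a small self-contained sketch: some failure reasons are ignored, others increment a consecutive-failure counter, and the job-failure callback fires once the counter is greater than the tolerance. With a tolerance of 3 (the value set in the `StatefulStreamingJob` change), three consecutive declines are tolerated and the fourth counted failure fails the job. The enum below is a hypothetical, reduced stand-in for `CheckpointFailureReason`, a plain `Runnable` stands in for `FailJobCallback`, and the reset-on-success method is an assumption because the quoted diff does not show one.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, reduced set of reasons; the real CheckpointFailureReason enum has more values.
enum SketchFailureReason {
    CHECKPOINT_DECLINED,             // counted
    CHECKPOINT_EXPIRED,              // counted
    TOO_MANY_CONCURRENT_CHECKPOINTS, // ignored
    JOB_FAILURE                      // ignored
}

final class FailureCounterSketch {
    private final int tolerableFailures;
    private final Runnable failJob; // stands in for the PR's FailJobCallback
    private final AtomicInteger continuousFailures = new AtomicInteger(0);

    FailureCounterSketch(int tolerableFailures, Runnable failJob) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
        this.failJob = failJob;
    }

    void onFailure(SketchFailureReason reason) {
        switch (reason) {
            case TOO_MANY_CONCURRENT_CHECKPOINTS:
            case JOB_FAILURE:
                return; // ignored reasons never touch the counter
            case CHECKPOINT_DECLINED:
            case CHECKPOINT_EXPIRED:
                continuousFailures.incrementAndGet();
                break;
            default:
                throw new IllegalStateException("Unknown reason: " + reason);
        }
        // Same comparison as the quoted diff: the job fails on counted failure number (tolerance + 1).
        if (continuousFailures.get() > tolerableFailures) {
            failJob.run();
        }
    }

    // Assumption (not shown in the quoted diff): a successful checkpoint ends the streak.
    void onSuccess() {
        continuousFailures.set(0);
    }
}
```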
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284711957

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure processing logic.
+ */
+public class CheckpointFailureManager {
+
+    private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
+
+    private final int tolerableCpFailureNumber;
+    private final FailJobCallback failureCallback;
+    private final AtomicInteger continuousFailureCounter;
+
+    public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) {
+        checkArgument(tolerableCpFailureNumber >= 0
+            && tolerableCpFailureNumber < MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+            "The tolerable checkpoint failure number is illegal, " +
+            "it must be greater than or equal to 0 and less than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+        this.continuousFailureCounter = new AtomicInteger(0);
+        this.failureCallback = checkNotNull(failureCallback);
+    }
+
+    /**
+     * Handle checkpoint exception with a handler callback.
+     *
+     * @param exception the checkpoint exception.
+     * @param checkpointId the failed checkpoint id used to count the continuous failure number based on
+     *     checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure
+     *     happens before the checkpoint id generation. In this case, it will be specified a negative
+     *     latest generated checkpoint id as a special flag.
+     */
+    public void handleCheckpointException(CheckpointException exception, long checkpointId) {
+        CheckpointFailureReason reason = exception.getCheckpointFailureReason();
+        switch (reason) {
+            case PERIODIC_SCHEDULER_SHUTDOWN:
+            case ALREADY_QUEUED:
+            case TOO_MANY_CONCURRENT_CHECKPOINTS:
+            case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+            case NOT_ALL_REQUIRED_TASKS_RUNNING:
+            case CHECKPOINT_SUBSUMED:
+            case CHECKPOINT_COORDINATOR_SUSPEND:
+            case CHECKPOINT_COORDINATOR_SHUTDOWN:
+            case JOB_FAILURE:
+            case JOB_FAILOVER_REGION:
+            //ignore
+                break;
+
+            case EXCEPTION:
+            case CHECKPOINT_EXPIRED:
+            case CHECKPOINT_DECLINED:
+            case TASK_CHECKPOINT_FAILURE:
+            case TRIGGER_CHECKPOINT_FAILURE:
+            case FINALIZE_CHECKPOINT_FAILURE:
+                continuousFailureCounter.incrementAndGet();
+                break;
+
+            default:
+                throw new FlinkRuntimeException("Unknown checkpoint failure reason : " + reason.name());
+        }
+
+        if (continuousFailureCounter.get() > tolerableCpFailureNumber) {

Review comment:
This is the reason I did not use an equality check. If I accept your suggestion, this condition would have to change to `continuousFailureCounter.get() > tolerableCpFailureNumber || continuousFailureCounter.get() < 0`. The `continuousFailureCounter.get() < 0` means
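The overflow concern behind the `continuousFailureCounter.get() < 0` check can be made concrete: `AtomicInteger.incrementAndGet()` wraps silently from `Integer.MAX_VALUE` to `Integer.MIN_VALUE`, after which a `>` comparison against any non-negative tolerance is false again. The sketch below shows the wrap-around and, as an alternative technique that was not proposed in this thread, a saturating increment built on `AtomicInteger.updateAndGet` that never goes negative. The class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class; illustrates int overflow of a failure counter.
final class CounterOverflowSketch {

    /** Plain increment: wraps around to Integer.MIN_VALUE after Integer.MAX_VALUE. */
    static int plainIncrement(AtomicInteger counter) {
        return counter.incrementAndGet();
    }

    /** Saturating increment (alternative technique): sticks at Integer.MAX_VALUE. */
    static int saturatingIncrement(AtomicInteger counter) {
        return counter.updateAndGet(c -> c == Integer.MAX_VALUE ? c : c + 1);
    }

    public static void main(String[] args) {
        AtomicInteger plain = new AtomicInteger(Integer.MAX_VALUE);
        AtomicInteger saturating = new AtomicInteger(Integer.MAX_VALUE);
        System.out.println(plainIncrement(plain));           // -2147483648
        System.out.println(saturatingIncrement(saturating)); // 2147483647
    }
}
```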
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284156148

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure processing logic.
+ */
+public class CheckpointFailureManager {
+
+    private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
+
+    private final int tolerableCpFailureNumber;
+    private final FailJobCallback failureCallback;
+    private final AtomicInteger continuousFailureCounter;
+
+    public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) {
+        checkArgument(tolerableCpFailureNumber >= 0
+            && tolerableCpFailureNumber < MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+            "The tolerable checkpoint failure number is illegal, " +
+            "it must be greater than or equal to 0 and less than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+        this.continuousFailureCounter = new AtomicInteger(0);
+        this.failureCallback = checkNotNull(failureCallback);
+    }
+
+    /**
+     * Handle checkpoint exception with a handler callback.
+     *
+     * @param exception the checkpoint exception.
+     * @param checkpointId the failed checkpoint id used to count the continuous failure number based on

Review comment:
I have added a description of the checkpoint id parameter. I have also created an issue (FLINK-12514) to track the future refactoring of the counting mechanism so that it is based on ordered checkpoint ids.
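FLINK-12514 is only referenced here, so the following is a hypothetical sketch of what counting by checkpoint id could look like, not the design that issue settled on. Assumptions, all labeled in the code: each failed checkpoint id is counted at most once, a failure reported for an id at or below the last successful checkpoint is treated as stale and ignored, and a negative id (the "no id generated yet" flag described in the Javadoc above) is always counted.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; not Flink code and not the FLINK-12514 design.
final class OrderedIdFailureCounterSketch {
    private final Set<Long> countedIds = new HashSet<>();
    private long lastSuccessfulId = -1L;
    private int continuousFailures;

    /** Returns the updated consecutive-failure count. */
    int onFailure(long checkpointId) {
        if (checkpointId < 0) {
            // Assumption: failure before an id was generated (trigger phase) is always counted.
            continuousFailures++;
        } else if (checkpointId > lastSuccessfulId && countedIds.add(checkpointId)) {
            // A checkpoint newer than the last success that has not been counted yet.
            continuousFailures++;
        }
        // Otherwise: a duplicate report, or a stale failure older than the last success, is ignored.
        return continuousFailures;
    }

    /** Assumption: a success resets the streak (concurrent checkpoints are not modelled). */
    void onSuccess(long checkpointId) {
        lastSuccessfulId = Math.max(lastSuccessfulId, checkpointId);
        countedIds.clear();
        continuousFailures = 0;
    }
}
```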
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284141262

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure processing logic.
+ */
+public class CheckpointFailureManager {
+
+    private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
+
+    private final int tolerableCpFailureNumber;
+    private final FailJobCallback failureCallback;
+    private final AtomicInteger continuousFailureCounter;
+
+    public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) {
+        checkArgument(tolerableCpFailureNumber >= 0
+            && tolerableCpFailureNumber < MAXIMUM_TOLERABLE_FAILURE_NUMBER,
+            "The tolerable checkpoint failure number is illegal, " +
+            "it must be greater than or equal to 0 and less than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + ".");
+        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+        this.continuousFailureCounter = new AtomicInteger(0);
+        this.failureCallback = checkNotNull(failureCallback);
+    }
+
+    /**
+     * Handle checkpoint exception with a handler callback.
+     *
+     * @param exception the checkpoint exception.
+     * @param checkpointId the failed checkpoint id used to count the continuous failure number based on

Review comment:
OK, I will add more description.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r284140921

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
             triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
             return true;
         } catch (CheckpointException e) {
+            try {
+                long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement();

Review comment:
It seems there are still two test failures that hang the JVM process, but I cannot find a clear description of the cause, and one of them passes in my local test. Travis: https://travis-ci.org/apache/flink/builds/532612594
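The quoted catch block calls `getAndIncrement()` on the checkpoint id counter only to learn the latest id, and a get-and-increment call also advances the counter, so the next triggered checkpoint skips an id. A read-only accessor (the `CheckpointIDCounter#get` mentioned later in this thread) avoids that side effect. The sketch below uses a hypothetical `AtomicLong`-backed counter starting at 1 to show the difference; it does not model Flink's actual `CheckpointIDCounter` implementations.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a checkpoint id counter; not Flink's CheckpointIDCounter.
final class IdCounterSketch {
    private final AtomicLong nextId = new AtomicLong(1);

    /** Hands out an id and advances the counter, as when a checkpoint is triggered. */
    long getAndIncrement() {
        return nextId.getAndIncrement();
    }

    /** Reads the current value without side effects. */
    long get() {
        return nextId.get();
    }

    public static void main(String[] args) {
        IdCounterSketch viaIncrement = new IdCounterSketch();
        viaIncrement.getAndIncrement();                       // "peek" in the failure path
        System.out.println(viaIncrement.getAndIncrement());   // 2: id 1 was consumed

        IdCounterSketch viaGet = new IdCounterSketch();
        viaGet.get();                                          // read-only peek
        System.out.println(viaGet.getAndIncrement());         // 1: no id was skipped
    }
}
```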
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r284102152 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); return true; } catch (CheckpointException e) { + try { + long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement(); Review comment: > What I meant is removing the `try-catch(CheckpointException)`around the `triggerCheckpoint` call completely and exchange the `throw`s inside the method by direct calls to the failure handler. I have accepted all the suggestions except this one, because while refactoring I found the complexity may exceed what we imagined. Some obstacles: 1) ``` public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { ``` This method returns a boolean value, so it needs a binary state: when we catch `CheckpointException` we return false, and when we get a `PendingCheckpoint` we return true. If the real `triggerCheckpoint` method does not throw `CheckpointException`, how do we distinguish the two states? 2) The boolean return value of `triggerCheckpoint` is used in many test cases to judge whether a checkpoint was triggered successfully. 3) The `triggerSavepointInternal` method needs to catch the `CheckpointException` to return an exceptionally completed future, which means `triggerSavepointInternal` also needs a binary state for its `CompletableFuture`. 4) I have implemented `CheckpointIDCounter#get`. With this function available, I think reporting failures centrally is better than scattering the reporting code, just as we provided a unified `PendingCheckpoint#abort` before. What do you think?
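Point 3 can be illustrated with a small sketch. A savepoint-style method that reports its outcome through a `CompletableFuture` can keep catching the trigger exception. It then turns that exception into an exceptionally completed future.

`TriggerFailure`, `Trigger`, and `SavepointTriggerSketch` are hypothetical stand-ins for `CheckpointException` and `triggerSavepointInternal`. They are not Flink's API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for CheckpointException.
final class TriggerFailure extends Exception {
    TriggerFailure(String message) {
        super(message);
    }
}

// Hypothetical trigger step: returns the new checkpoint id or throws.
interface Trigger {
    long trigger() throws TriggerFailure;
}

final class SavepointTriggerSketch {

    // Maps the two outcomes of the trigger step onto the future: the
    // checkpoint id on success, an exceptionally completed future on failure.
    static CompletableFuture<Long> triggerSavepoint(Trigger trigger) {
        try {
            return CompletableFuture.completedFuture(trigger.trigger());
        } catch (TriggerFailure e) {
            CompletableFuture<Long> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }
}
```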
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r284070033 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); return true; } catch (CheckpointException e) { + try { + long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement(); Review comment: > BTW, I have one more important additional point, which is the `numUnsuccessfulCheckpointsTriggers` in checkpoint coordinator, which absolutely sounds like something that should now be moved into the failure manager, wdyt? Now that we provide `CheckpointFailureManager`, IMO `numUnsuccessfulCheckpointsTriggers` is no longer valuable. Currently it is incremented under the trigger lock in `triggerCheckpoint`. However, the code guarded by the trigger lock covers only a subset of the trigger phase as `CheckpointFailureManager` understands it, so the count is not accurate. It is also used only for logging. So I suggest we remove it, either in this PR or in the third (next) step. If the logging is really necessary, we can add it back after implementing the new counting logic based on the checkpoint id sequence. What's your opinion?
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r283766909 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); return true; } catch (CheckpointException e) { + try { + long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement(); Review comment: Do you think we should provide a `CheckpointIDCounter#get` API?
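Here is a minimal sketch of what a side-effect-free `get` could look like next to `getAndIncrement`, assuming an in-memory counter backed by `AtomicLong`. `SimpleCheckpointIdCounter` and `InMemoryCheckpointIdCounter` are hypothetical simplifications. The real `CheckpointIDCounter` has more methods, and its `getAndIncrement` declares a checked `Exception`.

The point of the sketch: calling `getAndIncrement` on a failure path consumes an id, whereas `get` only reads the counter.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical view of a checkpoint id counter.
interface SimpleCheckpointIdCounter {

    /** Returns the next checkpoint id and advances the counter. */
    long getAndIncrement();

    /** Returns the next id that would be handed out, without advancing. */
    long get();
}

final class InMemoryCheckpointIdCounter implements SimpleCheckpointIdCounter {

    // Ids start at 1 in this sketch.
    private final AtomicLong next = new AtomicLong(1);

    @Override
    public long getAndIncrement() {
        return next.getAndIncrement();
    }

    @Override
    public long get() {
        return next.get();
    }
}
```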
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r283764177 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); return true; } catch (CheckpointException e) { + try { + long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement(); Review comment: > Rest of my comments still holds though, why not remove the throwing/catching of an exception that we can also directly report to the failure manager? I also think that makes other counts accurate because it obtaining the id can then always happen under the lock. This try/catch block: ``` try { long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement(); failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId); } catch (Exception e1) { LOG.warn("Get latest generated checkpoint id error : ", e1); } ``` exists because `CheckpointIDCounter#getAndIncrement` throws a checked `Exception`. I think the code would look better if it could be changed to this snippet: ``` public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { try { triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); return true; } catch (CheckpointException e) { long latestGeneratedCheckpointId = getCheckpointIdCounter().get(); failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId); return false; } } ``` If the reporting happened inside the real `triggerCheckpoint` method, it would require more code to report the exception and to generate a new checkpoint id. In addition, there are two locks in that method. What do you think?
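The proposed restructuring can be sketched in a self-contained form using simplified stand-ins:

- `TriggerException` for `CheckpointException`
- `RecordingFailureManager` for `CheckpointFailureManager`
- an `AtomicLong` for the id counter

All names are hypothetical. The boolean entry point keeps its two-state contract. The catch block is the single place where a trigger failure is reported, using the negated current counter value as in the proposed snippet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for CheckpointException.
final class TriggerException extends Exception {
    TriggerException(String message) {
        super(message);
    }
}

// Records what the coordinator reports; stands in for CheckpointFailureManager.
final class RecordingFailureManager {
    final List<Long> reportedIds = new ArrayList<>();

    void handleCheckpointException(TriggerException e, long checkpointId) {
        reportedIds.add(checkpointId);
    }
}

final class CoordinatorSketch {
    private final AtomicLong idCounter = new AtomicLong(1);
    private final RecordingFailureManager failureManager = new RecordingFailureManager();
    private boolean rejectNext;

    void setRejectNext(boolean rejectNext) {
        this.rejectNext = rejectNext;
    }

    List<Long> reportedIds() {
        return failureManager.reportedIds;
    }

    // The "real" trigger: either assigns a new checkpoint id or throws.
    private long doTrigger() throws TriggerException {
        if (rejectNext) {
            throw new TriggerException("too many concurrent checkpoints");
        }
        return idCounter.getAndIncrement();
    }

    // Boolean entry point: true if triggered, false if the trigger failed.
    boolean triggerCheckpoint() {
        try {
            doTrigger();
            return true;
        } catch (TriggerException e) {
            // Read the counter without advancing it (no id is consumed here).
            long latestGeneratedCheckpointId = idCounter.get();
            failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId);
            return false;
        }
    }
}
```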
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r283758545 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); return true; } catch (CheckpointException e) { + try { + long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement(); Review comment: > If you read my first comment, it was not about that you get the value from that counter, but I wondered why you do `getAndIncrement` instead of just `get`. Actually, I would also prefer to use a `get` API, but `CheckpointIDCounter` does not provide one. > Rest of my comments still holds though, why not remove the throwing/catching of an exception that we can also directly report to the failure manager? I also think that makes other counts accurate because it obtaining the id can then always happen under the lock. OK, agreed.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r283755801 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); return true; } catch (CheckpointException e) { + try { + long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement(); Review comment: > I thought about this but can not make sure if we can get the correct latest checkpoint id when allow concurrent checkpointing. What I mean is that I cannot find a better way to obtain `latestGeneratedCheckpointId` than using `CheckpointIDCounter`. I think using `pendingCheckpoints` or `recentPendingCheckpoints` to find the latest pending checkpoint may not be accurate when concurrent checkpoints are allowed. How would you suggest obtaining this value?
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r283746719 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); return true; } catch (CheckpointException e) { + try { + long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement(); Review comment: I thought about this, but I am not sure we can get the correct latest checkpoint id when concurrent checkpoints are allowed.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r283265154 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The checkpoint failure manager which centralized manage checkpoint failure processing logic. 
+ */ +public class CheckpointFailureManager { + + private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; + + private final static short IGNORE_FLAG = 0; + private final static short COUNT_FLAG = 1; + private final static short SUCCEED_FLAG = -1; + + private final int tolerableCpFailureNumber; + private final FailJobCallback failureCallback; + private final TreeMap serialCheckpointResultTable; + + public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) { + checkArgument(tolerableCpFailureNumber >= 0 + && tolerableCpFailureNumber < MAXIMUM_TOLERABLE_FAILURE_NUMBER, + "The tolerable checkpoint failure number is illegal, " + + "it must be greater than or equal to 0 and less than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + "."); + this.tolerableCpFailureNumber = tolerableCpFailureNumber; + this.failureCallback = checkNotNull(failureCallback); + this.serialCheckpointResultTable = new TreeMap<>(Collections.reverseOrder()); + } + + /** +* Handle checkpoint exception with a handler callback. +* +* @param exception the checkpoint exception. +*/ + public void handleCheckpointException(CheckpointException exception, long checkpointId) { Review comment: @StefanRRichter Still the same question as before: if I include the checkpoint id in this method (your second suggestion), what argument should be passed [here](https://github.com/apache/flink/pull/8322/files#diff-a38ea0fa799bdaa0b354d80cd8368c60R442)?
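The `serialCheckpointResultTable` in this revision suggests counting by checkpoint id rather than by arrival order. Here is a minimal sketch of that idea under three assumptions:

- Results are kept per checkpoint id in a `TreeMap` ordered newest-first.
- Consecutive failures are counted from the newest id backwards until the first success.
- Ignorable failures are recorded but not counted.

`SequenceFailureCounter` and its `Result` enum are hypothetical; the diff uses `short` flags instead. The sketch never prunes old entries.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of sequence-based failure counting.
final class SequenceFailureCounter {

    enum Result { SUCCEEDED, COUNTED_FAILURE, IGNORED_FAILURE }

    // Newest checkpoint id first, like new TreeMap<>(Collections.reverseOrder()) in the diff.
    private final TreeMap<Long, Result> results = new TreeMap<>(Collections.reverseOrder());
    private final int tolerableFailureNumber;

    SequenceFailureCounter(int tolerableFailureNumber) {
        this.tolerableFailureNumber = tolerableFailureNumber;
    }

    // Results may arrive in any order; the map keeps them sorted by id.
    void record(long checkpointId, Result result) {
        results.put(checkpointId, result);
    }

    // Walks from the newest checkpoint backwards and stops at the first success.
    int continuousFailures() {
        int failures = 0;
        for (Map.Entry<Long, Result> entry : results.entrySet()) {
            if (entry.getValue() == Result.SUCCEEDED) {
                break;
            }
            if (entry.getValue() == Result.COUNTED_FAILURE) {
                failures++;
            }
        }
        return failures;
    }

    boolean shouldFailJob() {
        return continuousFailures() > tolerableFailureNumber;
    }
}
```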
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r283257385 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The checkpoint failure manager which centralized manage checkpoint failure processing logic. 
+ */ +public class CheckpointFailureManager { + + private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; + + private final static short IGNORE_FLAG = 0; + private final static short COUNT_FLAG = 1; + private final static short SUCCEED_FLAG = -1; + + private final int tolerableCpFailureNumber; + private final FailJobCallback failureCallback; + private final TreeMap serialCheckpointResultTable; + + public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) { + checkArgument(tolerableCpFailureNumber >= 0 + && tolerableCpFailureNumber < MAXIMUM_TOLERABLE_FAILURE_NUMBER, + "The tolerable checkpoint failure number is illegal, " + + "it must be greater than or equal to 0 and less than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + "."); + this.tolerableCpFailureNumber = tolerableCpFailureNumber; + this.failureCallback = checkNotNull(failureCallback); + this.serialCheckpointResultTable = new TreeMap<>(Collections.reverseOrder()); + } + + /** +* Handle checkpoint exception with a handler callback. +* +* @param exception the checkpoint exception. +*/ + public void handleCheckpointException(CheckpointException exception, long checkpointId) { Review comment: @StefanRRichter Thanks for your suggestion. Actually, I have not yet checked and verified the new counting logic; while implementing it, I found it was blocked by the checkpoint id. About the checkpoint id, I agree with you: adding it to the checkpoint exception is not a good idea. So I will roll the current implementation back to the old one and create an issue to refactor the counting mechanism.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r283090199 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The checkpoint failure manager which centralized manage checkpoint failure processing logic. 
+ */ +public class CheckpointFailureManager { + + private final static int MAXIMUM_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; + + private final static short IGNORE_FLAG = 0; + private final static short COUNT_FLAG = 1; + private final static short SUCCEED_FLAG = -1; + + private final int tolerableCpFailureNumber; + private final FailJobCallback failureCallback; + private final TreeMap serialCheckpointResultTable; + + public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) { + checkArgument(tolerableCpFailureNumber >= 0 + && tolerableCpFailureNumber < MAXIMUM_TOLERABLE_FAILURE_NUMBER, + "The tolerable checkpoint failure number is illegal, " + + "it must be greater than or equal to 0 and less than " + MAXIMUM_TOLERABLE_FAILURE_NUMBER + "."); + this.tolerableCpFailureNumber = tolerableCpFailureNumber; + this.failureCallback = checkNotNull(failureCallback); + this.serialCheckpointResultTable = new TreeMap<>(Collections.reverseOrder()); + } + + /** +* Handle checkpoint exception with a handler callback. +* +* @param exception the checkpoint exception. +*/ + public void handleCheckpointException(CheckpointException exception, long checkpointId) { Review comment: Hi @StefanRRichter, I have refactored the counting mechanism based on your suggestion so that it considers the checkpoint id sequence. But while implementing it, I ran into a problem: a `CheckpointException` raised in the **Trigger** phase may not have a checkpoint id. Currently, the method `triggerCheckpoint` has two outcomes: * It gets a pending checkpoint (the checkpoint id is available) * It throws a `CheckpointException` (whether the checkpoint id is available depends on where in the method the exception is thrown) So I cannot get the checkpoint id [here](https://github.com/apache/flink/pull/8322/files#diff-a38ea0fa799bdaa0b354d80cd8368c60R442).
My idea is that we could inject the checkpoint id into the `CheckpointException` (though the semantics seem a bit strange?). If we cannot inject it, we use a default value (-1). Then in `CheckpointFailureManager`, if we do not get a valid checkpoint id (we get `-1`, which means no checkpoint was generated in the trigger phase), we ignore the case. Actually, this case does not seem to be one the tolerance mechanism is meant to cover anyway. What do you think?
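The `-1` proposal can be sketched under two assumptions. First, the exception optionally carries the id of the checkpoint it belongs to. Second, the failure manager skips failures that have no id. `IdAwareCheckpointException` and `IdAwareFailureCounter` are hypothetical names for illustration only.

```java
// Hypothetical stand-in for CheckpointException that optionally carries the
// id of the checkpoint it belongs to; -1 means no id was generated yet.
final class IdAwareCheckpointException extends Exception {
    static final long NO_CHECKPOINT_ID = -1L;

    private final long checkpointId;

    IdAwareCheckpointException(String message) {
        this(message, NO_CHECKPOINT_ID);
    }

    IdAwareCheckpointException(String message, long checkpointId) {
        super(message);
        this.checkpointId = checkpointId;
    }

    long getCheckpointId() {
        return checkpointId;
    }
}

// Hypothetical failure counter that ignores failures without a checkpoint id.
final class IdAwareFailureCounter {
    private int countedFailures;

    void handleCheckpointException(IdAwareCheckpointException e) {
        if (e.getCheckpointId() == IdAwareCheckpointException.NO_CHECKPOINT_ID) {
            // Rejected during the trigger phase before an id existed: ignored.
            return;
        }
        countedFailures++;
    }

    int countedFailures() {
        return countedFailures;
    }
}
```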
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r282453955 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ## @@ -296,7 +296,8 @@ public static ExecutionGraph buildGraph( checkpointIdCounter, completedCheckpoints, rootBackend, - checkpointStatsTracker); + checkpointStatsTracker, + chkConfig.getTolerableCheckpointFailureNumber()); Review comment: Agreed.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r282451226 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ## @@ -115,6 +115,8 @@ private volatile ScheduledFuture cancellerHandle; + private final CheckpointFailureManager failureManager; Review comment: Yes, `CheckpointFailureManager` was introduced into `PendingCheckpoint` to avoid scattering calls to `failureManager.handleCheckpointException(exception)` across the coordinator. I accept this suggestion.
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r282443479

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure processing logic.
+ */
+public class CheckpointFailureManager {
+
+	private final AtomicInteger continuousFailureCounter;
+	private final int tolerableCpFailureNumber;
+
+	public CheckpointFailureManager(int tolerableCpFailureNumber) {
+		Preconditions.checkArgument(tolerableCpFailureNumber > 0,
+			"The tolerable checkpoint failure number must be larger than 0.");
+		this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+		this.continuousFailureCounter = new AtomicInteger(0);
+	}
+
+	/**
+	 * Handle checkpoint exception with a handler callback.
+	 *
+	 * @param exception the checkpoint exception.
+	 * @param callback the handler callback which defines the process logic.
+	 */
+	public void handleCheckpointException(CheckpointException exception, FailureHandlerCallback callback) {
+		CheckpointFailureReason reason = exception.getCheckpointFailureReason();
+		switch (reason) {
+			case PERIODIC_SCHEDULER_SHUTDOWN:
+			case ALREADY_QUEUED:
+			case TOO_MANY_CONCURRENT_CHECKPOINTS:
+			case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+			case NOT_ALL_REQUIRED_TASKS_RUNNING:
+			case CHECKPOINT_SUBSUMED:
+			case CHECKPOINT_COORDINATOR_SUSPEND:
+			case CHECKPOINT_COORDINATOR_SHUTDOWN:
+			case JOB_FAILURE:
+			case JOB_FAILOVER_REGION:
+				//ignore
+				break;
+
+			case EXCEPTION:
+			case CHECKPOINT_EXPIRED:
+			case CHECKPOINT_DECLINED:
+			case TASK_CHECKPOINT_FAILURE:
+			case TRIGGER_CHECKPOINT_FAILURE:
+			case FINALIZE_CHECKPOINT_FAILURE:
+				continuousFailureCounter.incrementAndGet();
+				break;
+
+			default:
+				throw new RuntimeException("Unknown checkpoint failure reason : " + reason.name());
+		}
+
+		if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+			callback.process();
+		}
+	}
+
+	/**
+	 * Handle checkpoint success.
+	 */
+	public void handleCheckpointSuccess() {
+		continuousFailureCounter.set(0);

Review comment:
After thinking it through, I agree with you. Following the checkpoint sequence is the better choice. Accepted.
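The approach accepted here, counting failures by checkpoint sequence (trigger order) rather than by the order in which results are reported, could look roughly like the minimal sketch below. `SequenceAwareFailureCounter` and its methods are hypothetical and not part of Flink; the sketch assumes checkpoint IDs increase monotonically with trigger order and that a success clears only the failures of checkpoints with a smaller or equal ID.

```java
import java.util.TreeSet;

/**
 * Hypothetical sketch (not Flink code): counts continuous checkpoint failures
 * by checkpoint ID, i.e. trigger order, instead of by report order.
 * Assumes checkpoint IDs increase monotonically with trigger order.
 */
class SequenceAwareFailureCounter {

	private final int tolerableFailureNumber;

	// IDs of failed checkpoints triggered after the latest successful one.
	private final TreeSet<Long> failedIds = new TreeSet<>();

	private long lastSuccessfulId = -1L;

	SequenceAwareFailureCounter(int tolerableFailureNumber) {
		if (tolerableFailureNumber < 0) {
			throw new IllegalArgumentException("tolerableFailureNumber must not be negative");
		}
		this.tolerableFailureNumber = tolerableFailureNumber;
	}

	/** Records a counted failure; returns true once the tolerance is exceeded. */
	synchronized boolean onFailure(long checkpointId) {
		// A failure reported for a checkpoint older than the latest success
		// does not belong to the current streak, so it is ignored.
		if (checkpointId > lastSuccessfulId) {
			failedIds.add(checkpointId);
		}
		return failedIds.size() > tolerableFailureNumber;
	}

	/** Records a success; only failures of checkpoints up to this ID are cleared. */
	synchronized void onSuccess(long checkpointId) {
		if (checkpointId > lastSuccessfulId) {
			lastSuccessfulId = checkpointId;
			// headSet(id, true) is a live view, so clear() removes from failedIds.
			failedIds.headSet(checkpointId, true).clear();
		}
	}

	synchronized int continuousFailures() {
		return failedIds.size();
	}
}
```

With this rule, a late failure report for a checkpoint older than the latest success is ignored, and a late success clears only the failures that precede it in trigger order, so newer failures keep counting.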
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r282047625

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ##
@@ -0,0 +1,95 @@
+	/**
+	 * Handle checkpoint success.
+	 */
+	public void handleCheckpointSuccess() {
+		continuousFailureCounter.set(0);

Review comment:
Good question. Actually, I think we are focusing on different things. As the variable name `continuousFailureCounter` suggests, I'm more concerned with what "continuous" means when checkpoints run concurrently, whereas I think you are more concerned with a "failure counter" based on the checkpoint sequence (trigger order).

There are usually two reasons to tolerate successive checkpoint failures:

* Users don't want the latest successful checkpoint to fall too far behind the current time (in case no further checkpoint succeeds);
* In many cases, continuous failures are caused by exceptions in third-party systems the job interacts with. If the exceptions are short-lived, we would rather not restart the job; if they persist, we are willing to restart it.

The first purpose is more in line with your view, while the second one depends more on the behavior at execution time. In your example, if the old checkpoint
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r281928748

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ##
@@ -0,0 +1,95 @@
+	/**
+	 * Handle checkpoint exception with a handler callback.
+	 *
+	 * @param exception the checkpoint exception.
+	 * @param callback the handler callback which defines the process logic.
+	 */
+	public void handleCheckpointException(CheckpointException exception, FailureHandlerCallback callback) {

Review comment:
Sorry that the change in the implementation confused you. I introduced a callback interface because we must make a "decision" (fail the job or do something else). Although we have not yet distinguished between the `Trigger` and `Execution` phases, there are still two different behaviors: `sync` (a method call in the trigger phase) and `async` (a message in the execution phase). So I think the `handleCheckpoint` in the design doc is not enough. We should track failures in both phases (`triggerCheckpoint` and `abort`). What do you think?
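A minimal sketch of the callback idea described in this comment, assuming a simplified manager: the same counting logic is reused from a synchronous trigger path, where the callback can throw straight back to the caller, and from an asynchronous abort path, where the callback fails the job through a separate channel. `FailureCallback`, `SimpleFailureManager`, and `TwoPhaseCoordinator` are hypothetical names; they only mirror the role of `FailureHandlerCallback` and `callback.process()` in the quoted diff and are not the PR's actual classes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical callback, invoked once the tolerable failure number is exceeded. */
interface FailureCallback {
	void onToleranceExceeded();
}

/** Simplified, hypothetical manager: it counts failures and delegates the decision. */
class SimpleFailureManager {

	private final AtomicInteger continuousFailures = new AtomicInteger();
	private final int tolerableFailureNumber;

	SimpleFailureManager(int tolerableFailureNumber) {
		this.tolerableFailureNumber = tolerableFailureNumber;
	}

	void handleFailure(FailureCallback callback) {
		if (continuousFailures.incrementAndGet() > tolerableFailureNumber) {
			callback.onToleranceExceeded();
		}
	}

	void handleSuccess() {
		continuousFailures.set(0);
	}
}

/** Hypothetical coordinator with one synchronous and one asynchronous failure path. */
class TwoPhaseCoordinator {

	private final SimpleFailureManager manager;

	// Stand-in for "fail the job"; a real coordinator would notify the scheduler.
	private final Consumer<String> failJob;

	TwoPhaseCoordinator(SimpleFailureManager manager, Consumer<String> failJob) {
		this.manager = manager;
		this.failJob = failJob;
	}

	/** Sync path: a trigger failure surfaces directly to the caller of the trigger method. */
	void onTriggerFailure() {
		manager.handleFailure(() -> {
			throw new IllegalStateException("Exceeded tolerable checkpoint failures in the trigger phase.");
		});
	}

	/** Async path: an abort arrives as a message, so the job is failed via a separate channel. */
	void onAbortMessage(long checkpointId) {
		manager.handleFailure(() -> failJob.accept(
			"Exceeded tolerable checkpoint failures; last aborted checkpoint: " + checkpointId));
	}
}
```

The manager stays agnostic of how the decision is carried out; each call site supplies the reaction that fits its phase, which is the motivation given above for a callback instead of a single `handleCheckpoint` method.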
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r281927266

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ##
@@ -0,0 +1,95 @@
+	/**
+	 * Handle checkpoint success.
+	 */
+	public void handleCheckpointSuccess() {
+		continuousFailureCounter.set(0);

Review comment:
Checkpoints can be executed concurrently. I think `continuousFailureCounter` will only track the invocation order, not the checkpoint sequence number. What do you think?
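The concern can be illustrated with a simplified copy of the counter from the quoted diff. The sketch below is hypothetical: `InvocationOrderCounter` keeps only the `AtomicInteger` logic (increment on a counted failure, reset on success) and assumes three checkpoints in flight at once, with checkpoint 1 completing after checkpoints 2 and 3 have already failed.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified copy of the counting logic in the quoted diff. */
class InvocationOrderCounter {

	private final AtomicInteger continuousFailureCounter = new AtomicInteger(0);

	void handleFailure() {
		continuousFailureCounter.incrementAndGet();
	}

	void handleSuccess() {
		continuousFailureCounter.set(0);
	}

	int get() {
		return continuousFailureCounter.get();
	}

	/**
	 * Checkpoints 1, 2 and 3 run concurrently (assumed). The failures of 2 and 3
	 * are reported before checkpoint 1 completes.
	 */
	static int lateSuccessScenario() {
		InvocationOrderCounter counter = new InvocationOrderCounter();
		counter.handleFailure(); // checkpoint 2 fails
		counter.handleFailure(); // checkpoint 3 fails
		counter.handleSuccess(); // checkpoint 1 completes late
		return counter.get();    // 0, although the two newest checkpoints failed
	}
}
```

Because the reset follows the order of method invocations, the late success of checkpoint 1 wipes out the two newer failures, even though, in trigger order, the last two checkpoints failed in a row.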
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r281923773

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java ##
@@ -0,0 +1,95 @@
+	public CheckpointFailureManager(int tolerableCpFailureNumber) {
+		Preconditions.checkArgument(tolerableCpFailureNumber > 0,

Review comment:
Agreed, that was my oversight.