[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560190909

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java ##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {

Review comment:
I dropped this commit altogether as a result of the above discussion.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560170984

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java ##
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                 }
             }
         } finally {
-            numberOfCheckpointsToClean.decrementAndGet();
-            postCleanAction.run();
+            try {
+                numberOfCheckpointsToClean.decrementAndGet();
+                postCleanAction.run();
+            } catch (Exception e) {
+                LOG.error(
+                        "Error while cleaning up checkpoint {}",
+                        checkpoint.getCheckpointID(),
+                        e);

Review comment:
No, I don't have a concrete scenario in mind; I'm just concerned that one could arise in the future. Okay, I'll add `synchronized` in `scheduleTriggerRequest` and drop the `CheckpointsCleaner` commit, since it's not necessary if `postCleanAction` is safe.
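
A minimal sketch of what guarding both the shutdown check and the submission with one lock could look like, assuming the same lock is also taken whenever the timer is shut down. `SynchronizedTriggerScheduler`, its fields, and its `shutdown()` method are hypothetical names for illustration, not Flink's `CheckpointCoordinator` code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for the coordinator's scheduling logic (not Flink code).
 * The shutdown check and the submission happen under the same lock that
 * shutdown() takes, so the timer cannot be shut down in between.
 */
class SynchronizedTriggerScheduler {
    private final Object lock = new Object();
    private final ExecutorService timer = Executors.newSingleThreadExecutor();
    private boolean shutdown; // guarded by lock

    /** Returns true if the request was handed to the timer, false if it was skipped. */
    boolean scheduleTriggerRequest(Runnable request) {
        synchronized (lock) {
            if (shutdown) {
                return false;
            }
            // execute() only enqueues here; the request itself runs on the timer thread.
            timer.execute(request);
            return true;
        }
    }

    /** Marks the scheduler as shut down, then waits for already-queued requests. */
    void shutdown() {
        synchronized (lock) {
            shutdown = true;
            timer.shutdown(); // queued requests still run; new submissions are rejected
        }
        try {
            timer.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The lock is held while calling `execute()`, which only enqueues the request. That stays safe only as long as nothing running under this lock waits for the timer thread, and no caller takes this lock while holding another lock in the opposite order; that is the deadlock risk raised in this thread.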

rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560087455

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java ##
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                 }
             }
         } finally {
-            numberOfCheckpointsToClean.decrementAndGet();
-            postCleanAction.run();
+            try {
+                numberOfCheckpointsToClean.decrementAndGet();
+                postCleanAction.run();
+            } catch (Exception e) {
+                LOG.error(
+                        "Error while cleaning up checkpoint {}",
+                        checkpoint.getCheckpointID(),
+                        e);

Review comment:
The calling thread already acquires `CheckpointCoordinator.lock` and then `checkpoint.lock`. So synchronizing on `checkpoint.lock` in `postCleanAction` would be enough to cause a deadlock.
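
The deadlock described above is a lock-order inversion, assuming the post-clean callback ends up holding `checkpoint.lock` while it tries to take the coordinator lock, the reverse of the calling thread's order. A generic sketch with hypothetical lock names; `ReentrantLock.tryLock` with a timeout stands in for `synchronized` so that the demo reports the inversion instead of hanging:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/** Generic lock-order inversion demo (hypothetical lock names, not Flink code). */
class LockOrderInversionDemo {

    /** Returns true if neither thread could acquire its second lock. */
    static boolean inversionBlocksBothThreads() {
        ReentrantLock coordinatorLock = new ReentrantLock();
        ReentrantLock checkpointLock = new ReentrantLock();
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicBoolean callerGotSecond = new AtomicBoolean();
        AtomicBoolean callbackGotSecond = new AtomicBoolean();

        // "Caller": coordinator lock first, then checkpoint lock.
        Thread caller = new Thread(
                () -> lockBoth(coordinatorLock, checkpointLock, barrier, callerGotSecond));
        // "Callback": the same two locks in the opposite order.
        Thread callback = new Thread(
                () -> lockBoth(checkpointLock, coordinatorLock, barrier, callbackGotSecond));
        caller.start();
        callback.start();
        try {
            caller.join();
            callback.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !callerGotSecond.get() && !callbackGotSecond.get();
    }

    private static void lockBoth(
            ReentrantLock first, ReentrantLock second, CyclicBarrier barrier, AtomicBoolean gotSecond) {
        first.lock();
        try {
            barrier.await(); // both threads now hold their first lock
            if (second.tryLock(200, TimeUnit.MILLISECONDS)) {
                gotSecond.set(true);
                second.unlock();
            }
            barrier.await(); // keep the first lock until both attempts are over
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        } finally {
            first.unlock();
        }
    }
}
```

With plain `synchronized` blocks there is no timeout, so the same interleaving would block both threads forever.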

rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559745155

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java ##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {

Review comment:
It tests that a failure to submit a post-cleanup callback isn't propagated.
We probably need to adopt some spec-based test framework :)
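
A simplified sketch of the behaviour this test checks: the pending counter is always decremented, and an exception thrown by the post-clean action (here, submitting to an executor that is already shut down) is logged rather than propagated. `TolerantCleaner` and its signature are hypothetical, and `java.util.logging` replaces the SLF4J logger from the diff only to keep the example self-contained:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical simplification of a checkpoint cleaner (not Flink's CheckpointsCleaner). */
class TolerantCleaner {
    private static final Logger LOG = Logger.getLogger(TolerantCleaner.class.getName());
    private final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger();

    void cleanCheckpoint(
            long checkpointId, Runnable discard, Runnable postCleanAction, Executor executor) {
        numberOfCheckpointsToClean.incrementAndGet();
        executor.execute(() -> {
            try {
                discard.run();
            } finally {
                // Always runs, even if discard() threw.
                numberOfCheckpointsToClean.decrementAndGet();
                try {
                    postCleanAction.run();
                } catch (Exception e) {
                    // Log instead of propagating, so a failing callback cannot break the caller.
                    LOG.log(Level.SEVERE, "Error while cleaning up checkpoint " + checkpointId, e);
                }
            }
        });
    }

    int pending() {
        return numberOfCheckpointsToClean.get();
    }
}
```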

rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559747444

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -1278,7 +1278,12 @@ public void run() {
     }
     void scheduleTriggerRequest() {
-        timer.execute(this::executeQueuedRequest);
+        if (isShutdown()) {

Review comment:
This is only an optimization, because the `timer` thread can still be shut down after the check. I'm concerned that adding `synchronized` might create deadlocks (if not now, then in the future). See also the thread [above](https://github.com/apache/flink/pull/14683#discussion_r559742642).
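
Why the `isShutdown()` check is only an optimization: nothing stops another thread from shutting the timer down between the check and `execute()`. The sketch below makes that interleaving deterministic with a hypothetical `raceHook` parameter; the class and method names are illustrative only:

```java
import java.util.concurrent.ExecutorService;

/** Hypothetical illustration of the check-then-act race (not Flink code). */
final class CheckThenActRace {
    private CheckThenActRace() {}

    /**
     * Mirrors "if (isShutdown()) skip, else execute". The raceHook stands in for another
     * thread shutting the timer down after the check but before execute().
     */
    static void scheduleUnlessShutdown(ExecutorService timer, Runnable task, Runnable raceHook) {
        if (timer.isShutdown()) {
            return; // the only case the pre-check can skip
        }
        raceHook.run();
        timer.execute(task); // can still throw RejectedExecutionException
    }
}
```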

rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559745525

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java ##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {
+        ExecutorService executor = java.util.concurrent.Executors.newSingleThreadExecutor();
+        CompletedCheckpoint checkpoint = createCheckpoint();
+        TestDiscardCallback discardCallback = new TestDiscardCallback();
+        checkpoint.setDiscardCallback(discardCallback);
+        new CheckpointsCleaner().cleanCheckpoint(checkpoint, true, executor::shutdownNow, executor);
+        checkState(executor.awaitTermination(10, SECONDS));
+        assertTrue(discardCallback.isDiscarded());
+    }
+
+    @Test
+    public void testTolerateFailureInPostCleanup() {
+        CompletedCheckpoint checkpoint = createCheckpoint();
+        TestDiscardCallback discardCallback = new TestDiscardCallback();
+        checkpoint.setDiscardCallback(discardCallback);
+        new CheckpointsCleaner()
+                .cleanCheckpoint(
+                        checkpoint,
+                        true,
+                        () -> {
+                            throw new RuntimeException();

Review comment:
I guess it's the same discussion as [above](https://github.com/apache/flink/pull/14683#discussion_r559714220).

rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559742642

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java ##
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                 }
             }
         } finally {
-            numberOfCheckpointsToClean.decrementAndGet();
-            postCleanAction.run();
+            try {
+                numberOfCheckpointsToClean.decrementAndGet();
+                postCleanAction.run();
+            } catch (Exception e) {
+                LOG.error(
+                        "Error while cleaning up checkpoint {}",
+                        checkpoint.getCheckpointID(),
+                        e);

Review comment:
> the contract should be that here mustn't occur any exceptions

To guarantee such a contract, I see only two options:
1. Wrap `timer.execute` in `CheckpointCoordinator.scheduleTriggerRequest` in a `synchronized` block.
2. Tolerate the `RejectedExecutionException && isShutdown` case, as I proposed in the Jira ticket.

The first option is deadlock-prone IMO; the second requires some discussion and rework (so it's not a quick solution). Logging is also not perfect, but to me it's the lesser evil.
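
A minimal sketch of option 2, assuming the timer is a standard `ExecutorService` whose rejection policy throws `RejectedExecutionException` (as the JDK's default `ThreadPoolExecutor.AbortPolicy` does). `ShutdownTolerantScheduling.trySchedule` is a hypothetical helper, not an existing Flink or JDK method:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical helper sketching option 2 (not Flink API). */
final class ShutdownTolerantScheduling {
    private ShutdownTolerantScheduling() {}

    /**
     * Submits the task and returns true, or returns false if the executor is shut down.
     * A rejection is swallowed only when the executor is shut down; any other rejection
     * (e.g. a saturated queue) is rethrown so real problems stay visible.
     */
    static boolean trySchedule(ExecutorService timer, Runnable task) {
        if (timer.isShutdown()) {
            return false; // cheap pre-check; it cannot rule out a concurrent shutdown
        }
        try {
            timer.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            if (timer.isShutdown()) {
                return false; // lost the race with shutdown: tolerate it
            }
            throw e;
        }
    }
}
```

Rethrowing while the executor is still running keeps this narrower than catching and logging every exception: only the expected shutdown race is swallowed.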

rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559737569

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -1278,7 +1278,11 @@ public void run() {
     }
     void scheduleTriggerRequest() {
-        timer.execute(this::executeQueuedRequest);
+        if (isShutdown()) {
+            LOG.debug("Skip scheduling trigger request because is shutting down");
+        } else {
+            timer.execute(this::executeQueuedRequest);
+        }
     }

Review comment:
Added a test. I initially assumed this change would only be a hotfix.