[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

2021-01-19 Thread GitBox


rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560190909



##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {

Review comment:
   I dropped this commit altogether as a result of the above discussion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

2021-01-19 Thread GitBox


rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560170984



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                     }
                 }
             } finally {
-                numberOfCheckpointsToClean.decrementAndGet();
-                postCleanAction.run();
+                try {
+                    numberOfCheckpointsToClean.decrementAndGet();
+                    postCleanAction.run();
+                } catch (Exception e) {
+                    LOG.error(
+                            "Error while cleaning up checkpoint {}",
+                            checkpoint.getCheckpointID(),
+                            e);

Review comment:
   No, I don't have a concrete scenario in mind. I'm just concerned that it 
can arise in the future.
   Okay, I'll add `synchronized` in `scheduleTriggerRequest` and drop 
`CheckpointsCleaner` commit - it's not necessary if `postCleanAction` is safe.









[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

2021-01-19 Thread GitBox


rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560087455



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                     }
                 }
             } finally {
-                numberOfCheckpointsToClean.decrementAndGet();
-                postCleanAction.run();
+                try {
+                    numberOfCheckpointsToClean.decrementAndGet();
+                    postCleanAction.run();
+                } catch (Exception e) {
+                    LOG.error(
+                            "Error while cleaning up checkpoint {}",
+                            checkpoint.getCheckpointID(),
+                            e);

Review comment:
   The calling thread already acquires `CheckpointCoordinator.lock` and 
then `checkpoint.lock`. So synchronizing on `checkpoint.lock` inside 
`postCleanAction` would be enough to cause a deadlock.
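
   The lock-order inversion described here can be illustrated with a minimal, self-contained sketch. It is not Flink code: `LockOrderSketch`, `coordinatorLock` and `checkpointLock` are hypothetical stand-ins for the real locks, and `ReentrantLock.tryLock` with a timeout is used instead of `synchronized` so that the demonstration terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class LockOrderSketch {
    // Hypothetical stand-ins for CheckpointCoordinator.lock and checkpoint.lock.
    static final ReentrantLock coordinatorLock = new ReentrantLock();
    static final ReentrantLock checkpointLock = new ReentrantLock();

    /** Holds `first`, waits until the peer holds its own first lock, then tries `second`. */
    static boolean holdFirstThenTrySecond(
            ReentrantLock first,
            ReentrantLock second,
            CountDownLatch bothHoldFirst,
            CountDownLatch bothTried) {
        first.lock();
        try {
            bothHoldFirst.countDown();
            bothHoldFirst.await();
            // With plain `synchronized` this step would block forever.
            boolean gotSecond = second.tryLock(200, TimeUnit.MILLISECONDS);
            if (gotSecond) {
                second.unlock();
            }
            // Keep holding `first` until the peer has finished its attempt too.
            bothTried.countDown();
            bothTried.await();
            return gotSecond;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            first.unlock();
        }
    }

    /** Returns true if neither thread could take its second lock. */
    static boolean wouldDeadlock() {
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        boolean[] gotSecond = new boolean[2];
        // "Calling thread": coordinator lock first, then checkpoint lock.
        Thread caller = new Thread(() -> gotSecond[0] =
                holdFirstThenTrySecond(coordinatorLock, checkpointLock, bothHoldFirst, bothTried));
        // "Cleaner thread": checkpoint lock first, then coordinator lock.
        Thread cleaner = new Thread(() -> gotSecond[1] =
                holdFirstThenTrySecond(checkpointLock, coordinatorLock, bothHoldFirst, bothTried));
        caller.start();
        cleaner.start();
        try {
            caller.join();
            cleaner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !gotSecond[0] && !gotSecond[1];
    }

    public static void main(String[] args) {
        System.out.println("lock-order inversion detected: " + wouldDeadlock());
    }
}
```

   The two latches only make the interleaving deterministic for the demo; in production the same inversion would show up intermittently, which is what makes it hard to spot.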









[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

2021-01-18 Thread GitBox


rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559745155



##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {

Review comment:
   It tests that a failure to submit a post-cleanup callback isn't 
propagated.
   
   We probably need to adopt some spec-based test framework :) 
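
   The behaviour under test can be sketched without any Flink classes. The following is a simplified, hypothetical model of the guarded `finally` block from the diff: `CleanerSketch`, the `discard` runnable and the `loggedErrors` counter (a stand-in for `LOG.error`) are assumptions made for illustration, not the actual `CheckpointsCleaner` API.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class CleanerSketch {
    final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger();
    // Stand-in for LOG.error(...) so the effect is observable without a logger.
    final AtomicInteger loggedErrors = new AtomicInteger();

    void cleanCheckpoint(Runnable discard, Runnable postCleanAction, Executor executor) {
        numberOfCheckpointsToClean.incrementAndGet();
        executor.execute(() -> {
            try {
                discard.run();
            } finally {
                try {
                    numberOfCheckpointsToClean.decrementAndGet();
                    postCleanAction.run();
                } catch (Exception e) {
                    // A failing post-clean action is recorded, not propagated.
                    loggedErrors.incrementAndGet();
                }
            }
        });
    }

    public static void main(String[] args) {
        CleanerSketch cleaner = new CleanerSketch();
        // Runnable::run executes the task directly on the calling thread.
        cleaner.cleanCheckpoint(
                () -> System.out.println("discarded"),
                () -> { throw new RuntimeException("post-clean failed"); },
                Runnable::run);
        System.out.println("logged errors: " + cleaner.loggedErrors.get());
    }
}
```

   Note that the counter is decremented before the post-clean action runs, so a failure in the action does not leave the pending-cleanup count inflated.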









[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

2021-01-18 Thread GitBox


rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559747444



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -1278,7 +1278,12 @@ public void run() {
     }
 
     void scheduleTriggerRequest() {
-        timer.execute(this::executeQueuedRequest);
+        if (isShutdown()) {

Review comment:
   This is only an optimization because the `timer` thread can be shut down 
after the check.
   I'm concerned that adding `synchronized` might create deadlocks (if not now 
then in the future).
   See also thread 
[above](https://github.com/apache/flink/pull/14683#discussion_r559742642).
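
   A minimal sketch of why the check alone is not sufficient. It assumes a plain `java.util.concurrent` executor in place of the coordinator's `timer`, and simulates the other thread by shutting the executor down inline between the check and the submit; `CheckThenActSketch` is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class CheckThenActSketch {
    /** Returns true if execute() was rejected even though the shutdown check passed. */
    static boolean rejectedAfterPassingCheck() {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        boolean rejected = false;
        if (!timer.isShutdown()) {
            // Simulates a concurrent shutdown landing in the window after the check.
            timer.shutdownNow();
            try {
                timer.execute(() -> { });
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected after check: " + rejectedAfterPassingCheck());
    }
}
```

   The check narrows the window but cannot close it without either a lock around both steps or handling the rejection itself.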









[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

2021-01-18 Thread GitBox


rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559745525



##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {
+        ExecutorService executor = java.util.concurrent.Executors.newSingleThreadExecutor();
+        CompletedCheckpoint checkpoint = createCheckpoint();
+        TestDiscardCallback discardCallback = new TestDiscardCallback();
+        checkpoint.setDiscardCallback(discardCallback);
+        new CheckpointsCleaner().cleanCheckpoint(checkpoint, true, executor::shutdownNow, executor);
+        checkState(executor.awaitTermination(10, SECONDS));
+        assertTrue(discardCallback.isDiscarded());
+    }
+
+    @Test
+    public void testTolerateFailureInPostCleanup() {
+        CompletedCheckpoint checkpoint = createCheckpoint();
+        TestDiscardCallback discardCallback = new TestDiscardCallback();
+        checkpoint.setDiscardCallback(discardCallback);
+        new CheckpointsCleaner()
+                .cleanCheckpoint(
+                        checkpoint,
+                        true,
+                        () -> {
+                            throw new RuntimeException();

Review comment:
   I guess it's the same discussion as 
[above](https://github.com/apache/flink/pull/14683#discussion_r559714220).









[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

2021-01-18 Thread GitBox


rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559742642



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                     }
                 }
             } finally {
-                numberOfCheckpointsToClean.decrementAndGet();
-                postCleanAction.run();
+                try {
+                    numberOfCheckpointsToClean.decrementAndGet();
+                    postCleanAction.run();
+                } catch (Exception e) {
+                    LOG.error(
+                            "Error while cleaning up checkpoint {}",
+                            checkpoint.getCheckpointID(),
+                            e);

Review comment:
   > the contract should be that here mustn't occur any exceptions
   
   To guarantee such a contract I see only two options:
   1. Wrap `timer.execute` in `CheckpointCoordinator.scheduleTriggerRequest` in 
a `synchronized` block.
   2. Tolerate the `RejectedExecutionException && isShutdown` case, as I proposed 
in the Jira ticket.
   
   The 1st option is deadlock-prone IMO; the 2nd requires some discussion and 
rework (so not a quick solution).
   Logging is also not perfect, but to me it's the lesser evil.
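
   For illustration, the second option could look roughly like the sketch below. It is a standalone model, not the coordinator's actual code: `TolerantSchedulerSketch`, its `shutdown` flag and the boolean return value are hypothetical, and only the `java.util.concurrent` calls are real APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class TolerantSchedulerSketch {
    private final ExecutorService timer = Executors.newSingleThreadExecutor();
    private volatile boolean shutdown;

    void shutdown() {
        // Set the flag first so a rejection caused by this shutdown is recognised.
        shutdown = true;
        timer.shutdownNow();
    }

    boolean isShutdown() {
        return shutdown;
    }

    /** Returns true if the request was submitted, false if it was skipped due to shutdown. */
    boolean scheduleTriggerRequest(Runnable request) {
        try {
            timer.execute(request);
            return true;
        } catch (RejectedExecutionException e) {
            if (isShutdown()) {
                // Expected during shutdown: swallow instead of propagating.
                return false;
            }
            // Rejection for any other reason is still a real error.
            throw e;
        }
    }

    public static void main(String[] args) {
        TolerantSchedulerSketch scheduler = new TolerantSchedulerSketch();
        System.out.println("submitted: " + scheduler.scheduleTriggerRequest(() -> { }));
        scheduler.shutdown();
        System.out.println("submitted: " + scheduler.scheduleTriggerRequest(() -> { }));
    }
}
```

   Unlike the `synchronized` variant, this takes no additional lock, so it cannot introduce a new lock-ordering constraint; the cost is that rejections during shutdown are silently dropped.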









[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

2021-01-18 Thread GitBox


rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559737569



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -1278,7 +1278,11 @@ public void run() {
     }
 
     void scheduleTriggerRequest() {
-        timer.execute(this::executeQueuedRequest);
+        if (isShutdown()) {
+            LOG.debug("Skip scheduling trigger request because is shutting down");
+        } else {
+            timer.execute(this::executeQueuedRequest);
+        }
     }

Review comment:
   Added a test. I initially assumed this change would be only a hotfix.




