[FLINK-8807] Fix ZooKeeperCompletedCheckpointStore getting stuck in an infinite 
loop

Before, CompletedCheckpoint did not have proper equals()/hashCode(),
which meant that the fixpoint condition in
ZooKeeperCompletedCheckpointStore would never hold if at least one
checkpoint became unreadable.

We now manually compare the relevant fields of the checkpoints (checkpoint
ID and job ID) and have extended the test to create new CompletedCheckpoint
instances on every retrieval. Before, the test reused the same
CompletedCheckpoint instances, so the default identity-based
Object.equals()/hashCode() made the test succeed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df37d7ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df37d7ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df37d7ac

Branch: refs/heads/release-1.3.3-rc1
Commit: df37d7acfea10a5ca3186f3c53294f2050758627
Parents: f69bdf2
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Mar 2 17:46:56 2018 +0100
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Mon Mar 12 18:40:27 2018 +0800

----------------------------------------------------------------------
 .../runtime/checkpoint/CompletedCheckpoint.java | 26 +++++++++
 .../ZooKeeperCompletedCheckpointStore.java      |  2 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java  | 61 ++++++++++++--------
 3 files changed, 64 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df37d7ac/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 01718e5..5e7a76a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -34,7 +35,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -278,6 +281,29 @@ public class CompletedCheckpoint implements Serializable {
                return externalPointer;
        }
 
+       public static boolean checkpointsMatch(
+               Collection<CompletedCheckpoint> first,
+               Collection<CompletedCheckpoint> second) {
+
+               Set<Tuple2<Long, JobID>> firstInterestingFields =
+                       new HashSet<>();
+
+               for (CompletedCheckpoint checkpoint : first) {
+                       firstInterestingFields.add(
+                               new Tuple2<>(checkpoint.getCheckpointID(), 
checkpoint.getJobId()));
+               }
+
+               Set<Tuple2<Long, JobID>> secondInterestingFields =
+                       new HashSet<>();
+
+               for (CompletedCheckpoint checkpoint : second) {
+                       secondInterestingFields.add(
+                               new Tuple2<>(checkpoint.getCheckpointID(), 
checkpoint.getJobId()));
+               }
+
+               return firstInterestingFields.equals(secondInterestingFields);
+       }
+
        /**
         * Sets the callback for tracking when this checkpoint is discarded.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/df37d7ac/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 73598e6..0cbd4fb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -199,7 +199,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                        }
 
                } while (retrievedCheckpoints.size() != 
numberOfInitialCheckpoints &&
-                       
!lastTryRetrievedCheckpoints.equals(retrievedCheckpoints));
+                       
!CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, 
retrievedCheckpoints));
 
                // Clear local handles in order to prevent duplicates on
                // recovery. The local handles should reflect the state

http://git-wip-us.apache.org/repos/asf/flink/blob/df37d7ac/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index bee245c..0cf422a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -33,8 +33,10 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.api.ErrorListenerPathable;
 import org.apache.curator.utils.EnsurePath;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -81,31 +83,16 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
        /**
         * Tests that the completed checkpoint store can retrieve all 
checkpoints stored in ZooKeeper
         * and ignores those which cannot be retrieved via their state handles.
+        *
+        * <p>We have a timeout in case the ZooKeeper store gets into a 
deadlock/livelock situation.
         */
-       @Test
+       @Test(timeout = 50000)
        public void testCheckpointRecovery() throws Exception {
+               final JobID jobID = new JobID();
+               final long checkpoint1Id = 1L;
+               final long checkpoint2Id = 2;
                final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> checkpointsInZooKeeper = new ArrayList<>(4);
 
-               final CompletedCheckpoint completedCheckpoint1 = new 
CompletedCheckpoint(
-                       new JobID(),
-                       1L,
-                       1L,
-                       1L,
-                       new HashMap<OperatorID, OperatorState>(),
-                       null,
-                       CheckpointProperties.forStandardCheckpoint(),
-                       null, null);
-
-               final CompletedCheckpoint completedCheckpoint2 = new 
CompletedCheckpoint(
-                       new JobID(),
-                       2L,
-                       2L,
-                       2L,
-                       new HashMap<OperatorID, OperatorState>(),
-                       null,
-                       CheckpointProperties.forStandardCheckpoint(),
-                       null, null);
-
                final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
                expectedCheckpointIds.add(1L);
                expectedCheckpointIds.add(2L);
@@ -114,10 +101,36 @@ public class ZooKeeperCompletedCheckpointStoreTest 
extends TestLogger {
                
when(failingRetrievableStateHandle.retrieveState()).thenThrow(new 
IOException("Test exception"));
 
                final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle1 = mock(RetrievableStateHandle.class);
-               
when(retrievableStateHandle1.retrieveState()).thenReturn(completedCheckpoint1);
+               when(retrievableStateHandle1.retrieveState()).then(new 
Answer<Object>() {
+                       @Override
+                       public Object answer(InvocationOnMock invocation) 
throws Throwable {
+                               return new CompletedCheckpoint(
+                                       jobID,
+                                       checkpoint1Id,
+                                       1L,
+                                       1L,
+                                       new HashMap<OperatorID, 
OperatorState>(),
+                                       null,
+                                       
CheckpointProperties.forStandardCheckpoint(),
+                                       null, null);
+                       }
+               });
 
                final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle2 = mock(RetrievableStateHandle.class);
-               
when(retrievableStateHandle2.retrieveState()).thenReturn(completedCheckpoint2);
+               when(retrievableStateHandle2.retrieveState()).then(new 
Answer<Object>() {
+                       @Override
+                       public Object answer(InvocationOnMock invocation) 
throws Throwable {
+                               return new CompletedCheckpoint(
+                                       jobID,
+                                       checkpoint2Id,
+                                       1L,
+                                       1L,
+                                       new HashMap<OperatorID, 
OperatorState>(),
+                                       null,
+                                       
CheckpointProperties.forStandardCheckpoint(),
+                                       null, null);
+                       }
+               });
 
                checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, 
"/foobar1"));
                
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing1"));
@@ -183,7 +196,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
 
                // check that we return the latest retrievable checkpoint
                // this should remove the latest checkpoint because it is broken
-               assertEquals(completedCheckpoint2.getCheckpointID(), 
latestCompletedCheckpoint.getCheckpointID());
+               assertEquals(checkpoint2Id, 
latestCompletedCheckpoint.getCheckpointID());
 
                // this should remove the second broken checkpoint because 
we're iterating over all checkpoints
                List<CompletedCheckpoint> completedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();

Reply via email to