[FLINK-8807] Fix ZooKeeperCompletedCheckpointStore getting stuck in an infinite loop
Before, CompletedCheckpoint did not have proper equals()/hashCode(), which meant that the fixpoint condition in ZooKeeperCompletedCheckpointStore would never hold if at least one checkpoint became unreadable. We now compare the interesting fields of the checkpoints manually and have extended the test to properly create new CompletedCheckpoints. Before, we were reusing the same CompletedCheckpoint instances, meaning that the default identity-based Object.equals()/hashCode() would make the test succeed. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df37d7ac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df37d7ac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df37d7ac Branch: refs/heads/release-1.3.3-rc1 Commit: df37d7acfea10a5ca3186f3c53294f2050758627 Parents: f69bdf2 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Fri Mar 2 17:46:56 2018 +0100 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Mon Mar 12 18:40:27 2018 +0800 ---------------------------------------------------------------------- .../runtime/checkpoint/CompletedCheckpoint.java | 26 +++++++++ .../ZooKeeperCompletedCheckpointStore.java | 2 +- .../ZooKeeperCompletedCheckpointStoreTest.java | 61 ++++++++++++-------- 3 files changed, 64 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/df37d7ac/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 01718e5..5e7a76a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.SharedStateRegistry; @@ -34,7 +35,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -278,6 +281,29 @@ public class CompletedCheckpoint implements Serializable { return externalPointer; } + public static boolean checkpointsMatch( + Collection<CompletedCheckpoint> first, + Collection<CompletedCheckpoint> second) { + + Set<Tuple2<Long, JobID>> firstInterestingFields = + new HashSet<>(); + + for (CompletedCheckpoint checkpoint : first) { + firstInterestingFields.add( + new Tuple2<>(checkpoint.getCheckpointID(), checkpoint.getJobId())); + } + + Set<Tuple2<Long, JobID>> secondInterestingFields = + new HashSet<>(); + + for (CompletedCheckpoint checkpoint : second) { + secondInterestingFields.add( + new Tuple2<>(checkpoint.getCheckpointID(), checkpoint.getJobId())); + } + + return firstInterestingFields.equals(secondInterestingFields); + } + /** * Sets the callback for tracking when this checkpoint is discarded. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/df37d7ac/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 73598e6..0cbd4fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -199,7 +199,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto } } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints && - !lastTryRetrievedCheckpoints.equals(retrievedCheckpoints)); + !CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, retrievedCheckpoints)); // Clear local handles in order to prevent duplicates on // recovery. 
The local handles should reflect the state http://git-wip-us.apache.org/repos/asf/flink/blob/df37d7ac/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index bee245c..0cf422a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -33,8 +33,10 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.ErrorListenerPathable; import org.apache.curator.utils.EnsurePath; + import org.junit.Test; import org.junit.runner.RunWith; + import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -81,31 +83,16 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { /** * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper * and ignores those which cannot be retrieved via their state handles. + * + * <p>We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation. 
*/ - @Test + @Test(timeout = 50000) public void testCheckpointRecovery() throws Exception { + final JobID jobID = new JobID(); + final long checkpoint1Id = 1L; + final long checkpoint2Id = 2; final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4); - final CompletedCheckpoint completedCheckpoint1 = new CompletedCheckpoint( - new JobID(), - 1L, - 1L, - 1L, - new HashMap<OperatorID, OperatorState>(), - null, - CheckpointProperties.forStandardCheckpoint(), - null, null); - - final CompletedCheckpoint completedCheckpoint2 = new CompletedCheckpoint( - new JobID(), - 2L, - 2L, - 2L, - new HashMap<OperatorID, OperatorState>(), - null, - CheckpointProperties.forStandardCheckpoint(), - null, null); - final Collection<Long> expectedCheckpointIds = new HashSet<>(2); expectedCheckpointIds.add(1L); expectedCheckpointIds.add(2L); @@ -114,10 +101,36 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception")); final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class); - when(retrievableStateHandle1.retrieveState()).thenReturn(completedCheckpoint1); + when(retrievableStateHandle1.retrieveState()).then(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return new CompletedCheckpoint( + jobID, + checkpoint1Id, + 1L, + 1L, + new HashMap<OperatorID, OperatorState>(), + null, + CheckpointProperties.forStandardCheckpoint(), + null, null); + } + }); final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class); - when(retrievableStateHandle2.retrieveState()).thenReturn(completedCheckpoint2); + when(retrievableStateHandle2.retrieveState()).then(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return new 
CompletedCheckpoint( + jobID, + checkpoint2Id, + 1L, + 1L, + new HashMap<OperatorID, OperatorState>(), + null, + CheckpointProperties.forStandardCheckpoint(), + null, null); + } + }); checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1")); checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1")); @@ -183,7 +196,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { // check that we return the latest retrievable checkpoint // this should remove the latest checkpoint because it is broken - assertEquals(completedCheckpoint2.getCheckpointID(), latestCompletedCheckpoint.getCheckpointID()); + assertEquals(checkpoint2Id, latestCompletedCheckpoint.getCheckpointID()); // this should remove the second broken checkpoint because we're iterating over all checkpoints List<CompletedCheckpoint> completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();