This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new 3aeeb5f [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint 3aeeb5f is described below commit 3aeeb5fede885f9efeb66fe271828b25ffd6571e Author: Gyula Fora <gyf...@apache.org> AuthorDate: Mon Sep 23 15:26:42 2019 +0200 [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint Closes #9756 --- .../checkpoint/CompletedCheckpointStore.java | 32 ++++------ .../StandaloneCompletedCheckpointStoreTest.java | 70 ++++++++++++++++++++-- 2 files changed, 78 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index 1cda131..9ed3151 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,31 +54,26 @@ public interface CompletedCheckpointStore { * added. 
*/ default CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception { - if (getAllCheckpoints().isEmpty()) { + List<CompletedCheckpoint> allCheckpoints = getAllCheckpoints(); + if (allCheckpoints.isEmpty()) { return null; } - CompletedCheckpoint candidate = getAllCheckpoints().get(getAllCheckpoints().size() - 1); - if (isPreferCheckpointForRecovery && getAllCheckpoints().size() > 1) { - List<CompletedCheckpoint> allCheckpoints; - try { - allCheckpoints = getAllCheckpoints(); - ListIterator<CompletedCheckpoint> listIterator = allCheckpoints.listIterator(allCheckpoints.size() - 1); - while (listIterator.hasPrevious()) { - CompletedCheckpoint prev = listIterator.previous(); - if (!prev.getProperties().isSavepoint()) { - candidate = prev; - LOG.info("Found a completed checkpoint before the latest savepoint, will use it to recover!"); - break; - } + CompletedCheckpoint lastCompleted = allCheckpoints.get(allCheckpoints.size() - 1); + + if (isPreferCheckpointForRecovery && allCheckpoints.size() > 1 && lastCompleted.getProperties().isSavepoint()) { + ListIterator<CompletedCheckpoint> listIterator = allCheckpoints.listIterator(allCheckpoints.size() - 1); + while (listIterator.hasPrevious()) { + CompletedCheckpoint prev = listIterator.previous(); + if (!prev.getProperties().isSavepoint()) { + LOG.info("Found a completed checkpoint ({}) before the latest savepoint, will use it to recover!", prev); + return prev; } - } catch (Exception e) { - LOG.error("Method getAllCheckpoints caused exception : ", e); - throw new FlinkRuntimeException(e); } + LOG.info("Did not find earlier checkpoint, using latest savepoint to recover."); } - return candidate; + return lastCompleted; } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java index 6f3c60b..4ed1050 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.state.SharedStateRegistry; + import org.junit.Test; import java.io.IOException; @@ -27,6 +29,8 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -41,7 +45,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS @Override protected CompletedCheckpointStore createCompletedCheckpoints( - int maxNumberOfCheckpointsToRetain) throws Exception { + int maxNumberOfCheckpointsToRetain) throws Exception { return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain); } @@ -86,7 +90,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS assertTrue(checkpoint.isDiscarded()); verifyCheckpointDiscarded(taskStates); } - + /** * Tests that the checkpoint does not exist in the store when we fail to add * it into the store (i.e., there exists an exception thrown by the method). 
@@ -96,16 +100,16 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS final int numCheckpointsToRetain = 1; CompletedCheckpointStore store = createCompletedCheckpoints(numCheckpointsToRetain); - + for (long i = 0; i <= numCheckpointsToRetain; ++i) { CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class); doReturn(i).when(checkpointToAdd).getCheckpointID(); doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates(); doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume(); - + try { store.addCheckpoint(checkpointToAdd); - + // The checkpoint should be in the store if we successfully add it into the store. List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints(); assertTrue(addedCheckpoints.contains(checkpointToAdd)); @@ -116,4 +120,60 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS } } } + + @Test + public void testPreferCheckpointWithoutSavepoint() throws Exception { + StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5); + JobID jobId = new JobID(); + store.addCheckpoint(checkpoint(jobId, 1L)); + store.addCheckpoint(checkpoint(jobId, 2L)); + store.addCheckpoint(checkpoint(jobId, 3L)); + + CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true); + + assertThat(latestCheckpoint.getCheckpointID(), equalTo(3L)); + } + + @Test + public void testPreferCheckpointWithSavepoint() throws Exception { + StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5); + JobID jobId = new JobID(); + store.addCheckpoint(checkpoint(jobId, 1L)); + store.addCheckpoint(savepoint(jobId, 2L)); + store.addCheckpoint(savepoint(jobId, 3L)); + + CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true); + + assertThat(latestCheckpoint.getCheckpointID(), equalTo(1L)); + } + + @Test + public void testPreferCheckpointWithOnlySavepoint() throws Exception { + 
StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5); + JobID jobId = new JobID(); + store.addCheckpoint(savepoint(jobId, 1L)); + store.addCheckpoint(savepoint(jobId, 2L)); + + CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true); + + assertThat(latestCheckpoint.getCheckpointID(), equalTo(2L)); + } + + private static CompletedCheckpoint checkpoint(JobID jobId, long checkpointId) { + return new TestCompletedCheckpoint( + jobId, + checkpointId, + checkpointId, + Collections.emptyMap(), + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE)); + } + + private static CompletedCheckpoint savepoint(JobID jobId, long checkpointId) { + return new TestCompletedCheckpoint( + jobId, + checkpointId, + checkpointId, + Collections.emptyMap(), + CheckpointProperties.forSavepoint()); + } }