This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 3aeeb5f  [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint
3aeeb5f is described below

commit 3aeeb5fede885f9efeb66fe271828b25ffd6571e
Author: Gyula Fora <gyf...@apache.org>
AuthorDate: Mon Sep 23 15:26:42 2019 +0200

    [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint
    
    Closes #9756
---
 .../checkpoint/CompletedCheckpointStore.java       | 32 ++++------
 .../StandaloneCompletedCheckpointStoreTest.java    | 70 ++++++++++++++++++++--
 2 files changed, 78 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 1cda131..9ed3151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,31 +54,26 @@ public interface CompletedCheckpointStore {
         * added.
         */
        default CompletedCheckpoint getLatestCheckpoint(boolean 
isPreferCheckpointForRecovery) throws Exception {
-               if (getAllCheckpoints().isEmpty()) {
+               List<CompletedCheckpoint> allCheckpoints = getAllCheckpoints();
+               if (allCheckpoints.isEmpty()) {
                        return null;
                }
 
-               CompletedCheckpoint candidate = 
getAllCheckpoints().get(getAllCheckpoints().size() - 1);
-               if (isPreferCheckpointForRecovery && getAllCheckpoints().size() 
> 1) {
-                       List<CompletedCheckpoint> allCheckpoints;
-                       try {
-                               allCheckpoints = getAllCheckpoints();
-                               ListIterator<CompletedCheckpoint> listIterator 
= allCheckpoints.listIterator(allCheckpoints.size() - 1);
-                               while (listIterator.hasPrevious()) {
-                                       CompletedCheckpoint prev = 
listIterator.previous();
-                                       if 
(!prev.getProperties().isSavepoint()) {
-                                               candidate = prev;
-                                               LOG.info("Found a completed 
checkpoint before the latest savepoint, will use it to recover!");
-                                               break;
-                                       }
+               CompletedCheckpoint lastCompleted = 
allCheckpoints.get(allCheckpoints.size() - 1);
+
+               if (isPreferCheckpointForRecovery && allCheckpoints.size() > 1 
&& lastCompleted.getProperties().isSavepoint()) {
+                       ListIterator<CompletedCheckpoint> listIterator = 
allCheckpoints.listIterator(allCheckpoints.size() - 1);
+                       while (listIterator.hasPrevious()) {
+                               CompletedCheckpoint prev = 
listIterator.previous();
+                               if (!prev.getProperties().isSavepoint()) {
+                                       LOG.info("Found a completed checkpoint 
({}) before the latest savepoint, will use it to recover!", prev);
+                                       return prev;
                                }
-                       } catch (Exception e) {
-                               LOG.error("Method getAllCheckpoints caused 
exception : ", e);
-                               throw new FlinkRuntimeException(e);
                        }
+                       LOG.info("Did not find earlier checkpoint, using latest 
savepoint to recover.");
                }
 
-               return candidate;
+               return lastCompleted;
        }
 
        /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 6f3c60b..4ed1050 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -27,6 +29,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -41,7 +45,7 @@ public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointS
 
        @Override
        protected CompletedCheckpointStore createCompletedCheckpoints(
-                       int maxNumberOfCheckpointsToRetain) throws Exception {
+               int maxNumberOfCheckpointsToRetain) throws Exception {
 
                return new 
StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
        }
@@ -86,7 +90,7 @@ public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointS
                assertTrue(checkpoint.isDiscarded());
                verifyCheckpointDiscarded(taskStates);
        }
-       
+
        /**
         * Tests that the checkpoint does not exist in the store when we fail 
to add
         * it into the store (i.e., there exists an exception thrown by the 
method).
@@ -96,16 +100,16 @@ public class StandaloneCompletedCheckpointStoreTest 
extends CompletedCheckpointS
 
                final int numCheckpointsToRetain = 1;
                CompletedCheckpointStore store = 
createCompletedCheckpoints(numCheckpointsToRetain);
-               
+
                for (long i = 0; i <= numCheckpointsToRetain; ++i) {
                        CompletedCheckpoint checkpointToAdd = 
mock(CompletedCheckpoint.class);
                        doReturn(i).when(checkpointToAdd).getCheckpointID();
                        
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
                        doThrow(new 
IOException()).when(checkpointToAdd).discardOnSubsume();
-                       
+
                        try {
                                store.addCheckpoint(checkpointToAdd);
-                               
+
                                // The checkpoint should be in the store if we 
successfully add it into the store.
                                List<CompletedCheckpoint> addedCheckpoints = 
store.getAllCheckpoints();
                                
assertTrue(addedCheckpoints.contains(checkpointToAdd));
@@ -116,4 +120,60 @@ public class StandaloneCompletedCheckpointStoreTest 
extends CompletedCheckpointS
                        }
                }
        }
+
+       @Test
+       public void testPreferCheckpointWithoutSavepoint() throws Exception {
+               StandaloneCompletedCheckpointStore store = new 
StandaloneCompletedCheckpointStore(5);
+               JobID jobId = new JobID();
+               store.addCheckpoint(checkpoint(jobId, 1L));
+               store.addCheckpoint(checkpoint(jobId, 2L));
+               store.addCheckpoint(checkpoint(jobId, 3L));
+
+               CompletedCheckpoint latestCheckpoint = 
store.getLatestCheckpoint(true);
+
+               assertThat(latestCheckpoint.getCheckpointID(), equalTo(3L));
+       }
+
+       @Test
+       public void testPreferCheckpointWithSavepoint() throws Exception {
+               StandaloneCompletedCheckpointStore store = new 
StandaloneCompletedCheckpointStore(5);
+               JobID jobId = new JobID();
+               store.addCheckpoint(checkpoint(jobId, 1L));
+               store.addCheckpoint(savepoint(jobId, 2L));
+               store.addCheckpoint(savepoint(jobId, 3L));
+
+               CompletedCheckpoint latestCheckpoint = 
store.getLatestCheckpoint(true);
+
+               assertThat(latestCheckpoint.getCheckpointID(), equalTo(1L));
+       }
+
+       @Test
+       public void testPreferCheckpointWithOnlySavepoint() throws Exception {
+               StandaloneCompletedCheckpointStore store = new 
StandaloneCompletedCheckpointStore(5);
+               JobID jobId = new JobID();
+               store.addCheckpoint(savepoint(jobId, 1L));
+               store.addCheckpoint(savepoint(jobId, 2L));
+
+               CompletedCheckpoint latestCheckpoint = 
store.getLatestCheckpoint(true);
+
+               assertThat(latestCheckpoint.getCheckpointID(), equalTo(2L));
+       }
+
+       private static CompletedCheckpoint checkpoint(JobID jobId, long 
checkpointId) {
+               return new TestCompletedCheckpoint(
+                       jobId,
+                       checkpointId,
+                       checkpointId,
+                       Collections.emptyMap(),
+                       
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE));
+       }
+
+       private static CompletedCheckpoint savepoint(JobID jobId, long 
checkpointId) {
+               return new TestCompletedCheckpoint(
+                       jobId,
+                       checkpointId,
+                       checkpointId,
+                       Collections.emptyMap(),
+                       CheckpointProperties.forSavepoint());
+       }
 }

Reply via email to