[FLINK-8627] Introduce new JobStatus#SUSPENDING to ExecutionGraph

The new JobStatus#SUSPENDING indicates that an ExecutionGraph has been suspended
but its cleanup has not been done yet. Only after all Executions have been
canceled will the ExecutionGraph enter the SUSPENDED state and complete the
termination future accordingly.

This closes #5445.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e96a248
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e96a248
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e96a248

Branch: refs/heads/master
Commit: 7e96a248587ddbf2eb0ae445eb3079a4b2e4753f
Parents: e8d6f39
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Feb 1 18:04:06 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Feb 22 17:32:37 2018 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 46 +++++++++++-----
 .../flink/runtime/jobgraph/JobStatus.java       |  3 +
 .../ZooKeeperSubmittedJobGraphStore.java        |  2 +-
 .../handler/legacy/ExecutionGraphCache.java     | 16 ++++--
 .../ArchivedExecutionGraphTest.java             |  1 +
 .../ExecutionGraphSuspendTest.java              | 58 +++++++++++++++++---
 .../handler/legacy/ExecutionGraphCacheTest.java | 33 ++++++++---
 .../legacy/JobAccumulatorsHandlerTest.java      |  3 +-
 .../handler/legacy/JobConfigHandlerTest.java    |  3 +-
 .../handler/legacy/JobDetailsHandlerTest.java   |  3 +-
 .../rest/handler/legacy/JobPlanHandlerTest.java |  3 +-
 .../JobVertexAccumulatorsHandlerTest.java       |  3 +-
 .../legacy/JobVertexDetailsHandlerTest.java     |  3 +-
 .../utils/ArchivedJobGenerationUtils.java       |  2 +-
 14 files changed, 133 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index beb3ead..9313466 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1085,10 +1085,10 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
        /**
         * Suspends the current ExecutionGraph.
         *
-        * <p>The JobStatus will be directly set to SUSPENDED iff the current 
state is not a terminal
+        * <p>The JobStatus will be directly set to SUSPENDING iff the current 
state is not a terminal
         * state. All ExecutionJobVertices will be canceled and the 
onTerminalState() is executed.
         *
-        * <p>The SUSPENDED state is a local terminal state which stops the 
execution of the job but does
+        * <p>The SUSPENDING state is a local terminal state which stops the 
execution of the job but does
         * not remove the job from the HA job store so that it can be recovered 
by another JobManager.
         *
         * @param suspensionCause Cause of the suspension
@@ -1097,10 +1097,10 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                while (true) {
                        JobStatus currentState = state;
 
-                       if (currentState.isTerminalState()) {
+                       if (currentState.isTerminalState() || currentState == 
JobStatus.SUSPENDING) {
                                // stay in a terminal state
                                return;
-                       } else if (transitionState(currentState, 
JobStatus.SUSPENDED, suspensionCause)) {
+                       } else if (transitionState(currentState, 
JobStatus.SUSPENDING, suspensionCause)) {
                                initFailureCause(suspensionCause);
 
                                // make sure no concurrent local actions 
interfere with the cancellation
@@ -1112,16 +1112,25 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                                if (ongoingSchedulingFuture != null) {
                                        ongoingSchedulingFuture.cancel(false);
                                }
+                               final ArrayList<CompletableFuture<Void>> 
executionJobVertexTerminationFutures = new 
ArrayList<>(verticesInCreationOrder.size());
 
                                for (ExecutionJobVertex ejv: 
verticesInCreationOrder) {
-                                       ejv.cancel();
+                                       
executionJobVertexTerminationFutures.add(ejv.cancelWithFuture());
                                }
 
-                               synchronized (progressLock) {
-                                       onTerminalState(JobStatus.SUSPENDED);
+                               final ConjunctFuture<Void> 
jobVerticesTerminationFuture = 
FutureUtils.waitForAll(executionJobVertexTerminationFutures);
 
-                                       LOG.info("Job {} has been suspended.", 
getJobID());
-                               }
+                               jobVerticesTerminationFuture.whenComplete(
+                                       (Void ignored, Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       LOG.debug("Flink could 
not properly clean up resource after suspension.", throwable);
+                                               }
+
+                                               // the globalModVersion does 
not play a role because there is no way
+                                               // currently to leave the 
SUSPENDING state
+                                               allVerticesInTerminalState(-1L);
+                                               LOG.info("Job {} has been 
suspended.", getJobID());
+                                       });
 
                                return;
                        }
@@ -1144,6 +1153,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                        JobStatus current = state;
                        // stay in these states
                        if (current == JobStatus.FAILING ||
+                               current == JobStatus.SUSPENDING ||
                                current == JobStatus.SUSPENDED ||
                                current.isGloballyTerminalState()) {
                                return;
@@ -1216,7 +1226,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                                } else if (current == JobStatus.FAILED) {
                                        LOG.info("Failed job during restart. 
Aborting restart.");
                                        return;
-                               } else if (current == JobStatus.SUSPENDED) {
+                               } else if (current == JobStatus.SUSPENDING || 
current == JobStatus.SUSPENDED) {
                                        LOG.info("Suspended job during restart. 
Aborting restart.");
                                        return;
                                } else if (current != JobStatus.RESTARTING) {
@@ -1301,7 +1311,13 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                return null;
        }
 
-       @VisibleForTesting
+       /**
+        * Returns the termination future of this {@link ExecutionGraph}. The 
termination future
+        * is completed with the terminal {@link JobStatus} once the 
ExecutionGraph reaches this
+        * terminal state and all {@link Execution} have been terminated.
+        *
+        * @return Termination future of this {@link ExecutionGraph}.
+        */
        public CompletableFuture<JobStatus> getTerminationFuture() {
                return terminationFuture;
        }
@@ -1441,9 +1457,11 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                                }
                                // concurrent job status change, let's check 
again
                        }
-                       else if (current == JobStatus.SUSPENDED) {
-                               // we've already cleaned up when entering the 
SUSPENDED state
-                               break;
+                       else if (current == JobStatus.SUSPENDING) {
+                               if (transitionState(current, 
JobStatus.SUSPENDED)) {
+                                       onTerminalState(JobStatus.SUSPENDED);
+                                       break;
+                               }
                        }
                        else if (current.isGloballyTerminalState()) {
                                LOG.warn("Job has entered globally terminal 
state without waiting for all " +

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 4ef86bd..c04528e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -47,6 +47,9 @@ public enum JobStatus {
        /** The job is currently undergoing a reset and total restart */
        RESTARTING(TerminalState.NON_TERMINAL),
 
+       /** The job has been suspended and is currently waiting for the cleanup 
to complete */
+       SUSPENDING(TerminalState.NON_TERMINAL),
+
        /**
         * The job has been suspended which means that it has been stopped but 
not been removed from a
         * potential HA job store.

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index f31c970..a60a40d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -372,7 +372,7 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
                                break;
 
                                case CONNECTION_SUSPENDED: {
-                                       LOG.warn("ZooKeeper connection 
SUSPENDED. Changes to the submitted job " +
+                                       LOG.warn("ZooKeeper connection 
SUSPENDING. Changes to the submitted job " +
                                                "graphs are not monitored 
(temporarily).");
                                }
                                break;

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
index 19186c4..382e87e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
@@ -91,19 +91,22 @@ public class ExecutionGraphCache implements Closeable {
 
                        if (oldEntry != null) {
                                if (currentTime < oldEntry.getTTL()) {
-                                       if 
(oldEntry.getExecutionGraphFuture().isDone() && 
!oldEntry.getExecutionGraphFuture().isCompletedExceptionally()) {
+                                       final 
CompletableFuture<AccessExecutionGraph> executionGraphFuture = 
oldEntry.getExecutionGraphFuture();
+                                       if (executionGraphFuture.isDone() && 
!executionGraphFuture.isCompletedExceptionally()) {
 
                                                // TODO: Remove once we no 
longer request the actual ExecutionGraph from the JobManager but only the 
ArchivedExecutionGraph
                                                try {
-                                                       if 
(oldEntry.getExecutionGraphFuture().get().getState() != JobStatus.SUSPENDED) {
-                                                               return 
oldEntry.getExecutionGraphFuture();
+                                                       final 
AccessExecutionGraph executionGraph = executionGraphFuture.get();
+                                                       if 
(executionGraph.getState() != JobStatus.SUSPENDING &&
+                                                               
executionGraph.getState() != JobStatus.SUSPENDED) {
+                                                               return 
executionGraphFuture;
                                                        }
                                                        // send a new request 
to get the ExecutionGraph from the new leader
                                                } catch (InterruptedException | 
ExecutionException e) {
                                                        throw new 
RuntimeException("Could not retrieve ExecutionGraph from the orderly completed 
future. This should never happen.", e);
                                                }
-                                       } else if 
(!oldEntry.getExecutionGraphFuture().isDone()) {
-                                               return 
oldEntry.getExecutionGraphFuture();
+                                       } else if 
(!executionGraphFuture.isDone()) {
+                                               return executionGraphFuture;
                                        }
                                        // otherwise it must be completed 
exceptionally
                                }
@@ -135,7 +138,8 @@ public class ExecutionGraphCache implements Closeable {
                                                        
newEntry.getExecutionGraphFuture().complete(executionGraph);
 
                                                        // TODO: Remove once we 
no longer request the actual ExecutionGraph from the JobManager but only the 
ArchivedExecutionGraph
-                                                       if 
(executionGraph.getState() == JobStatus.SUSPENDED) {
+                                                       if 
(executionGraph.getState() == JobStatus.SUSPENDING ||
+                                                               
executionGraph.getState() == JobStatus.SUSPENDED) {
                                                                // remove the 
entry in case of suspension --> triggers new request when accessed next time
                                                                
cachedExecutionGraphs.remove(jobId, newEntry);
                                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 8bc5170..f15dca1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -172,6 +172,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
                
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELED), 
archivedGraph.getStatusTimestamp(JobStatus.CANCELED));
                
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FINISHED), 
archivedGraph.getStatusTimestamp(JobStatus.FINISHED));
                
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING), 
archivedGraph.getStatusTimestamp(JobStatus.RESTARTING));
+               
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDING), 
archivedGraph.getStatusTimestamp(JobStatus.SUSPENDING));
                
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED), 
archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED));
                assertEquals(runtimeGraph.isStoppable(), 
archivedGraph.isStoppable());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 52d4c81..1b19c53 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -49,7 +49,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 public class ExecutionGraphSuspendTest extends TestLogger {
 
        /**
-        * Going into SUSPENDED out of CREATED should immediately cancel 
everything and
+        * Going into SUSPENDING out of CREATED should immediately cancel 
everything and
         * not send out RPC calls.
         */
        @Test
@@ -72,7 +72,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
        }
 
        /**
-        * Going into SUSPENDED out of DEPLOYING vertices should cancel all 
vertices once with RPC calls.
+        * Going into SUSPENDING out of DEPLOYING vertices should cancel all 
vertices once with RPC calls.
         */
        @Test
        public void testSuspendedOutOfDeploying() throws Exception {
@@ -88,15 +88,20 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 
                eg.suspend(new Exception("suspend"));
 
-               assertEquals(JobStatus.SUSPENDED, eg.getState());
+               assertEquals(JobStatus.SUSPENDING, eg.getState());
+
                validateAllVerticesInState(eg, ExecutionState.CANCELING);
                validateCancelRpcCalls(gateway, parallelism);
 
+               ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
+
                ensureCannotLeaveSuspendedState(eg, gateway);
        }
 
        /**
-        * Going into SUSPENDED out of RUNNING vertices should cancel all 
vertices once with RPC calls.
+        * Going into SUSPENDING out of RUNNING vertices should cancel all 
vertices once with RPC calls.
         */
        @Test
        public void testSuspendedOutOfRunning() throws Exception {
@@ -114,15 +119,21 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
 
                eg.suspend(new Exception("suspend"));
 
-               assertEquals(JobStatus.SUSPENDED, eg.getState());
+               assertEquals(JobStatus.SUSPENDING, eg.getState());
+
                validateAllVerticesInState(eg, ExecutionState.CANCELING);
+
                validateCancelRpcCalls(gateway, parallelism);
 
+               ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
+
                ensureCannotLeaveSuspendedState(eg, gateway);
        }
 
        /**
-        * Suspending from FAILING goes to SUSPENDED and sends no additional 
RPC calls
+        * Suspending from FAILING goes to SUSPENDING and sends no additional 
RPC calls
         */
        @Test
        public void testSuspendedOutOfFailing() throws Exception {
@@ -140,10 +151,14 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
 
                // suspend
                eg.suspend(new Exception("suspend"));
-               assertEquals(JobStatus.SUSPENDED, eg.getState());
+               assertEquals(JobStatus.SUSPENDING, eg.getState());
+
+               ensureCannotLeaveSuspendingState(eg, gateway);
 
                ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
 
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
+
                ensureCannotLeaveSuspendedState(eg, gateway);
        }
 
@@ -176,7 +191,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
        }
 
        /**
-        * Suspending from CANCELING goes to SUSPENDED and sends no additional 
RPC calls. 
+        * Suspending from CANCELING goes to SUSPENDING and sends no additional 
RPC calls.
         */
        @Test
        public void testSuspendedOutOfCanceling() throws Exception {
@@ -194,10 +209,14 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
 
                // suspend
                eg.suspend(new Exception("suspend"));
-               assertEquals(JobStatus.SUSPENDED, eg.getState());
+               assertEquals(JobStatus.SUSPENDING, eg.getState());
+
+               ensureCannotLeaveSuspendingState(eg, gateway);
 
                ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
 
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
+
                ensureCannotLeaveSuspendedState(eg, gateway);
        }
 
@@ -280,6 +299,27 @@ public class ExecutionGraphSuspendTest extends TestLogger {
                }
        }
 
+       private static void ensureCannotLeaveSuspendingState(ExecutionGraph eg, 
TaskManagerGateway gateway) {
+               assertEquals(JobStatus.SUSPENDING, eg.getState());
+               reset(gateway);
+
+               eg.failGlobal(new Exception("fail"));
+               assertEquals(JobStatus.SUSPENDING, eg.getState());
+               verifyNoMoreInteractions(gateway);
+
+               eg.cancel();
+               assertEquals(JobStatus.SUSPENDING, eg.getState());
+               verifyNoMoreInteractions(gateway);
+
+               eg.suspend(new Exception("suspend again"));
+               assertEquals(JobStatus.SUSPENDING, eg.getState());
+               verifyNoMoreInteractions(gateway);
+
+               for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+                       assertEquals(0, 
ev.getCurrentExecutionAttempt().getAttemptNumber());
+               }
+       }
+
        private static void validateAllVerticesInState(ExecutionGraph eg, 
ExecutionState expected) {
                for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
                        assertEquals(expected, 
ev.getCurrentExecutionAttempt().getState());

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
index 8bdaff5..3afd9fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
@@ -248,7 +248,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
 
        /**
         * Tests that a cache entry is invalidated if the retrieved {@link 
AccessExecutionGraph} is in
-        * state {@link JobStatus#SUSPENDED}.
+        * state {@link JobStatus#SUSPENDING} or {@link JobStatus#SUSPENDED}.
         *
         * <p>This test can be removed once we no longer request the actual 
{@link ExecutionGraph} from the
         * {@link JobManager}.
@@ -259,9 +259,11 @@ public class ExecutionGraphCacheTest extends TestLogger {
                final Time timeToLive = Time.hours(1L);
                final JobID expectedJobId = new JobID();
 
+               final ArchivedExecutionGraph suspendingExecutionGraph = new 
ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDING).build();
                final ArchivedExecutionGraph suspendedExecutionGraph = new 
ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDED).build();
                final ConcurrentLinkedQueue<CompletableFuture<? extends 
AccessExecutionGraph>> requestJobAnswers = new ConcurrentLinkedQueue<>();
 
+               
requestJobAnswers.offer(CompletableFuture.completedFuture(suspendingExecutionGraph));
                
requestJobAnswers.offer(CompletableFuture.completedFuture(suspendedExecutionGraph));
                
requestJobAnswers.offer(CompletableFuture.completedFuture(expectedExecutionGraph));
 
@@ -278,17 +280,21 @@ public class ExecutionGraphCacheTest extends TestLogger {
                try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
                        CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
+                       assertEquals(suspendingExecutionGraph, 
executionGraphFuture.get());
+
+                       executionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+
                        assertEquals(suspendedExecutionGraph, 
executionGraphFuture.get());
 
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
+                       executionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-                       assertEquals(expectedExecutionGraph, 
executionGraphFuture2.get());
+                       assertEquals(expectedExecutionGraph, 
executionGraphFuture.get());
                }
        }
 
        /**
         * Tests that a cache entry is invalidated if the retrieved {@link 
AccessExecutionGraph} changes its
-        * state to {@link JobStatus#SUSPENDED}.
+        * state to {@link JobStatus#SUSPENDING} or {@link JobStatus#SUSPENDED}.
         *
         * <p>This test can be removed once we no longer request the actual 
{@link ExecutionGraph} from the
         * {@link JobManager}.
@@ -299,30 +305,39 @@ public class ExecutionGraphCacheTest extends TestLogger {
                final Time timeToLive = Time.hours(1L);
                final JobID expectedJobId = new JobID();
 
+               final SuspendableAccessExecutionGraph 
toBeSuspendingExecutionGraph = new 
SuspendableAccessExecutionGraph(expectedJobId);
                final SuspendableAccessExecutionGraph 
toBeSuspendedExecutionGraph = new 
SuspendableAccessExecutionGraph(expectedJobId);
 
                final CountingRestfulGateway restfulGateway = 
createCountingRestfulGateway(
                        expectedJobId,
+                       
CompletableFuture.completedFuture(toBeSuspendingExecutionGraph),
                        
CompletableFuture.completedFuture(toBeSuspendedExecutionGraph),
                        
CompletableFuture.completedFuture(expectedExecutionGraph));
 
                try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
                        CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
+                       assertEquals(toBeSuspendingExecutionGraph, 
executionGraphFuture.get());
+
+                       
toBeSuspendingExecutionGraph.setJobStatus(JobStatus.SUSPENDING);
+
+                       // retrieve the same job from the cache again --> this 
should return it and invalidate the cache entry
+                       executionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+
                        assertEquals(toBeSuspendedExecutionGraph, 
executionGraphFuture.get());
 
                        
toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED);
 
                        // retrieve the same job from the cache again --> this 
should return it and invalidate the cache entry
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
+                       executionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-                       assertEquals(expectedExecutionGraph, 
executionGraphFuture2.get());
+                       assertEquals(expectedExecutionGraph, 
executionGraphFuture.get());
 
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture3 = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
+                       executionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-                       assertEquals(expectedExecutionGraph, 
executionGraphFuture3.get());
+                       assertEquals(expectedExecutionGraph, 
executionGraphFuture.get());
 
-                       assertThat(restfulGateway.getNumRequestJobCalls(), 
Matchers.equalTo(2));
+                       assertThat(restfulGateway.getNumRequestJobCalls(), 
Matchers.equalTo(3));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
index f8122ee..00829e6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -38,7 +39,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobAccumulatorsHandler.
  */
-public class JobAccumulatorsHandlerTest {
+public class JobAccumulatorsHandlerTest extends TestLogger {
 
        @Test
        public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
index d173e0f..2279cd8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
@@ -39,7 +40,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobConfigHandler.
  */
-public class JobConfigHandlerTest {
+public class JobConfigHandlerTest extends TestLogger {
 
        @Test
        public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
index 2980a08..dbfa8cc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -46,7 +47,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobDetailsHandler.
  */
-public class JobDetailsHandlerTest {
+public class JobDetailsHandlerTest extends TestLogger {
 
        @Test
        public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
index 29e0819..9edaef1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -34,7 +35,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobPlanHandler.
  */
-public class JobPlanHandlerTest {
+public class JobPlanHandlerTest extends TestLogger {
 
        @Test
        public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
index 97356f4..abb22e0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -40,7 +41,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobVertexAccumulatorsHandler.
  */
-public class JobVertexAccumulatorsHandlerTest {
+public class JobVertexAccumulatorsHandlerTest extends TestLogger {
 
        @Test
        public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
index 9cc294a..0c52171 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationU
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -42,7 +43,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobVertexDetailsHandler.
  */
-public class JobVertexDetailsHandlerTest {
+public class JobVertexDetailsHandlerTest extends TestLogger {
 
        @Test
        public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
index ad3a95f..92b0d8a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
@@ -135,7 +135,7 @@ public class ArchivedJobGenerationUtils {
                        .setTasks(tasks)
                        .setFailureCause(new ErrorInfo(new 
Exception("jobException"), 
originalAttempt.getStateTimestamp(ExecutionState.FAILED)))
                        .setState(JobStatus.FINISHED)
-                       .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 
9, 10})
+                       .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 
9, 10, 11})
                        .setArchivedUserAccumulators(new 
StringifiedAccumulatorResult[]{acc1, acc2})
                        .build();
        }

Reply via email to