[FLINK-8627] Introduce new JobStatus#SUSPENDING to ExecutionGraph The new JobStatus#SUSPENDING says that an ExecutionGraph has been suspended but its clean up has not been done yet. Only after all Executions have been canceled, the ExecutionGraph will enter the SUSPENDED state and complete the termination future accordingly.
This closes #5445. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e96a248 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e96a248 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e96a248 Branch: refs/heads/master Commit: 7e96a248587ddbf2eb0ae445eb3079a4b2e4753f Parents: e8d6f39 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Feb 1 18:04:06 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Feb 22 17:32:37 2018 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 46 +++++++++++----- .../flink/runtime/jobgraph/JobStatus.java | 3 + .../ZooKeeperSubmittedJobGraphStore.java | 2 +- .../handler/legacy/ExecutionGraphCache.java | 16 ++++-- .../ArchivedExecutionGraphTest.java | 1 + .../ExecutionGraphSuspendTest.java | 58 +++++++++++++++++--- .../handler/legacy/ExecutionGraphCacheTest.java | 33 ++++++++--- .../legacy/JobAccumulatorsHandlerTest.java | 3 +- .../handler/legacy/JobConfigHandlerTest.java | 3 +- .../handler/legacy/JobDetailsHandlerTest.java | 3 +- .../rest/handler/legacy/JobPlanHandlerTest.java | 3 +- .../JobVertexAccumulatorsHandlerTest.java | 3 +- .../legacy/JobVertexDetailsHandlerTest.java | 3 +- .../utils/ArchivedJobGenerationUtils.java | 2 +- 14 files changed, 133 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index beb3ead..9313466 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1085,10 +1085,10 @@ public class ExecutionGraph implements AccessExecutionGraph { /** * Suspends the current ExecutionGraph. * - * <p>The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal + * <p>The JobStatus will be directly set to SUSPENDING iff the current state is not a terminal * state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed. * - * <p>The SUSPENDED state is a local terminal state which stops the execution of the job but does + * <p>The SUSPENDING state is a local terminal state which stops the execution of the job but does * not remove the job from the HA job store so that it can be recovered by another JobManager. * * @param suspensionCause Cause of the suspension @@ -1097,10 +1097,10 @@ public class ExecutionGraph implements AccessExecutionGraph { while (true) { JobStatus currentState = state; - if (currentState.isTerminalState()) { + if (currentState.isTerminalState() || currentState == JobStatus.SUSPENDING) { // stay in a terminal state return; - } else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) { + } else if (transitionState(currentState, JobStatus.SUSPENDING, suspensionCause)) { initFailureCause(suspensionCause); // make sure no concurrent local actions interfere with the cancellation @@ -1112,16 +1112,25 @@ public class ExecutionGraph implements AccessExecutionGraph { if (ongoingSchedulingFuture != null) { ongoingSchedulingFuture.cancel(false); } + final ArrayList<CompletableFuture<Void>> executionJobVertexTerminationFutures = new ArrayList<>(verticesInCreationOrder.size()); for (ExecutionJobVertex ejv: verticesInCreationOrder) { - ejv.cancel(); + executionJobVertexTerminationFutures.add(ejv.cancelWithFuture()); } - synchronized (progressLock) { - onTerminalState(JobStatus.SUSPENDED); + final ConjunctFuture<Void> jobVerticesTerminationFuture = FutureUtils.waitForAll(executionJobVertexTerminationFutures); - LOG.info("Job {} has been suspended.", getJobID()); - } + jobVerticesTerminationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Flink could not properly clean up resource after suspension.", throwable); + } + + // the globalModVersion does not play a role because there is no way + // currently to leave the SUSPENDING state + allVerticesInTerminalState(-1L); + LOG.info("Job {} has been suspended.", getJobID()); + }); return; } @@ -1144,6 +1153,7 @@ public class ExecutionGraph implements AccessExecutionGraph { JobStatus current = state; // stay in these states if (current == JobStatus.FAILING || + current == JobStatus.SUSPENDING || current == JobStatus.SUSPENDED || current.isGloballyTerminalState()) { return; @@ -1216,7 +1226,7 @@ public class ExecutionGraph implements AccessExecutionGraph { } else if (current == JobStatus.FAILED) { LOG.info("Failed job during restart. Aborting restart."); return; - } else if (current == JobStatus.SUSPENDED) { + } else if (current == JobStatus.SUSPENDING || current == JobStatus.SUSPENDED) { LOG.info("Suspended job during restart. Aborting restart."); return; } else if (current != JobStatus.RESTARTING) { @@ -1301,7 +1311,13 @@ public class ExecutionGraph implements AccessExecutionGraph { return null; } - @VisibleForTesting + /** + * Returns the termination future of this {@link ExecutionGraph}. The termination future + * is completed with the terminal {@link JobStatus} once the ExecutionGraph reaches this + * terminal state and all {@link Execution} have been terminated. + * + * @return Termination future of this {@link ExecutionGraph}. + */ public CompletableFuture<JobStatus> getTerminationFuture() { return terminationFuture; } @@ -1441,9 +1457,11 @@ public class ExecutionGraph implements AccessExecutionGraph { } // concurrent job status change, let's check again } - else if (current == JobStatus.SUSPENDED) { - // we've already cleaned up when entering the SUSPENDED state - break; + else if (current == JobStatus.SUSPENDING) { + if (transitionState(current, JobStatus.SUSPENDED)) { + onTerminalState(JobStatus.SUSPENDED); + break; + } } else if (current.isGloballyTerminalState()) { LOG.warn("Job has entered globally terminal state without waiting for all " + http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java index 4ef86bd..c04528e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java @@ -47,6 +47,9 @@ public enum JobStatus { /** The job is currently undergoing a reset and total restart */ RESTARTING(TerminalState.NON_TERMINAL), + /** The job has been suspended and is currently waiting for the cleanup to complete */ + SUSPENDING(TerminalState.NON_TERMINAL), + /** * The job has been suspended which means that it has been stopped but not been removed from a * potential HA job store. http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index f31c970..a60a40d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -372,7 +372,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { break; case CONNECTION_SUSPENDED: { - LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " + + LOG.warn("ZooKeeper connection SUSPENDING. Changes to the submitted job " + "graphs are not monitored (temporarily)."); } break; http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java index 19186c4..382e87e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java @@ -91,19 +91,22 @@ public class ExecutionGraphCache implements Closeable { if (oldEntry != null) { if (currentTime < oldEntry.getTTL()) { - if (oldEntry.getExecutionGraphFuture().isDone() && !oldEntry.getExecutionGraphFuture().isCompletedExceptionally()) { + final CompletableFuture<AccessExecutionGraph> executionGraphFuture = oldEntry.getExecutionGraphFuture(); + if (executionGraphFuture.isDone() && !executionGraphFuture.isCompletedExceptionally()) { // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph try { - if (oldEntry.getExecutionGraphFuture().get().getState() != JobStatus.SUSPENDED) { - return oldEntry.getExecutionGraphFuture(); + final AccessExecutionGraph executionGraph = executionGraphFuture.get(); + if (executionGraph.getState() != JobStatus.SUSPENDING && + executionGraph.getState() != JobStatus.SUSPENDED) { + return executionGraphFuture; } // send a new request to get the ExecutionGraph from the new leader } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("Could not retrieve ExecutionGraph from the orderly completed future. This should never happen.", e); } - } else if (!oldEntry.getExecutionGraphFuture().isDone()) { - return oldEntry.getExecutionGraphFuture(); + } else if (!executionGraphFuture.isDone()) { + return executionGraphFuture; } // otherwise it must be completed exceptionally } @@ -135,7 +138,8 @@ public class ExecutionGraphCache implements Closeable { newEntry.getExecutionGraphFuture().complete(executionGraph); // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph - if (executionGraph.getState() == JobStatus.SUSPENDED) { + if (executionGraph.getState() == JobStatus.SUSPENDING || + executionGraph.getState() == JobStatus.SUSPENDED) { // remove the entry in case of suspension --> triggers new request when accessed next time cachedExecutionGraphs.remove(jobId, newEntry); } http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 8bc5170..f15dca1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -172,6 +172,7 @@ public class ArchivedExecutionGraphTest extends TestLogger { assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELED), archivedGraph.getStatusTimestamp(JobStatus.CANCELED)); assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FINISHED), archivedGraph.getStatusTimestamp(JobStatus.FINISHED)); assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING), archivedGraph.getStatusTimestamp(JobStatus.RESTARTING)); + assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDING), archivedGraph.getStatusTimestamp(JobStatus.SUSPENDING)); assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED), archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED)); assertEquals(runtimeGraph.isStoppable(), archivedGraph.isStoppable()); http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java index 52d4c81..1b19c53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java @@ -49,7 +49,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; public class ExecutionGraphSuspendTest extends TestLogger { /** - * Going into SUSPENDED out of CREATED should immediately cancel everything and + * Going into SUSPENDING out of CREATED should immediately cancel everything and * not send out RPC calls. */ @Test @@ -72,7 +72,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { } /** - * Going into SUSPENDED out of DEPLOYING vertices should cancel all vertices once with RPC calls. + * Going into SUSPENDING out of DEPLOYING vertices should cancel all vertices once with RPC calls. */ @Test public void testSuspendedOutOfDeploying() throws Exception { @@ -88,15 +88,20 @@ public class ExecutionGraphSuspendTest extends TestLogger { eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + validateAllVerticesInState(eg, ExecutionState.CANCELING); validateCancelRpcCalls(gateway, parallelism); + ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } /** - * Going into SUSPENDED out of RUNNING vertices should cancel all vertices once with RPC calls. + * Going into SUSPENDING out of RUNNING vertices should cancel all vertices once with RPC calls. */ @Test public void testSuspendedOutOfRunning() throws Exception { @@ -114,15 +119,21 @@ public class ExecutionGraphSuspendTest extends TestLogger { eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + validateAllVerticesInState(eg, ExecutionState.CANCELING); + validateCancelRpcCalls(gateway, parallelism); + ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } /** - * Suspending from FAILING goes to SUSPENDED and sends no additional RPC calls + * Suspending from FAILING goes to SUSPENDING and sends no additional RPC calls */ @Test public void testSuspendedOutOfFailing() throws Exception { @@ -140,10 +151,14 @@ public class ExecutionGraphSuspendTest extends TestLogger { // suspend eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + + ensureCannotLeaveSuspendingState(eg, gateway); ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } @@ -176,7 +191,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { } /** - * Suspending from CANCELING goes to SUSPENDED and sends no additional RPC calls. + * Suspending from CANCELING goes to SUSPENDING and sends no additional RPC calls. */ @Test public void testSuspendedOutOfCanceling() throws Exception { @@ -194,10 +209,14 @@ public class ExecutionGraphSuspendTest extends TestLogger { // suspend eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + + ensureCannotLeaveSuspendingState(eg, gateway); ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } @@ -280,6 +299,27 @@ public class ExecutionGraphSuspendTest extends TestLogger { } } + private static void ensureCannotLeaveSuspendingState(ExecutionGraph eg, TaskManagerGateway gateway) { + assertEquals(JobStatus.SUSPENDING, eg.getState()); + reset(gateway); + + eg.failGlobal(new Exception("fail")); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + verifyNoMoreInteractions(gateway); + + eg.cancel(); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + verifyNoMoreInteractions(gateway); + + eg.suspend(new Exception("suspend again")); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + verifyNoMoreInteractions(gateway); + + for (ExecutionVertex ev : eg.getAllExecutionVertices()) { + assertEquals(0, ev.getCurrentExecutionAttempt().getAttemptNumber()); + } + } + private static void validateAllVerticesInState(ExecutionGraph eg, ExecutionState expected) { for (ExecutionVertex ev : eg.getAllExecutionVertices()) { assertEquals(expected, ev.getCurrentExecutionAttempt().getState()); http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java index 8bdaff5..3afd9fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java @@ -248,7 +248,7 @@ public class ExecutionGraphCacheTest extends TestLogger { /** * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} is in - * state {@link JobStatus#SUSPENDED}. + * state {@link JobStatus#SUSPENDING} or {@link JobStatus#SUSPENDED}. * * <p>This test can be removed once we no longer request the actual {@link ExecutionGraph} from the * {@link JobManager}. @@ -259,9 +259,11 @@ public class ExecutionGraphCacheTest extends TestLogger { final Time timeToLive = Time.hours(1L); final JobID expectedJobId = new JobID(); + final ArchivedExecutionGraph suspendingExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDING).build(); final ArchivedExecutionGraph suspendedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDED).build(); final ConcurrentLinkedQueue<CompletableFuture<? extends AccessExecutionGraph>> requestJobAnswers = new ConcurrentLinkedQueue<>(); + requestJobAnswers.offer(CompletableFuture.completedFuture(suspendingExecutionGraph)); requestJobAnswers.offer(CompletableFuture.completedFuture(suspendedExecutionGraph)); requestJobAnswers.offer(CompletableFuture.completedFuture(expectedExecutionGraph)); @@ -278,17 +280,21 @@ public class ExecutionGraphCacheTest extends TestLogger { try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(suspendingExecutionGraph, executionGraphFuture.get()); + + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(suspendedExecutionGraph, executionGraphFuture.get()); - CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(expectedExecutionGraph, executionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture.get()); } } /** * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} changes its - * state to {@link JobStatus#SUSPENDED}. + * state to {@link JobStatus#SUSPENDING} or {@link JobStatus#SUSPENDED}. * * <p>This test can be removed once we no longer request the actual {@link ExecutionGraph} from the * {@link JobManager}. @@ -299,30 +305,39 @@ public class ExecutionGraphCacheTest extends TestLogger { final Time timeToLive = Time.hours(1L); final JobID expectedJobId = new JobID(); + final SuspendableAccessExecutionGraph toBeSuspendingExecutionGraph = new SuspendableAccessExecutionGraph(expectedJobId); final SuspendableAccessExecutionGraph toBeSuspendedExecutionGraph = new SuspendableAccessExecutionGraph(expectedJobId); final CountingRestfulGateway restfulGateway = createCountingRestfulGateway( expectedJobId, + CompletableFuture.completedFuture(toBeSuspendingExecutionGraph), CompletableFuture.completedFuture(toBeSuspendedExecutionGraph), CompletableFuture.completedFuture(expectedExecutionGraph)); try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(toBeSuspendingExecutionGraph, executionGraphFuture.get()); + + toBeSuspendingExecutionGraph.setJobStatus(JobStatus.SUSPENDING); + + // retrieve the same job from the cache again --> this should return it and invalidate the cache entry + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(toBeSuspendedExecutionGraph, executionGraphFuture.get()); toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED); // retrieve the same job from the cache again --> this should return it and invalidate the cache entry - CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(expectedExecutionGraph, executionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture.get()); - CompletableFuture<AccessExecutionGraph> executionGraphFuture3 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(expectedExecutionGraph, executionGraphFuture3.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture.get()); - assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2)); + assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(3)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java index f8122ee..00829e6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -38,7 +39,7 @@ import static org.mockito.Mockito.mock; /** * Tests for the JobAccumulatorsHandler. */ -public class JobAccumulatorsHandlerTest { +public class JobAccumulatorsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java index d173e0f..2279cd8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -39,7 +40,7 @@ import static org.mockito.Mockito.mock; /** * Tests for the JobConfigHandler. */ -public class JobConfigHandlerTest { +public class JobConfigHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java index 2980a08..dbfa8cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -46,7 +47,7 @@ import static org.mockito.Mockito.mock; /** * Tests for the JobDetailsHandler. */ -public class JobDetailsHandlerTest { +public class JobDetailsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java index 29e0819..9edaef1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; @@ -34,7 +35,7 @@ import static org.mockito.Mockito.mock; /** * Tests for the JobPlanHandler. */ -public class JobPlanHandlerTest { +public class JobPlanHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java index 97356f4..abb22e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -40,7 +41,7 @@ import static org.mockito.Mockito.mock; /** * Tests for the JobVertexAccumulatorsHandler. */ -public class JobVertexAccumulatorsHandlerTest { +public class JobVertexAccumulatorsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java index 9cc294a..0c52171 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationU import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -42,7 +43,7 @@ import static org.mockito.Mockito.mock; /** * Tests for the JobVertexDetailsHandler. */ -public class JobVertexDetailsHandlerTest { +public class JobVertexDetailsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/7e96a248/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java index ad3a95f..92b0d8a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java @@ -135,7 +135,7 @@ public class ArchivedJobGenerationUtils { .setTasks(tasks) .setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED))) .setState(JobStatus.FINISHED) - .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}) .setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) .build(); }