tillrohrmann closed pull request #6812: [BP-1.5][FLINK-9788] Fix ExecutionGraph 
inconsistency for global failures when restarting
URL: https://github.com/apache/flink/pull/6812
 
 
   

This PR was merged from a forked repository.
Because GitHub does not show the original diff of a fork-based pull
request once it has been merged, the diff is reproduced below for the
sake of provenance:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index acb1e16fe71..0be1ff27420 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1151,18 +1151,7 @@ public void failGlobal(Throwable t) {
                                current == JobStatus.SUSPENDED ||
                                current.isGloballyTerminalState()) {
                                return;
-                       }
-                       else if (current == JobStatus.RESTARTING) {
-                               // we handle 'failGlobal()' while in 
'RESTARTING' as a safety net in case something
-                               // has gone wrong in 'RESTARTING' and we need 
to re-attempt the restarts
-                               initFailureCause(t);
-
-                               final long globalVersionForRestart = 
incrementGlobalModVersion();
-                               if (tryRestartOrFail(globalVersionForRestart)) {
-                                       return;
-                               }
-                       }
-                       else if (transitionState(current, JobStatus.FAILING, 
t)) {
+                       } else if (transitionState(current, JobStatus.FAILING, 
t)) {
                                initFailureCause(t);
 
                                // make sure no concurrent local or global 
actions interfere with the failover
@@ -1240,7 +1229,7 @@ public void restart(long expectedGlobalVersion) {
                                                colGroups.add(cgroup);
                                        }
 
-                                       jv.resetForNewExecution(resetTimestamp, 
globalModVersion);
+                                       jv.resetForNewExecution(resetTimestamp, 
expectedGlobalVersion);
                                }
 
                                for (int i = 0; i < stateTimestamps.length; 
i++) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 9b98de78143..91510d1af54 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -57,12 +58,15 @@
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Iterator;
@@ -86,15 +90,20 @@
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.switchToRunning;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.spy;
 
+/**
+ * Tests the restart behaviour of the {@link ExecutionGraph}.
+ */
 public class ExecutionGraphRestartTest extends TestLogger {
 
-       private final static int NUM_TASKS = 31;
+       private static final int NUM_TASKS = 31;
 
        private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(4);
 
@@ -113,9 +122,7 @@ public void testNoManualRestart() throws Exception {
 
                eg.getAllExecutionVertices().iterator().next().fail(new 
Exception("Test Exception"));
 
-               for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-                       vertex.getCurrentExecutionAttempt().cancelingComplete();
-               }
+               completeCanceling(eg);
 
                assertEquals(JobStatus.FAILED, eg.getState());
 
@@ -125,9 +132,19 @@ public void testNoManualRestart() throws Exception {
                assertEquals(JobStatus.FAILED, eg.getState());
        }
 
+       private void completeCanceling(ExecutionGraph eg) {
+               executeOperationForAllExecutions(eg, 
Execution::cancelingComplete);
+       }
+
+       private void executeOperationForAllExecutions(ExecutionGraph eg, 
Consumer<Execution> operation) {
+               for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+                       operation.accept(vertex.getCurrentExecutionAttempt());
+               }
+       }
+
        @Test
        public void testRestartAutomatically() throws Exception {
-               RestartStrategy restartStrategy = new 
FixedDelayRestartStrategy(1, 1000);
+               RestartStrategy restartStrategy = new 
FixedDelayRestartStrategy(1, 0L);
                Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = 
createExecutionGraph(restartStrategy);
                ExecutionGraph eg = executionGraphInstanceTuple.f0;
 
@@ -207,23 +224,21 @@ public void testFailWhileRestarting() throws Exception {
                // Kill the instance and wait for the job to restart
                instance.markDead();
 
-               Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
-
-               while (deadline.hasTimeLeft() &&
-                       executionGraph.getState() != JobStatus.RESTARTING) {
-
-                       Thread.sleep(100);
-               }
+               waitUntilJobStatus(executionGraph, JobStatus.RESTARTING, 
TestingUtils.TESTING_DURATION().toMillis());
 
                assertEquals(JobStatus.RESTARTING, executionGraph.getState());
 
-               // The restarting should not fail with an ordinary exception
-               executionGraph.failGlobal(new Exception("Test exception"));
+               // If we fail when being in RESTARTING, then we should try to 
restart again
+               final long globalModVersion = 
executionGraph.getGlobalModVersion();
+               final Exception testException = new Exception("Test exception");
+               executionGraph.failGlobal(testException);
 
+               assertNotEquals(globalModVersion, 
executionGraph.getGlobalModVersion());
                assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+               assertEquals(testException, executionGraph.getFailureCause()); 
// we should have updated the failure cause
 
                // but it should fail when sending a SuppressRestartsException
-               executionGraph.failGlobal(new SuppressRestartsException(new 
Exception("Test exception")));
+               executionGraph.failGlobal(new SuppressRestartsException(new 
Exception("Suppress restart exception")));
 
                assertEquals(JobStatus.FAILED, executionGraph.getState());
 
@@ -254,9 +269,7 @@ public void testCancelWhileFailing() throws Exception {
                assertEquals(JobStatus.CANCELLING, graph.getState());
 
                // let all tasks finish cancelling
-               for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
-                       vertex.getCurrentExecutionAttempt().cancelingComplete();
-               }
+               completeCanceling(graph);
 
                assertEquals(JobStatus.CANCELED, graph.getState());
        }
@@ -267,11 +280,7 @@ public void testFailWhileCanceling() throws Exception {
                final ExecutionGraph graph = 
createExecutionGraph(restartStrategy).f0;
 
                assertEquals(JobStatus.RUNNING, graph.getState());
-
-               // switch all tasks to running
-               for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
-                       vertex.getCurrentExecutionAttempt().switchToRunning();
-               }
+               switchAllTasksToRunning(graph);
 
                graph.cancel();
 
@@ -282,13 +291,15 @@ public void testFailWhileCanceling() throws Exception {
                assertEquals(JobStatus.FAILING, graph.getState());
 
                // let all tasks finish cancelling
-               for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
-                       vertex.getCurrentExecutionAttempt().cancelingComplete();
-               }
+               completeCanceling(graph);
 
                assertEquals(JobStatus.FAILED, graph.getState());
        }
 
+       private void switchAllTasksToRunning(ExecutionGraph graph) {
+               executeOperationForAllExecutions(graph, 
Execution::switchToRunning);
+       }
+
        @Test
        public void testNoRestartOnSuppressException() throws Exception {
                final ExecutionGraph eg = createExecutionGraph(new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, 0)).f0;
@@ -299,9 +310,7 @@ public void testNoRestartOnSuppressException() throws 
Exception {
 
                assertEquals(JobStatus.FAILING, eg.getState());
 
-               for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-                       vertex.getCurrentExecutionAttempt().cancelingComplete();
-               }
+               completeCanceling(eg);
 
                eg.waitUntilTerminal();
                assertEquals(JobStatus.FAILED, eg.getState());
@@ -330,7 +339,7 @@ public void testFailingExecutionAfterRestart() throws 
Exception {
                JobVertex sender = 
ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
                JobVertex receiver = 
ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
                JobGraph jobGraph = new JobGraph("Pointwise job", sender, 
receiver);
-               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000), scheduler);
+               ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 0L), scheduler);
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
@@ -766,6 +775,68 @@ public void 
testRestartWithSlotSharingAndNotEnoughResources() throws Exception {
                }
        }
 
+       /**
+        * Tests that the {@link ExecutionGraph} can handle concurrent failures 
while
+        * being in the RESTARTING state.
+        */
+       @Test
+       public void testConcurrentFailureWhileRestarting() throws Exception {
+               final long timeout = 5000L;
+
+               final CountDownLatch countDownLatch = new CountDownLatch(2);
+               final CountDownLatchRestartStrategy restartStrategy = new 
CountDownLatchRestartStrategy(countDownLatch);
+               final ExecutionGraph executionGraph = 
createSimpleExecutionGraph(restartStrategy, new TestingSlotProvider(ignored -> 
new CompletableFuture<>()));
+
+               executionGraph.setQueuedSchedulingAllowed(true);
+               executionGraph.scheduleForExecution();
+
+               assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
+
+               executionGraph.failGlobal(new FlinkException("Test exception"));
+
+               executor.execute(() -> {
+                       countDownLatch.countDown();
+                       try {
+                               countDownLatch.await();
+                       } catch (InterruptedException e) {
+                               ExceptionUtils.rethrow(e);
+                       }
+
+                       executionGraph.failGlobal(new 
FlinkException("Concurrent exception"));
+               });
+
+               waitUntilJobStatus(executionGraph, JobStatus.RUNNING, timeout);
+       }
+
+       private static final class CountDownLatchRestartStrategy implements 
RestartStrategy {
+
+               private final CountDownLatch countDownLatch;
+
+               private CountDownLatchRestartStrategy(CountDownLatch 
countDownLatch) {
+                       this.countDownLatch = countDownLatch;
+               }
+
+               @Override
+               public boolean canRestart() {
+                       return true;
+               }
+
+               @Override
+               public void restart(RestartCallback restarter, 
ScheduledExecutor executor) {
+                       executor.execute(() -> {
+                               countDownLatch.countDown();
+
+                               try {
+                                       countDownLatch.await();
+                               } catch (InterruptedException e) {
+                                       ExceptionUtils.rethrow(e);
+                               }
+
+                               restarter.triggerFullRecovery();
+                       });
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
@@ -846,14 +917,6 @@ public void run() {
        }
 
        private static Tuple2<ExecutionGraph, Instance> 
createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
-               return createExecutionGraph(restartStrategy, false);
-       }
-
-       private static Tuple2<ExecutionGraph, Instance> 
createSpyExecutionGraph(RestartStrategy restartStrategy) throws Exception {
-               return createExecutionGraph(restartStrategy, true);
-       }
-
-       private static Tuple2<ExecutionGraph, Instance> 
createExecutionGraph(RestartStrategy restartStrategy, boolean isSpy) throws 
Exception {
                Instance instance = ExecutionGraphTestUtils.getInstance(
                        new ActorTaskManagerGateway(
                                new 
SimpleActorGateway(TestingUtils.directExecutionContext())),
@@ -862,15 +925,7 @@ public void run() {
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                scheduler.newInstanceAvailable(instance);
 
-               JobVertex sender = 
ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
-
-               JobGraph jobGraph = new JobGraph("Pointwise job", sender);
-
-               ExecutionGraph eg = newExecutionGraph(restartStrategy, 
scheduler);
-               if (isSpy) {
-                       eg = spy(eg);
-               }
-               
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+               ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, 
scheduler);
 
                assertEquals(JobStatus.CREATED, eg.getState());
 
@@ -879,7 +934,23 @@ public void run() {
                return new Tuple2<>(eg, instance);
        }
 
-       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy, Scheduler scheduler) throws IOException {
+       private static ExecutionGraph 
createSimpleExecutionGraph(RestartStrategy restartStrategy, SlotProvider 
slotProvider) throws IOException, JobException {
+               JobGraph jobGraph = createJobGraph(NUM_TASKS);
+
+               ExecutionGraph eg = newExecutionGraph(restartStrategy, 
slotProvider);
+               
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+               return eg;
+       }
+
+       @Nonnull
+       private static JobGraph createJobGraph(int parallelism) {
+               JobVertex sender = 
ExecutionGraphTestUtils.createJobVertex("Task", parallelism, 
NoOpInvokable.class);
+
+               return new JobGraph("Pointwise job", sender);
+       }
+
+       private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy, SlotProvider slotProvider) throws IOException {
                return new ExecutionGraph(
                        TestingUtils.defaultExecutor(),
                        TestingUtils.defaultExecutor(),
@@ -889,7 +960,7 @@ private static ExecutionGraph 
newExecutionGraph(RestartStrategy restartStrategy,
                        new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        restartStrategy,
-                       scheduler);
+                       slotProvider);
        }
 
        private static void restartAfterFailure(ExecutionGraph eg, 
FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException, 
TimeoutException {
@@ -922,8 +993,10 @@ private static void 
waitForAllResourcesToBeAssignedAfterAsyncRestart(ExecutionGr
 
        private static void waitForAsyncRestart(ExecutionGraph eg, 
FiniteDuration timeout) throws InterruptedException {
                Deadline deadline = timeout.fromNow();
+               long waitingTime = 10L;
                while (deadline.hasTimeLeft() && eg.getState() != 
JobStatus.RUNNING) {
-                       Thread.sleep(100);
+                       Thread.sleep(waitingTime);
+                       waitingTime = Math.min(waitingTime << 1, 100L);
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 977ab9ed4fb..b80251debb8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -121,6 +121,9 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the legacy {@link TaskManager}.
+ */
 @SuppressWarnings("serial")
 public class TaskManagerTest extends TestLogger {
 
@@ -133,7 +136,7 @@
 
        private static ActorSystem system;
 
-       final static UUID leaderSessionID = UUID.randomUUID();
+       static final UUID LEADER_SESSION_ID = UUID.randomUUID();
 
        private TestingHighAvailabilityServices highAvailabilityServices;
 
@@ -215,7 +218,6 @@ protected void run() {
                                        
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                        new ArrayList<PermanentBlobKey>(), 
Collections.emptyList(), 0);
 
-
                                new Within(d) {
 
                                        @Override
@@ -261,8 +263,6 @@ else if (!(message instanceof 
TaskManagerMessages.Heartbeat)) {
                                                                
fail("Unexpected message: " + message);
                                                        }
                                                } while 
(System.currentTimeMillis() < deadline);
-
-
                                        }
                                };
                        }
@@ -283,11 +283,11 @@ public void testJobSubmissionAndCanceling() {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
 
                        try {
-                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -404,7 +404,7 @@ protected void run() {
                                        }
                                };
                        }
-                       catch(Exception e) {
+                       catch (Exception e) {
                                e.printStackTrace();
                                fail(e.getMessage());
                        }
@@ -424,11 +424,11 @@ public void testJobSubmissionAndStop() throws Exception {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
 
                        try {
-                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -554,11 +554,11 @@ public void testGateChannelEdgeMismatch() {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
 
                        try {
-                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -651,7 +651,7 @@ public void testRunJobWithForwardChannel() {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
                        try {
                                final JobID jid = new JobID();
 
@@ -661,8 +661,8 @@ public void testRunJobWithForwardChannel() {
                                final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
-                               ActorRef jm = system.actorOf(Props.create(new 
SimpleLookupJobManagerCreator(leaderSessionID)));
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               ActorRef jm = system.actorOf(Props.create(new 
SimpleLookupJobManagerCreator(LEADER_SESSION_ID)));
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -793,7 +793,7 @@ public void testCancellingDependentAndStateUpdateFails() {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
                        try {
                                final JobID jid = new JobID();
 
@@ -806,12 +806,12 @@ public void testCancellingDependentAndStateUpdateFails() {
                                ActorRef jm = system.actorOf(
                                                Props.create(
                                                                new 
SimpleLookupFailingUpdateJobManagerCreator(
-                                                                               
leaderSessionID,
+                                                                       
LEADER_SESSION_ID,
                                                                                
eid2)
                                                )
                                );
 
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -911,14 +911,14 @@ protected void run() {
 
                                                        assertEquals(0, 
tasks.size());
                                                }
-                                               catch(Exception e) {
+                                               catch (Exception e) {
                                                        e.printStackTrace();
                                                        fail(e.getMessage());
                                                }
                                        }
                                };
                        }
-                       catch(Exception e) {
+                       catch (Exception e) {
                                e.printStackTrace();
                                fail(e.getMessage());
                        }
@@ -943,16 +943,16 @@ public void testRemotePartitionNotFound() throws 
Exception {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
 
                        try {
                                final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
 
                                // Create the JM
                                ActorRef jm = system.actorOf(Props.create(
-                                               new 
SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+                                               new 
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, 
getTestActor())));
 
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1017,12 +1017,12 @@ protected void run() {
                                                // The task should fail after 
repeated requests
                                                
assertEquals(ExecutionState.FAILED, msg.getExecutionState());
                                                Throwable t = 
msg.getError(ClassLoader.getSystemClassLoader());
-                                               assertEquals("Thrown exception 
was not a PartitionNotFoundException: " + t.getMessage(), 
+                                               assertEquals("Thrown exception 
was not a PartitionNotFoundException: " + t.getMessage(),
                                                        
PartitionNotFoundException.class, t.getClass());
                                        }
                                };
                        }
-                       catch(Exception e) {
+                       catch (Exception e) {
                                e.printStackTrace();
                                fail(e.getMessage());
                        }
@@ -1065,16 +1065,16 @@ public void testLocalPartitionNotFound() throws 
Exception {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
 
                        try {
                                final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
 
                                // Create the JM
                                ActorRef jm = system.actorOf(Props.create(
-                                               new 
SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+                                               new 
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, 
getTestActor())));
 
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1143,7 +1143,7 @@ protected void run() {
                                        }
                                };
                        }
-                       catch(Exception e) {
+                       catch (Exception e) {
                                e.printStackTrace();
                                fail(e.getMessage());
                        }
@@ -1167,9 +1167,9 @@ public void testLogNotFoundHandling() throws Exception {
 
                                // Create the JM
                                ActorRef jm = system.actorOf(Props.create(
-                                       new 
SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+                                       new 
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, 
getTestActor())));
 
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                final int dataPort = 
NetUtils.getAvailablePort();
                                Configuration config = new Configuration();
@@ -1209,7 +1209,8 @@ protected void run() {
                                TestingUtils.stopActor(taskManager);
                                TestingUtils.stopActor(jobManager);
                        }
-               }};}
+               }};
+       }
 
        // 
------------------------------------------------------------------------
        // Stack trace sample
@@ -1555,7 +1556,7 @@ public void testTerminationOnFatalError() {
         * Test that a failing schedule or update consumers call leads to the 
failing of the respective
         * task.
         *
-        * IMPORTANT: We have to make sure that the invokable's cancel method 
is called, because only
+        * <p>IMPORTANT: We have to make sure that the invokable's cancel 
method is called, because only
         * then the future is completed. We do this by not eagerly deploy 
consumer tasks and requiring
         * the invokable to fill one memory segment. The completed memory 
segment will trigger the
         * scheduling of the downstream operator since it is in pipeline mode. 
After we've filled the
@@ -1592,9 +1593,8 @@ public void testFailingScheduleOrUpdateConsumersMessage() 
throws Exception {
                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                new ArrayList<>(), Collections.emptyList(), 0);
 
-
-                       ActorRef jmActorRef = 
system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, 
leaderSessionID), "jobmanager");
-                       ActorGateway jobManager = new 
AkkaActorGateway(jmActorRef, leaderSessionID);
+                       ActorRef jmActorRef = 
system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, 
LEADER_SESSION_ID), "jobmanager");
+                       ActorGateway jobManager = new 
AkkaActorGateway(jmActorRef, LEADER_SESSION_ID);
 
                        highAvailabilityServices.setJobMasterLeaderRetriever(
                                HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1635,8 +1635,8 @@ public void testSubmitTaskFailure() throws Exception {
 
                try {
 
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                        highAvailabilityServices.setJobMasterLeaderRetriever(
                                HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1696,8 +1696,8 @@ public void testStopTaskFailure() throws Exception {
                try {
                        final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
 
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                        highAvailabilityServices.setJobMasterLeaderRetriever(
                                HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1764,8 +1764,8 @@ public void testStackTraceSampleFailure() throws 
Exception {
 
                try {
 
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                        highAvailabilityServices.setJobMasterLeaderRetriever(
                                HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1813,8 +1813,8 @@ public void testUpdateTaskInputPartitionsFailure() throws 
Exception {
 
                        final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
 
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                        highAvailabilityServices.setJobMasterLeaderRetriever(
                                HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1894,7 +1894,7 @@ public void handleMessage(Object message) throws 
Exception {
                                                ),
                                                self);
                        }
-                       else if(message instanceof 
TaskMessages.UpdateTaskExecutionState){
+                       else if (message instanceof 
TaskMessages.UpdateTaskExecutionState){
                                getSender().tell(true, getSelf());
                        }
                }
@@ -1958,7 +1958,7 @@ public void handleMessage(Object message) throws 
Exception{
                                TaskMessages.UpdateTaskExecutionState updateMsg 
=
                                                
(TaskMessages.UpdateTaskExecutionState) message;
 
-                               
if(validIDs.contains(updateMsg.taskExecutionState().getID())) {
+                               if 
(validIDs.contains(updateMsg.taskExecutionState().getID())) {
                                        getSender().tell(true, getSelf());
                                } else {
                                        getSender().tell(false, getSelf());
@@ -2021,7 +2021,7 @@ public SimpleLookupFailingUpdateJobManagerCreator(UUID 
leaderSessionID, Executio
 
                        validIDs = new HashSet<ExecutionAttemptID>();
 
-                       for(ExecutionAttemptID id : ids) {
+                       for (ExecutionAttemptID id : ids) {
                                this.validIDs.add(id);
                        }
                }
@@ -2049,9 +2049,9 @@ public SimplePartitionStateLookupJobManager create() 
throws Exception {
                        return new 
SimplePartitionStateLookupJobManager(leaderSessionID, testActor);
                }
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        public static final class TestInvokableCorrect extends 
AbstractInvokable {
 
                public TestInvokableCorrect(Environment environment) {
@@ -2061,7 +2061,7 @@ public TestInvokableCorrect(Environment environment) {
                @Override
                public void invoke() {}
        }
-       
+
        public static class TestInvokableBlockingCancelable extends 
AbstractInvokable {
 
                public TestInvokableBlockingCancelable(Environment environment) 
{


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to