[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-25 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r338950999
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Runner for the {@link org.apache.flink.runtime.dispatcher.Dispatcher} which 
is responsible for the
+ * leader election.
+ */
+public final class DefaultDispatcherRunner implements DispatcherRunner, 
LeaderContender {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDispatcherRunner.class);
+
+   private final Object lock = new Object();
+
+   private final LeaderElectionService leaderElectionService;
+
+   private final FatalErrorHandler fatalErrorHandler;
+
+   private final DispatcherLeaderProcessFactory 
dispatcherLeaderProcessFactory;
+
+   private final CompletableFuture terminationFuture;
+
+   private final CompletableFuture shutDownFuture;
+
+   private boolean running;
+
+   private DispatcherLeaderProcess dispatcherLeaderProcess;
+
+   private CompletableFuture 
previousDispatcherLeaderProcessTerminationFuture;
+
+   private DefaultDispatcherRunner(
+   LeaderElectionService leaderElectionService,
+   FatalErrorHandler fatalErrorHandler,
+   DispatcherLeaderProcessFactory 
dispatcherLeaderProcessFactory) {
+   this.leaderElectionService = leaderElectionService;
+   this.fatalErrorHandler = fatalErrorHandler;
+   this.dispatcherLeaderProcessFactory = 
dispatcherLeaderProcessFactory;
+   this.terminationFuture = new CompletableFuture<>();
+   this.shutDownFuture = new CompletableFuture<>();
+
+   this.running = true;
+   this.dispatcherLeaderProcess = 
StoppedDispatcherLeaderProcess.INSTANCE;
+   this.previousDispatcherLeaderProcessTerminationFuture = 
CompletableFuture.completedFuture(null);
+   }
+
+   @Override
+   public CompletableFuture getShutDownFuture() {
+   return shutDownFuture;
+   }
+
+   @Override
+   public CompletableFuture closeAsync() {
+   synchronized (lock) {
+   if (!running) {
+   return terminationFuture;
+   } else {
+   running = false;
+   }
+   }
+
+   stopDispatcherLeaderProcess();
+
+   FutureUtils.forward(
+   previousDispatcherLeaderProcessTerminationFuture,
+   terminationFuture);
+
+   return terminationFuture;
+   }
+
+   // ---
+   // Leader election
+   // ---
+
+   @Override
+   public void grantLeadership(UUID leaderSessionID) {
+   runActionIfRunning(() -> 
startNewDispatcherLeaderProcess(leaderSessionID));
+   }
+
+   private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
+   stopDispatcherLeaderProcess();
+
+   dispatcherLeaderProcess = 
createNewDispatcherLeaderProcess(leaderSessionID);
+
+   final DispatcherLeaderProcess newDispatcherLeaderProcess = 
dispatcherLeaderProcess;
+   FutureUtils.assertNoException(
+   

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-25 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r338951963
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerLeaderElectionLifecycleManager.java
 ##
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+
+final class DispatcherRunnerLeaderElectionLifecycleManager implements DispatcherRunner {
+   private final T dispatcherRunner;
+   private final LeaderElectionService leaderElectionService;
+
+   private DispatcherRunnerLeaderElectionLifecycleManager(T 
dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception 
{
+   this.dispatcherRunner = dispatcherRunner;
+   this.leaderElectionService = leaderElectionService;
+
+   leaderElectionService.start(dispatcherRunner);
+   }
+
+   @Override
+   public CompletableFuture getShutDownFuture() {
+   return dispatcherRunner.getShutDownFuture();
+   }
+
+   @Override
+   public CompletableFuture closeAsync() {
+   final CompletableFuture servicesTerminationFuture = 
stopServices();
+   final CompletableFuture dispatcherRunnerTerminationFuture 
= dispatcherRunner.closeAsync();
+
+   return 
FutureUtils.completeAll(Arrays.asList(servicesTerminationFuture, 
dispatcherRunnerTerminationFuture));
+   }
+
+   private CompletableFuture stopServices() {
+   Exception exception = null;
+
+   try {
+   leaderElectionService.stop();
+   } catch (Exception e) {
+   exception = e;
 
 Review comment:
   You could return from here directly. Is this for extensibility?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-25 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r338947618
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -243,18 +255,6 @@ private void stopDispatcherServices() throws Exception {
exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
 
 Review comment:
   nit: `firstOrSuppressed` isn't needed anymore.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-25 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r338947618
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -243,18 +255,6 @@ private void stopDispatcherServices() throws Exception {
exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
 
 Review comment:
   nit: This line isn't needed anymore.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-24 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r338587104
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
 ##
 @@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import org.hamcrest.core.Is;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SessionDispatcherLeaderProcess}.
+ */
+public class SessionDispatcherLeaderProcessTest extends TestLogger {
+
+   private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph");
+
+   private static ExecutorService ioExecutor;
+
+   private final UUID leaderSessionId = UUID.randomUUID();
+
+   private TestingFatalErrorHandler fatalErrorHandler;
+
+   private JobGraphStore jobGraphStore;
+
+   private TestingDispatcherServiceFactory dispatcherServiceFactory;
+
+   @BeforeClass
+   public static void setupClass() {
+   ioExecutor = Executors.newSingleThreadExecutor();
+   }
+
+   @Before
+   public void setup() {
+   fatalErrorHandler = new TestingFatalErrorHandler();
+   jobGraphStore = TestingJobGraphStore.newBuilder().build();
+   dispatcherServiceFactory = 
TestingDispatcherServiceFactory.newBuilder().build();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (fatalErrorHandler != null) {
+   fatalErrorHandler.rethrowError();
+   fatalErrorHandler = null;
+   }
+   }
+
+   @AfterClass
+   public static void teardownClass() {
+   if (ioExecutor != null) {
+   ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, 
ioExecutor);
+   }
+   }
+
+   @Test
+   public void start_afterClose_doesNotHaveAnEffect() throws Exception {
+   final SessionDispatcherLeaderProcess dispatcherLeaderProcess = 
createDispatcherLeaderProcess();
+
+   dispatcherLeaderProcess.close();
+   dispatcherLeaderProcess.start();
+
+   assertThat(dispatcherLeaderProcess.getState(), 
is(SessionDispatcherLeaderProcess.State.STOPPED));
+   }
+
+   @Test
+   public void 
start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception {
+

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-21 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r337075405
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
 ##
 @@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Runner for the {@link org.apache.flink.runtime.dispatcher.Dispatcher} which 
is responsible for the
+ * leader election.
+ */
+public class DefaultDispatcherRunner implements DispatcherRunner, 
LeaderContender {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDispatcherRunner.class);
+
+   private final Object lock = new Object();
+
+   private final LeaderElectionService leaderElectionService;
+
+   private final FatalErrorHandler fatalErrorHandler;
+
+   private final DispatcherLeaderProcessFactory 
dispatcherLeaderProcessFactory;
+
+   private final CompletableFuture terminationFuture;
+
+   private final CompletableFuture shutDownFuture;
+
+   private boolean isRunning;
+
+   private DispatcherLeaderProcess dispatcherLeaderProcess;
+
+   private CompletableFuture 
previousDispatcherLeaderProcessTerminationFuture;
+
+   private CompletableFuture dispatcherGatewayFuture;
+
+   DefaultDispatcherRunner(
+   LeaderElectionService leaderElectionService,
+   FatalErrorHandler fatalErrorHandler,
+   DispatcherLeaderProcessFactory 
dispatcherLeaderProcessFactory) throws Exception {
+   this.leaderElectionService = leaderElectionService;
+   this.fatalErrorHandler = fatalErrorHandler;
+   this.dispatcherLeaderProcessFactory = 
dispatcherLeaderProcessFactory;
+   this.terminationFuture = new CompletableFuture<>();
+   this.shutDownFuture = new CompletableFuture<>();
+
+   this.isRunning = true;
+   this.dispatcherLeaderProcess = 
StoppedDispatcherLeaderProcess.INSTANCE;
+   this.previousDispatcherLeaderProcessTerminationFuture = 
CompletableFuture.completedFuture(null);
+   this.dispatcherGatewayFuture = new CompletableFuture<>();
+
+   startDispatcherRunner(leaderElectionService);
 
 Review comment:
   Is it not possible to introduce a start method on the DispatcherRunner?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-17 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r336020164
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
 ##
 @@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import org.hamcrest.core.Is;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SessionDispatcherLeaderProcess}.
+ */
+public class SessionDispatcherLeaderProcessTest extends TestLogger {
+
+   private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph");
+
+   private static ExecutorService ioExecutor;
+
+   private final UUID leaderSessionId = UUID.randomUUID();
+
+   private TestingFatalErrorHandler fatalErrorHandler;
+
+   private JobGraphStore jobGraphStore;
+
+   private TestingDispatcherServiceFactory dispatcherServiceFactory;
+
+   @BeforeClass
+   public static void setupClass() {
+   ioExecutor = Executors.newSingleThreadExecutor();
+   }
+
+   @Before
+   public void setup() {
+   fatalErrorHandler = new TestingFatalErrorHandler();
+   jobGraphStore = TestingJobGraphStore.newBuilder().build();
+   dispatcherServiceFactory = 
TestingDispatcherServiceFactory.newBuilder().build();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (fatalErrorHandler != null) {
+   fatalErrorHandler.rethrowError();
+   fatalErrorHandler = null;
+   }
+   }
+
+   @AfterClass
+   public static void teardownClass() {
+   if (ioExecutor != null) {
+   ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, 
ioExecutor);
+   }
+   }
+
+   @Test
+   public void start_afterClose_doesNotHaveAnEffect() throws Exception {
+   final SessionDispatcherLeaderProcess dispatcherLeaderProcess = 
createDispatcherLeaderProcess();
+
+   dispatcherLeaderProcess.close();
+   dispatcherLeaderProcess.start();
+
+   assertThat(dispatcherLeaderProcess.getState(), 
is(SessionDispatcherLeaderProcess.State.STOPPED));
+   }
+
+   @Test
+   public void 
start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception {
+

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-17 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r336014644
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
 ##
 @@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultDispatcherRunner}.
+ */
+public class DefaultDispatcherRunnerTest extends TestLogger {
+
+   private TestingLeaderElectionService testingLeaderElectionService;
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+   private TestingDispatcherLeaderProcessFactory 
testingDispatcherLeaderProcessFactory;
+
+   @Before
+   public void setup() {
+   testingLeaderElectionService = new 
TestingLeaderElectionService();
+   testingFatalErrorHandler = new TestingFatalErrorHandler();
+   testingDispatcherLeaderProcessFactory = 
TestingDispatcherLeaderProcessFactory.defaultValue();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (testingLeaderElectionService != null) {
+   testingLeaderElectionService.stop();
+   testingLeaderElectionService = null;
+   }
+
+   if (testingFatalErrorHandler != null) {
+   testingFatalErrorHandler.rethrowError();
+   testingFatalErrorHandler = null;
+   }
+   }
+
+   @Test
+   public void closeAsync_doesNotCompleteUncompletedShutDownFuture() 
throws Exception {
+   final DefaultDispatcherRunner dispatcherRunner = 
createDispatcherRunner();
+
+   final CompletableFuture terminationFuture = 
dispatcherRunner.closeAsync();
+   terminationFuture.get();
+
+   final CompletableFuture shutDownFuture = 
dispatcherRunner.getShutDownFuture();
+   assertThat(shutDownFuture.isDone(), is(false));
+   }
+
+   @Test
+   public void 
getDispatcherGateway_beforeDispatcherLeaderProcessCompletes_returnsDispatcherGateway()
 throws Exception {
+   final UUID leaderSessionId = UUID.randomUUID();
+   final TestingDispatcherGateway expectedDispatcherGateway = 
createDispatcherGateway(leaderSessionId);
+   final TestingDispatcherLeaderProcess 
testingDispatcherLeaderProcess = 
TestingDispatcherLeaderProcess.newBuilder(leaderSessionId)
+   
.setDispatcherGatewayFuture(CompletableFuture.completedFuture(expectedDispatcherGateway))
+   .build();
+
+   testingDispatcherLeaderProcessFactory = 
TestingDispatcherLeaderProcessFactory.from(testingDispatcherLeaderProcess);
+   try (final DefaultDispatcherRunner dispatcherRunner = 
createDispatcherRunner()) {
+
+   final CompletableFuture 
dispatcherGatewayFuture = dispatcherRunner.getDispatcherGateway();
+
+   assertThat(dispatcherGatewayFuture.isDone(), is(false));
+
+   

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-17 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335895994
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.DispatcherServices;
+import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
+import 
org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore;
+import org.apache.flink.runtime.dispatcher.SingleJobJobGraphStore;
+import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
+import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory;
+import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.minicluster.SessionDispatcherWithUUIDFactory;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration tests for the {@link DefaultDispatcherRunner}.
+ */
+public class DefaultDispatcherRunnerITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDispatcherRunnerITCase.class);
+
+   private static final Time TIMEOUT = Time.seconds(10L);
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   @ClassRule
+   public static TestingRpcServiceResource rpcServiceResource = new 
TestingRpcServiceResource();
+
+   @ClassRule
+   public static BlobServerResource blobServerResource = new 
BlobServerResource();
+
+   private JobGraph jobGraph;
+
+   private TestingLeaderElectionService dispatcherLeaderElectionService;
+
+   private TestingFatalErrorHandler fatalErrorHandler;
+
+   private JobGraphStore jobGraphStore;
+
+   private PartialDispatcherServices partialDispatcherServices;
+
+   private DefaultDispatcherRunnerFactory dispatcherRunnerFactory;
+
+   @Before
+   public void setup() {
+   dispatcherRunnerFactory = 

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335507651
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
 ##
 @@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import org.hamcrest.core.Is;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SessionDispatcherLeaderProcess}.
+ */
+public class SessionDispatcherLeaderProcessTest extends TestLogger {
+
+   private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph");
+
+   private static ExecutorService ioExecutor;
+
+   private final UUID leaderSessionId = UUID.randomUUID();
+
+   private TestingFatalErrorHandler fatalErrorHandler;
+
+   private JobGraphStore jobGraphStore;
+
+   private TestingDispatcherServiceFactory dispatcherServiceFactory;
+
+   @BeforeClass
+   public static void setupClass() {
+   ioExecutor = Executors.newSingleThreadExecutor();
+   }
+
+   @Before
+   public void setup() {
+   fatalErrorHandler = new TestingFatalErrorHandler();
+   jobGraphStore = TestingJobGraphStore.newBuilder().build();
+   dispatcherServiceFactory = 
TestingDispatcherServiceFactory.newBuilder().build();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (fatalErrorHandler != null) {
+   fatalErrorHandler.rethrowError();
+   fatalErrorHandler = null;
+   }
+   }
+
+   @AfterClass
+   public static void teardownClass() {
+   if (ioExecutor != null) {
+   ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, 
ioExecutor);
+   }
+   }
+
+   @Test
+   public void start_afterClose_doesNotHaveAnEffect() throws Exception {
+   final SessionDispatcherLeaderProcess dispatcherLeaderProcess = 
createDispatcherLeaderProcess();
+
+   dispatcherLeaderProcess.close();
+   dispatcherLeaderProcess.start();
+
+   assertThat(dispatcherLeaderProcess.getState(), 
is(SessionDispatcherLeaderProcess.State.STOPPED));
+   }
+
+   @Test
+   public void 
start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception {
+

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335481458
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
 ##
 @@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
+import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
+import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import 
org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperRunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
+import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.emptyArray;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the interaction between the {@link DefaultDispatcherRunner} and 
ZooKeeper.
+ */
+public class ZooKeeperDefaultDispatcherRunnerTest extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperDefaultDispatcherRunnerTest.class);
+
+   private static final Time TESTING_TIMEOUT = Time.seconds(10L);
+
+   private static final Duration VERIFICATION_TIMEOUT = 
Duration.ofSeconds(10L);
+
+   @ClassRule
+   public static ZooKeeperResource zooKeeperResource = new 
ZooKeeperResource();
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @ClassRule
+   public static TestingRpcServiceResource testingRpcServiceResource = new 
TestingRpcServiceResource();
+
+   private BlobServer blobServer;
+
+   private TestingFatalErrorHandler 

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335449903
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
 ##
 @@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Process which encapsulates the job recovery logic and life cycle management 
of a
+ * {@link Dispatcher}.
+ */
+public class SessionDispatcherLeaderProcess extends  
AbstractDispatcherLeaderProcess implements JobGraphStore.JobGraphListener {
 
 Review comment:
   There is an extra space between `extends` and 
`AbstractDispatcherLeaderProcess`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335498065
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
 ##
 @@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import org.hamcrest.core.Is;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SessionDispatcherLeaderProcess}.
+ */
+public class SessionDispatcherLeaderProcessTest extends TestLogger {
+
+   private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph");
+
+   private static ExecutorService ioExecutor;
+
+   private final UUID leaderSessionId = UUID.randomUUID();
+
+   private TestingFatalErrorHandler fatalErrorHandler;
+
+   private JobGraphStore jobGraphStore;
+
+   private TestingDispatcherServiceFactory dispatcherServiceFactory;
+
+   @BeforeClass
+   public static void setupClass() {
+   ioExecutor = Executors.newSingleThreadExecutor();
+   }
+
+   @Before
+   public void setup() {
+   fatalErrorHandler = new TestingFatalErrorHandler();
+   jobGraphStore = TestingJobGraphStore.newBuilder().build();
+   dispatcherServiceFactory = 
TestingDispatcherServiceFactory.newBuilder().build();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (fatalErrorHandler != null) {
+   fatalErrorHandler.rethrowError();
+   fatalErrorHandler = null;
+   }
+   }
+
+   @AfterClass
+   public static void teardownClass() {
+   if (ioExecutor != null) {
+   ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, 
ioExecutor);
+   }
+   }
+
+   @Test
+   public void start_afterClose_doesNotHaveAnEffect() throws Exception {
+   final SessionDispatcherLeaderProcess dispatcherLeaderProcess = 
createDispatcherLeaderProcess();
+
+   dispatcherLeaderProcess.close();
+   dispatcherLeaderProcess.start();
+
+   assertThat(dispatcherLeaderProcess.getState(), 
is(SessionDispatcherLeaderProcess.State.STOPPED));
+   }
+
+   @Test
+   public void 
start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception {
+

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335498403
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
 ##
 @@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import org.hamcrest.core.Is;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SessionDispatcherLeaderProcess}.
+ */
+public class SessionDispatcherLeaderProcessTest extends TestLogger {
+
+   private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph");
+
+   private static ExecutorService ioExecutor;
+
+   private final UUID leaderSessionId = UUID.randomUUID();
+
+   private TestingFatalErrorHandler fatalErrorHandler;
+
+   private JobGraphStore jobGraphStore;
+
+   private TestingDispatcherServiceFactory dispatcherServiceFactory;
+
+   @BeforeClass
+   public static void setupClass() {
+   ioExecutor = Executors.newSingleThreadExecutor();
+   }
+
+   @Before
+   public void setup() {
+   fatalErrorHandler = new TestingFatalErrorHandler();
+   jobGraphStore = TestingJobGraphStore.newBuilder().build();
+   dispatcherServiceFactory = 
TestingDispatcherServiceFactory.newBuilder().build();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (fatalErrorHandler != null) {
+   fatalErrorHandler.rethrowError();
+   fatalErrorHandler = null;
+   }
+   }
+
+   @AfterClass
+   public static void teardownClass() {
+   if (ioExecutor != null) {
+   ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, 
ioExecutor);
+   }
+   }
+
+   @Test
+   public void start_afterClose_doesNotHaveAnEffect() throws Exception {
+   final SessionDispatcherLeaderProcess dispatcherLeaderProcess = 
createDispatcherLeaderProcess();
+
+   dispatcherLeaderProcess.close();
+   dispatcherLeaderProcess.start();
+
+   assertThat(dispatcherLeaderProcess.getState(), 
is(SessionDispatcherLeaderProcess.State.STOPPED));
+   }
+
+   @Test
+   public void 
start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception {
+

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335484914
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java
 ##
 @@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * {@link JobGraphWriter} implementation which does not allow to store
+ * {@link JobGraph}.
+ */
+public enum ThrowingJobGraphWriter implements JobGraphWriter {
+   INSTANCE;
+
+   @Override
+   public void putJobGraph(JobGraph jobGraph) {
+   throw new UnsupportedOperationException(
+   String.format("Cannot store job graphs in the %s.", 
getClass().getSimpleName()));
 
 Review comment:
   nit: The class name will anyways appear in the stacktrace. Imo `"Storing job 
graphs is not supported"` is enough for an error message.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335451362
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
 ##
 @@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Process which encapsulates the job recovery logic and life cycle management 
of a
+ * {@link Dispatcher}.
+ */
+public class SessionDispatcherLeaderProcess extends  
AbstractDispatcherLeaderProcess implements JobGraphStore.JobGraphListener {
+
+   private final DispatcherServiceFactory dispatcherFactory;
+
+   private final JobGraphStore jobGraphStore;
+
+   private final Executor ioExecutor;
+
+   private CompletableFuture onGoingRecoveryOperation = 
FutureUtils.completedVoidFuture();
+
+   private SessionDispatcherLeaderProcess(
+   UUID leaderSessionId,
+   DispatcherServiceFactory dispatcherFactory,
+   JobGraphStore jobGraphStore,
+   Executor ioExecutor,
+   FatalErrorHandler fatalErrorHandler) {
+   super(leaderSessionId, fatalErrorHandler);
+
+   this.dispatcherFactory = dispatcherFactory;
+   this.jobGraphStore = jobGraphStore;
+   this.ioExecutor = ioExecutor;
+   }
+
+   @Override
+   protected void onStart() {
+   startServices();
+
+   onGoingRecoveryOperation = recoverJobsAsync()
+   .thenAccept(this::createDispatcherIfRunning)
+   .handle(this::onErrorIfRunning);
+   }
+
+   private void startServices() {
+   try {
+   jobGraphStore.start(this);
+   } catch (Exception e) {
+   throw new FlinkRuntimeException(
+   String.format(
+   "Could not start %s when trying to 
start the %s.",
+   
jobGraphStore.getClass().getSimpleName(),
+   getClass().getSimpleName()),
+   e);
+   }
+   }
+
+   private void createDispatcherIfRunning(Collection jobGraphs) {
+   runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
+   }
+
+   private void createDispatcher(Collection jobGraphs) {
+
+   final DispatcherService dispatcherService = 
dispatcherFactory.create(
+   DispatcherId.fromUuid(getLeaderSessionId()),
+   jobGraphs,
+   jobGraphStore);
+
+   completeDispatcherSetup(dispatcherService);
+   }
+
+   private CompletableFuture> recoverJobsAsync() {
+   return CompletableFuture.supplyAsync(
+   this::recoverJobsIfRunning,
+   ioExecutor);
+   }
+
+   private Collection recoverJobsIfRunning() {
+  

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335495000
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
 ##
 @@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultDispatcherRunner}.
+ */
+public class DefaultDispatcherRunnerTest extends TestLogger {
+
+   private TestingLeaderElectionService testingLeaderElectionService;
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+   private TestingDispatcherLeaderProcessFactory 
testingDispatcherLeaderProcessFactory;
+
+   @Before
+   public void setup() {
+   testingLeaderElectionService = new 
TestingLeaderElectionService();
+   testingFatalErrorHandler = new TestingFatalErrorHandler();
+   testingDispatcherLeaderProcessFactory = 
TestingDispatcherLeaderProcessFactory.defaultValue();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (testingLeaderElectionService != null) {
+   testingLeaderElectionService.stop();
+   testingLeaderElectionService = null;
+   }
+
+   if (testingFatalErrorHandler != null) {
+   testingFatalErrorHandler.rethrowError();
+   testingFatalErrorHandler = null;
+   }
+   }
+
+   @Test
+   public void closeAsync_doesNotCompleteUncompletedShutDownFuture() 
throws Exception {
+   final DefaultDispatcherRunner dispatcherRunner = 
createDispatcherRunner();
+
+   final CompletableFuture terminationFuture = 
dispatcherRunner.closeAsync();
+   terminationFuture.get();
+
+   final CompletableFuture shutDownFuture = 
dispatcherRunner.getShutDownFuture();
+   assertThat(shutDownFuture.isDone(), is(false));
+   }
+
+   @Test
+   public void 
getDispatcherGateway_beforeDispatcherLeaderProcessCompletes_returnsDispatcherGateway()
 throws Exception {
+   final UUID leaderSessionId = UUID.randomUUID();
+   final TestingDispatcherGateway expectedDispatcherGateway = 
createDispatcherGateway(leaderSessionId);
+   final TestingDispatcherLeaderProcess 
testingDispatcherLeaderProcess = 
TestingDispatcherLeaderProcess.newBuilder(leaderSessionId)
+   
.setDispatcherGatewayFuture(CompletableFuture.completedFuture(expectedDispatcherGateway))
+   .build();
+
+   testingDispatcherLeaderProcessFactory = 
TestingDispatcherLeaderProcessFactory.from(testingDispatcherLeaderProcess);
+   try (final DefaultDispatcherRunner dispatcherRunner = 
createDispatcherRunner()) {
+
+   final CompletableFuture 
dispatcherGatewayFuture = dispatcherRunner.getDispatcherGateway();
+
+   assertThat(dispatcherGatewayFuture.isDone(), is(false));
+
+   

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335499341
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
 ##
 @@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import org.hamcrest.core.Is;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SessionDispatcherLeaderProcess}.
+ */
+public class SessionDispatcherLeaderProcessTest extends TestLogger {
+
+   private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph");
+
+   private static ExecutorService ioExecutor;
+
+   private final UUID leaderSessionId = UUID.randomUUID();
+
+   private TestingFatalErrorHandler fatalErrorHandler;
+
+   private JobGraphStore jobGraphStore;
+
+   private TestingDispatcherServiceFactory dispatcherServiceFactory;
+
+   @BeforeClass
+   public static void setupClass() {
+   ioExecutor = Executors.newSingleThreadExecutor();
+   }
+
+   @Before
+   public void setup() {
+   fatalErrorHandler = new TestingFatalErrorHandler();
+   jobGraphStore = TestingJobGraphStore.newBuilder().build();
+   dispatcherServiceFactory = 
TestingDispatcherServiceFactory.newBuilder().build();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (fatalErrorHandler != null) {
+   fatalErrorHandler.rethrowError();
+   fatalErrorHandler = null;
+   }
+   }
+
+   @AfterClass
+   public static void teardownClass() {
+   if (ioExecutor != null) {
+   ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, 
ioExecutor);
+   }
+   }
+
+   @Test
+   public void start_afterClose_doesNotHaveAnEffect() throws Exception {
+   final SessionDispatcherLeaderProcess dispatcherLeaderProcess = 
createDispatcherLeaderProcess();
+
+   dispatcherLeaderProcess.close();
+   dispatcherLeaderProcess.start();
+
+   assertThat(dispatcherLeaderProcess.getState(), 
is(SessionDispatcherLeaderProcess.State.STOPPED));
+   }
+
+   @Test
+   public void 
start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception {
+

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335493271
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
 ##
 @@ -130,6 +135,8 @@ public DispatcherRunner getDispatcherRunner() {
}
 
private CompletableFuture closeAsyncInternal() {
+   LOG.info("Closing components of {}.", 
getClass().getSimpleName());
 
 Review comment:
   I find it unnecessary to log the class name explicitly. There are a couple 
occurrences of this pattern in this PR. The logger name already is the class 
name. Moreover, the class name of the caller can be [configured as a 
pattern](http://logback.qos.ch/manual/layouts.html#class).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335509782
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
 ##
 @@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import org.hamcrest.core.Is;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SessionDispatcherLeaderProcess}.
+ */
+public class SessionDispatcherLeaderProcessTest extends TestLogger {
+
+   private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph");
+
+   private static ExecutorService ioExecutor;
+
+   private final UUID leaderSessionId = UUID.randomUUID();
+
+   private TestingFatalErrorHandler fatalErrorHandler;
+
+   private JobGraphStore jobGraphStore;
+
+   private TestingDispatcherServiceFactory dispatcherServiceFactory;
+
+   @BeforeClass
+   public static void setupClass() {
+   ioExecutor = Executors.newSingleThreadExecutor();
+   }
+
+   @Before
+   public void setup() {
+   fatalErrorHandler = new TestingFatalErrorHandler();
+   jobGraphStore = TestingJobGraphStore.newBuilder().build();
+   dispatcherServiceFactory = 
TestingDispatcherServiceFactory.newBuilder().build();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (fatalErrorHandler != null) {
+   fatalErrorHandler.rethrowError();
+   fatalErrorHandler = null;
+   }
+   }
+
+   @AfterClass
+   public static void teardownClass() {
+   if (ioExecutor != null) {
+   ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, 
ioExecutor);
+   }
+   }
+
+   @Test
+   public void start_afterClose_doesNotHaveAnEffect() throws Exception {
+   final SessionDispatcherLeaderProcess dispatcherLeaderProcess = 
createDispatcherLeaderProcess();
+
+   dispatcherLeaderProcess.close();
+   dispatcherLeaderProcess.start();
+
+   assertThat(dispatcherLeaderProcess.getState(), 
is(SessionDispatcherLeaderProcess.State.STOPPED));
+   }
+
+   @Test
+   public void 
start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception {
+

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-16 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335503334
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
 ##
 @@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import org.hamcrest.core.Is;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SessionDispatcherLeaderProcess}.
+ */
+public class SessionDispatcherLeaderProcessTest extends TestLogger {
+
+   private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph");
+
+   private static ExecutorService ioExecutor;
+
+   private final UUID leaderSessionId = UUID.randomUUID();
+
+   private TestingFatalErrorHandler fatalErrorHandler;
+
+   private JobGraphStore jobGraphStore;
+
+   private TestingDispatcherServiceFactory dispatcherServiceFactory;
+
+   @BeforeClass
+   public static void setupClass() {
+   ioExecutor = Executors.newSingleThreadExecutor();
+   }
+
+   @Before
+   public void setup() {
+   fatalErrorHandler = new TestingFatalErrorHandler();
+   jobGraphStore = TestingJobGraphStore.newBuilder().build();
+   dispatcherServiceFactory = 
TestingDispatcherServiceFactory.newBuilder().build();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (fatalErrorHandler != null) {
+   fatalErrorHandler.rethrowError();
+   fatalErrorHandler = null;
+   }
+   }
+
+   @AfterClass
+   public static void teardownClass() {
+   if (ioExecutor != null) {
+   ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, 
ioExecutor);
+   }
+   }
+
+   @Test
+   public void start_afterClose_doesNotHaveAnEffect() throws Exception {
+   final SessionDispatcherLeaderProcess dispatcherLeaderProcess = 
createDispatcherLeaderProcess();
+
+   dispatcherLeaderProcess.close();
+   dispatcherLeaderProcess.start();
+
+   assertThat(dispatcherLeaderProcess.getState(), 
is(SessionDispatcherLeaderProcess.State.STOPPED));
+   }
+
+   @Test
+   public void 
start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception {
+

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-15 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335093669
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/StoppedDispatcherLeaderProcess.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link DispatcherLeaderProcess} implementation which is stopped. This class
+ * is useful as the initial state of the {@link DefaultDispatcherRunner}.
+ */
+public enum StoppedDispatcherLeaderProcess implements DispatcherLeaderProcess {
+   INSTANCE;
+
+   private static final CompletableFuture TERMINATION_FUTURE = 
CompletableFuture.completedFuture(null);
+   private static final UUID LEADER_SESSION_ID = new UUID(0L, 0L);
+   private static final CompletableFuture 
NEVER_COMPLETED_LEADER_SESSION_FUTURE = new CompletableFuture<>();
+   private static final CompletableFuture 
NEVER_COMPLETED_SHUTDOWN_FUTURE = new CompletableFuture<>();
+
+   @Override
+   public void start() {
+
+   }
+
+   @Override
+   public UUID getLeaderSessionId() {
+   return LEADER_SESSION_ID;
+   }
+
+   @Override
+   public CompletableFuture getDispatcherGateway() {
+   return null;
+   }
+
+   @Override
+   public CompletableFuture getConfirmLeaderSessionFuture() {
+   return NEVER_COMPLETED_LEADER_SESSION_FUTURE;
 
 Review comment:
   > Should this method be changed to throw `IllegalStateException`? Afaik 
`getConfirmLeaderSessionFuture()` shouldn't be called on 
`StoppedDispatcherLeaderProcess`
   
   Ok, I might be wrong about this one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-15 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335093669
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/StoppedDispatcherLeaderProcess.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link DispatcherLeaderProcess} implementation which is stopped. This class
+ * is useful as the initial state of the {@link DefaultDispatcherRunner}.
+ */
+public enum StoppedDispatcherLeaderProcess implements DispatcherLeaderProcess {
+   INSTANCE;
+
+   private static final CompletableFuture TERMINATION_FUTURE = 
CompletableFuture.completedFuture(null);
+   private static final UUID LEADER_SESSION_ID = new UUID(0L, 0L);
+   private static final CompletableFuture 
NEVER_COMPLETED_LEADER_SESSION_FUTURE = new CompletableFuture<>();
+   private static final CompletableFuture 
NEVER_COMPLETED_SHUTDOWN_FUTURE = new CompletableFuture<>();
+
+   @Override
+   public void start() {
+
+   }
+
+   @Override
+   public UUID getLeaderSessionId() {
+   return LEADER_SESSION_ID;
+   }
+
+   @Override
+   public CompletableFuture getDispatcherGateway() {
+   return null;
+   }
+
+   @Override
+   public CompletableFuture getConfirmLeaderSessionFuture() {
+   return NEVER_COMPLETED_LEADER_SESSION_FUTURE;
 
 Review comment:
   > Should this method be changed to throw `IllegalStateException`? Afaik 
`getConfirmLeaderSessionFuture()` shouldn't be called on 
`StoppedDispatcherLeaderProcess`
   
   Ok, I might be wrong about this one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-15 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r334991361
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunner.java
 ##
 @@ -30,11 +31,12 @@
 public interface DispatcherRunner extends AutoCloseableAsync {
 
/**
-* Get the currently running {@link Dispatcher}.
+* Return a future which is completed once the {@link Dispatcher} gains
+* leadership.
 *
-* @return the currently running dispatcher
+* @return Future which is completed with the leader's gateway
 */
-   Dispatcher getDispatcher();
+   CompletableFuture getDispatcherGateway();
 
 Review comment:
   This is only used in tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-15 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r334513335
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/StoppedDispatcherLeaderProcess.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link DispatcherLeaderProcess} implementation which is stopped. This class
+ * is useful as the initial state of the {@link DefaultDispatcherRunner}.
+ */
+public enum StoppedDispatcherLeaderProcess implements DispatcherLeaderProcess {
+   INSTANCE;
+
+   private static final CompletableFuture TERMINATION_FUTURE = 
CompletableFuture.completedFuture(null);
+   private static final UUID LEADER_SESSION_ID = new UUID(0L, 0L);
+   private static final CompletableFuture 
NEVER_COMPLETED_LEADER_SESSION_FUTURE = new CompletableFuture<>();
+   private static final CompletableFuture 
NEVER_COMPLETED_SHUTDOWN_FUTURE = new CompletableFuture<>();
+
+   @Override
+   public void start() {
+
+   }
+
+   @Override
+   public UUID getLeaderSessionId() {
+   return LEADER_SESSION_ID;
+   }
+
+   @Override
+   public CompletableFuture getDispatcherGateway() {
+   return null;
+   }
+
+   @Override
+   public CompletableFuture getConfirmLeaderSessionFuture() {
+   return NEVER_COMPLETED_LEADER_SESSION_FUTURE;
 
 Review comment:
   Should this method be changed to throw `IllegalStateException`? Afaik 
`getConfirmLeaderSessionFuture()` shouldn't be called on 
`StoppedDispatcherLeaderProcess`. Same with `getShutDownFuture()`
   
   Also, strictly speaking the state of this class is mutable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-15 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r334866031
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
 ##
 @@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Runner for the {@link org.apache.flink.runtime.dispatcher.Dispatcher} which 
is responsible for the
+ * leader election.
+ */
+public class DefaultDispatcherRunner implements DispatcherRunner, 
LeaderContender {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDispatcherRunner.class);
+
+   private final Object lock = new Object();
+
+   private final LeaderElectionService leaderElectionService;
+
+   private final FatalErrorHandler fatalErrorHandler;
+
+   private final DispatcherLeaderProcessFactory 
dispatcherLeaderProcessFactory;
+
+   private final CompletableFuture terminationFuture;
+
+   private final CompletableFuture shutDownFuture;
+
+   private boolean isRunning;
+
+   private DispatcherLeaderProcess dispatcherLeaderProcess;
+
+   private CompletableFuture 
previousDispatcherLeaderProcessTerminationFuture;
+
+   private CompletableFuture dispatcherGatewayFuture;
+
+   DefaultDispatcherRunner(
+   LeaderElectionService leaderElectionService,
+   FatalErrorHandler fatalErrorHandler,
+   DispatcherLeaderProcessFactory 
dispatcherLeaderProcessFactory) throws Exception {
+   this.leaderElectionService = leaderElectionService;
+   this.fatalErrorHandler = fatalErrorHandler;
+   this.dispatcherLeaderProcessFactory = 
dispatcherLeaderProcessFactory;
+   this.terminationFuture = new CompletableFuture<>();
+   this.shutDownFuture = new CompletableFuture<>();
+
+   this.isRunning = true;
+   this.dispatcherLeaderProcess = 
StoppedDispatcherLeaderProcess.INSTANCE;
+   this.previousDispatcherLeaderProcessTerminationFuture = 
CompletableFuture.completedFuture(null);
+   this.dispatcherGatewayFuture = new CompletableFuture<>();
+
+   startDispatcherRunner(leaderElectionService);
 
 Review comment:
   This is not safe with respect to memory visiblity: 
https://stackoverflow.com/questions/2513597/what-is-an-incompletely-constructed-object
   
   We are leaking an instance of `this` before the object is constructed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-15 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335019838
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java
 ##
 @@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+abstract class AbstractDispatcherLeaderProcess implements 
DispatcherLeaderProcess {
+
+   protected final Logger log = LoggerFactory.getLogger(getClass());
+
+   private final Object lock = new Object();
+
+   private final UUID leaderSessionId;
+
+   private final FatalErrorHandler fatalErrorHandler;
+
+   private final CompletableFuture 
dispatcherGatewayFuture;
+
+   private final CompletableFuture confirmLeaderSessionFuture;
+
+   private final CompletableFuture terminationFuture;
+
+   private final CompletableFuture shutDownFuture;
+
+   private State state;
+
+   @Nullable
+   private DispatcherService dispatcherService;
+
+   AbstractDispatcherLeaderProcess(UUID leaderSessionId, FatalErrorHandler 
fatalErrorHandler) {
+   this.leaderSessionId = leaderSessionId;
+   this.fatalErrorHandler = fatalErrorHandler;
+
+   this.dispatcherGatewayFuture = new CompletableFuture<>();
+   this.confirmLeaderSessionFuture = 
dispatcherGatewayFuture.thenApply(RestfulGateway::getAddress);
+   this.terminationFuture = new CompletableFuture<>();
+   this.shutDownFuture = new CompletableFuture<>();
+
+   this.state = State.CREATED;
+   this.dispatcherService = null;
+   }
+
+   @VisibleForTesting
+   State getState() {
+   synchronized (lock) {
+   return state;
+   }
+   }
+
+   @Override
+   public final void start() {
+   runIfStateIs(
+   State.CREATED,
+   this::startInternal);
+   }
+
+   private void startInternal() {
+   log.info("Start {}.", getClass().getSimpleName());
+   state = State.RUNNING;
+   onStart();
+   }
+
+   @Override
+   public final UUID getLeaderSessionId() {
+   return leaderSessionId;
+   }
+
+   @Override
+   public final CompletableFuture 
getDispatcherGateway() {
+   return dispatcherGatewayFuture;
+   }
+
+   @Override
+   public final CompletableFuture getConfirmLeaderSessionFuture() {
+   return confirmLeaderSessionFuture;
+   }
+
+   @Override
+   public CompletableFuture getShutDownFuture() {
+   return shutDownFuture;
+   }
+
+   protected final Optional getDispatcherService() {
+   return Optional.ofNullable(dispatcherService);
+   }
+
+   @Override
+   public final CompletableFuture closeAsync() {
+   runIfStateIsNot(
+   State.STOPPED,
+   this::closeInternal);
+
+   return terminationFuture;
+   }
+
+   private void 

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-15 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r334539341
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
 ##
 @@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultDispatcherRunner}.
+ */
+public class DefaultDispatcherRunnerTest extends TestLogger {
+
+   private TestingLeaderElectionService testingLeaderElectionService;
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+   private TestingDispatcherLeaderProcessFactory 
testingDispatcherLeaderProcessFactory;
+
+   @Before
+   public void setup() {
+   testingLeaderElectionService = new 
TestingLeaderElectionService();
+   testingFatalErrorHandler = new TestingFatalErrorHandler();
+   testingDispatcherLeaderProcessFactory = 
TestingDispatcherLeaderProcessFactory.defaultValue();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (testingLeaderElectionService != null) {
+   testingLeaderElectionService.stop();
+   testingLeaderElectionService = null;
+   }
+
+   if (testingFatalErrorHandler != null) {
+   testingFatalErrorHandler.rethrowError();
+   testingFatalErrorHandler = null;
+   }
+   }
+
+   @Test
+   public void closeAsync_doesNotCompleteUncompletedShutDownFuture() 
throws Exception {
+   final DefaultDispatcherRunner dispatcherRunner = 
createDispatcherRunner();
+
+   final CompletableFuture terminationFuture = 
dispatcherRunner.closeAsync();
+   terminationFuture.get();
+
+   final CompletableFuture shutDownFuture = 
dispatcherRunner.getShutDownFuture();
+   assertThat(shutDownFuture.isDone(), is(false));
+   }
+
+   @Test
+   public void 
getDispatcherGateway_beforeDispatcherLeaderProcessCompletes_returnsDispatcherGateway()
 throws Exception {
+   final UUID leaderSessionId = UUID.randomUUID();
+   final TestingDispatcherGateway expectedDispatcherGateway = 
createDispatcherGateway(leaderSessionId);
+   final TestingDispatcherLeaderProcess 
testingDispatcherLeaderProcess = 
TestingDispatcherLeaderProcess.newBuilder(leaderSessionId)
+   
.setDispatcherGatewayFuture(CompletableFuture.completedFuture(expectedDispatcherGateway))
+   .build();
+
+   testingDispatcherLeaderProcessFactory = 
TestingDispatcherLeaderProcessFactory.from(testingDispatcherLeaderProcess);
+   try (final DefaultDispatcherRunner dispatcherRunner = 
createDispatcherRunner()) {
+
+   final CompletableFuture 
dispatcherGatewayFuture = dispatcherRunner.getDispatcherGateway();
+
+   assertThat(dispatcherGatewayFuture.isDone(), is(false));
+
+   

[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-15 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r334849344
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
 ##
 @@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Runner for the {@link org.apache.flink.runtime.dispatcher.Dispatcher} which 
is responsible for the
+ * leader election.
+ */
+public class DefaultDispatcherRunner implements DispatcherRunner, 
LeaderContender {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDispatcherRunner.class);
+
+   private final Object lock = new Object();
+
+   private final LeaderElectionService leaderElectionService;
+
+   private final FatalErrorHandler fatalErrorHandler;
+
+   private final DispatcherLeaderProcessFactory 
dispatcherLeaderProcessFactory;
+
+   private final CompletableFuture terminationFuture;
+
+   private final CompletableFuture shutDownFuture;
+
+   private boolean isRunning;
 
 Review comment:
   General consensus is that fields do not have the `is` prefix: 
https://stackoverflow.com/questions/5322648/for-a-boolean-field-what-is-the-naming-convention-for-its-getter-setter


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-15 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r334994343
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherLeaderProcess.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.util.AutoCloseableAsync;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Leader process which encapsulates the lifecycle of the {@link Dispatcher} 
component.
+ */
+interface DispatcherLeaderProcess extends AutoCloseableAsync {
+
+   void start();
+
+   UUID getLeaderSessionId();
+
+   CompletableFuture getDispatcherGateway();
+
+   CompletableFuture getConfirmLeaderSessionFuture();
 
 Review comment:
   I think the name is not optimal. It's not clear that `String` is the leader 
address.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-15 Thread GitBox
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r335019283
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java
 ##
 @@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.runner;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+abstract class AbstractDispatcherLeaderProcess implements 
DispatcherLeaderProcess {
+
+   protected final Logger log = LoggerFactory.getLogger(getClass());
+
+   private final Object lock = new Object();
+
+   private final UUID leaderSessionId;
+
+   private final FatalErrorHandler fatalErrorHandler;
+
+   private final CompletableFuture 
dispatcherGatewayFuture;
+
+   private final CompletableFuture confirmLeaderSessionFuture;
+
+   private final CompletableFuture terminationFuture;
+
+   private final CompletableFuture shutDownFuture;
+
+   private State state;
+
+   @Nullable
+   private DispatcherService dispatcherService;
+
+   AbstractDispatcherLeaderProcess(UUID leaderSessionId, FatalErrorHandler 
fatalErrorHandler) {
+   this.leaderSessionId = leaderSessionId;
+   this.fatalErrorHandler = fatalErrorHandler;
+
+   this.dispatcherGatewayFuture = new CompletableFuture<>();
+   this.confirmLeaderSessionFuture = 
dispatcherGatewayFuture.thenApply(RestfulGateway::getAddress);
+   this.terminationFuture = new CompletableFuture<>();
+   this.shutDownFuture = new CompletableFuture<>();
+
+   this.state = State.CREATED;
+   this.dispatcherService = null;
 
 Review comment:
   nit: this assignment is not needed, default value is already null


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services