[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r338950999 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * Runner for the {@link org.apache.flink.runtime.dispatcher.Dispatcher} which is responsible for the + * leader election. 
+ */ +public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderContender { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunner.class); + + private final Object lock = new Object(); + + private final LeaderElectionService leaderElectionService; + + private final FatalErrorHandler fatalErrorHandler; + + private final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory; + + private final CompletableFuture terminationFuture; + + private final CompletableFuture shutDownFuture; + + private boolean running; + + private DispatcherLeaderProcess dispatcherLeaderProcess; + + private CompletableFuture previousDispatcherLeaderProcessTerminationFuture; + + private DefaultDispatcherRunner( + LeaderElectionService leaderElectionService, + FatalErrorHandler fatalErrorHandler, + DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) { + this.leaderElectionService = leaderElectionService; + this.fatalErrorHandler = fatalErrorHandler; + this.dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactory; + this.terminationFuture = new CompletableFuture<>(); + this.shutDownFuture = new CompletableFuture<>(); + + this.running = true; + this.dispatcherLeaderProcess = StoppedDispatcherLeaderProcess.INSTANCE; + this.previousDispatcherLeaderProcessTerminationFuture = CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture getShutDownFuture() { + return shutDownFuture; + } + + @Override + public CompletableFuture closeAsync() { + synchronized (lock) { + if (!running) { + return terminationFuture; + } else { + running = false; + } + } + + stopDispatcherLeaderProcess(); + + FutureUtils.forward( + previousDispatcherLeaderProcessTerminationFuture, + terminationFuture); + + return terminationFuture; + } + + // --- + // Leader election + // --- + + @Override + public void grantLeadership(UUID leaderSessionID) { + runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID)); + } + + 
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) { + stopDispatcherLeaderProcess(); + + dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID); + + final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess; + FutureUtils.assertNoException( +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r338951963 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerLeaderElectionLifecycleManager.java ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; + +final class DispatcherRunnerLeaderElectionLifecycleManager implements DispatcherRunner { + private final T dispatcherRunner; + private final LeaderElectionService leaderElectionService; + + private DispatcherRunnerLeaderElectionLifecycleManager(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception { + this.dispatcherRunner = dispatcherRunner; + this.leaderElectionService = leaderElectionService; + + leaderElectionService.start(dispatcherRunner); + } + + @Override + public CompletableFuture getShutDownFuture() { + return dispatcherRunner.getShutDownFuture(); + } + + @Override + public CompletableFuture closeAsync() { + final CompletableFuture servicesTerminationFuture = stopServices(); + final CompletableFuture dispatcherRunnerTerminationFuture = dispatcherRunner.closeAsync(); + + return FutureUtils.completeAll(Arrays.asList(servicesTerminationFuture, dispatcherRunnerTerminationFuture)); + } + + private CompletableFuture stopServices() { + Exception exception = null; + + try { + leaderElectionService.stop(); + } catch (Exception e) { + exception = e; Review comment: You could return from here directly. Is this for extensibility? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r338947618 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -243,18 +255,6 @@ private void stopDispatcherServices() throws Exception { exception = ExceptionUtils.firstOrSuppressed(e, exception); Review comment: nit: `firstOrSuppressed` isn't needed anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r338947618 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -243,18 +255,6 @@ private void stopDispatcherServices() throws Exception { exception = ExceptionUtils.firstOrSuppressed(e, exception); Review comment: nit: This line isn't needed anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r338587104 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java ## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.TriFunctionWithException; + +import org.hamcrest.core.Is; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link SessionDispatcherLeaderProcess}. 
+ */ +public class SessionDispatcherLeaderProcessTest extends TestLogger { + + private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph"); + + private static ExecutorService ioExecutor; + + private final UUID leaderSessionId = UUID.randomUUID(); + + private TestingFatalErrorHandler fatalErrorHandler; + + private JobGraphStore jobGraphStore; + + private TestingDispatcherServiceFactory dispatcherServiceFactory; + + @BeforeClass + public static void setupClass() { + ioExecutor = Executors.newSingleThreadExecutor(); + } + + @Before + public void setup() { + fatalErrorHandler = new TestingFatalErrorHandler(); + jobGraphStore = TestingJobGraphStore.newBuilder().build(); + dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder().build(); + } + + @After + public void teardown() throws Exception { + if (fatalErrorHandler != null) { + fatalErrorHandler.rethrowError(); + fatalErrorHandler = null; + } + } + + @AfterClass + public static void teardownClass() { + if (ioExecutor != null) { + ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, ioExecutor); + } + } + + @Test + public void start_afterClose_doesNotHaveAnEffect() throws Exception { + final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess(); + + dispatcherLeaderProcess.close(); + dispatcherLeaderProcess.start(); + + assertThat(dispatcherLeaderProcess.getState(), is(SessionDispatcherLeaderProcess.State.STOPPED)); + } + + @Test + public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception { +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r337075405 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java ## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * Runner for the {@link org.apache.flink.runtime.dispatcher.Dispatcher} which is responsible for the + * leader election. 
+ */ +public class DefaultDispatcherRunner implements DispatcherRunner, LeaderContender { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunner.class); + + private final Object lock = new Object(); + + private final LeaderElectionService leaderElectionService; + + private final FatalErrorHandler fatalErrorHandler; + + private final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory; + + private final CompletableFuture terminationFuture; + + private final CompletableFuture shutDownFuture; + + private boolean isRunning; + + private DispatcherLeaderProcess dispatcherLeaderProcess; + + private CompletableFuture previousDispatcherLeaderProcessTerminationFuture; + + private CompletableFuture dispatcherGatewayFuture; + + DefaultDispatcherRunner( + LeaderElectionService leaderElectionService, + FatalErrorHandler fatalErrorHandler, + DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception { + this.leaderElectionService = leaderElectionService; + this.fatalErrorHandler = fatalErrorHandler; + this.dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactory; + this.terminationFuture = new CompletableFuture<>(); + this.shutDownFuture = new CompletableFuture<>(); + + this.isRunning = true; + this.dispatcherLeaderProcess = StoppedDispatcherLeaderProcess.INSTANCE; + this.previousDispatcherLeaderProcessTerminationFuture = CompletableFuture.completedFuture(null); + this.dispatcherGatewayFuture = new CompletableFuture<>(); + + startDispatcherRunner(leaderElectionService); Review comment: Is it not possible to introduce a start method on the DispatcherRunner? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r336020164 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java ## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.TriFunctionWithException; + +import org.hamcrest.core.Is; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link SessionDispatcherLeaderProcess}. 
+ */ +public class SessionDispatcherLeaderProcessTest extends TestLogger { + + private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph"); + + private static ExecutorService ioExecutor; + + private final UUID leaderSessionId = UUID.randomUUID(); + + private TestingFatalErrorHandler fatalErrorHandler; + + private JobGraphStore jobGraphStore; + + private TestingDispatcherServiceFactory dispatcherServiceFactory; + + @BeforeClass + public static void setupClass() { + ioExecutor = Executors.newSingleThreadExecutor(); + } + + @Before + public void setup() { + fatalErrorHandler = new TestingFatalErrorHandler(); + jobGraphStore = TestingJobGraphStore.newBuilder().build(); + dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder().build(); + } + + @After + public void teardown() throws Exception { + if (fatalErrorHandler != null) { + fatalErrorHandler.rethrowError(); + fatalErrorHandler = null; + } + } + + @AfterClass + public static void teardownClass() { + if (ioExecutor != null) { + ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, ioExecutor); + } + } + + @Test + public void start_afterClose_doesNotHaveAnEffect() throws Exception { + final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess(); + + dispatcherLeaderProcess.close(); + dispatcherLeaderProcess.start(); + + assertThat(dispatcherLeaderProcess.getState(), is(SessionDispatcherLeaderProcess.State.STOPPED)); + } + + @Test + public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception { +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r336014644 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.util.LeaderConnectionInfo; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link DefaultDispatcherRunner}. 
+ */ +public class DefaultDispatcherRunnerTest extends TestLogger { + + private TestingLeaderElectionService testingLeaderElectionService; + private TestingFatalErrorHandler testingFatalErrorHandler; + private TestingDispatcherLeaderProcessFactory testingDispatcherLeaderProcessFactory; + + @Before + public void setup() { + testingLeaderElectionService = new TestingLeaderElectionService(); + testingFatalErrorHandler = new TestingFatalErrorHandler(); + testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.defaultValue(); + } + + @After + public void teardown() throws Exception { + if (testingLeaderElectionService != null) { + testingLeaderElectionService.stop(); + testingLeaderElectionService = null; + } + + if (testingFatalErrorHandler != null) { + testingFatalErrorHandler.rethrowError(); + testingFatalErrorHandler = null; + } + } + + @Test + public void closeAsync_doesNotCompleteUncompletedShutDownFuture() throws Exception { + final DefaultDispatcherRunner dispatcherRunner = createDispatcherRunner(); + + final CompletableFuture terminationFuture = dispatcherRunner.closeAsync(); + terminationFuture.get(); + + final CompletableFuture shutDownFuture = dispatcherRunner.getShutDownFuture(); + assertThat(shutDownFuture.isDone(), is(false)); + } + + @Test + public void getDispatcherGateway_beforeDispatcherLeaderProcessCompletes_returnsDispatcherGateway() throws Exception { + final UUID leaderSessionId = UUID.randomUUID(); + final TestingDispatcherGateway expectedDispatcherGateway = createDispatcherGateway(leaderSessionId); + final TestingDispatcherLeaderProcess testingDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(leaderSessionId) + .setDispatcherGatewayFuture(CompletableFuture.completedFuture(expectedDispatcherGateway)) + .build(); + + testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(testingDispatcherLeaderProcess); + try (final DefaultDispatcherRunner dispatcherRunner = 
createDispatcherRunner()) { + + final CompletableFuture dispatcherGatewayFuture = dispatcherRunner.getDispatcherGateway(); + + assertThat(dispatcherGatewayFuture.isDone(), is(false)); + +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335895994 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherFactory; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.dispatcher.DispatcherServices; +import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory; +import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; +import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore; +import org.apache.flink.runtime.dispatcher.SingleJobJobGraphStore; +import org.apache.flink.runtime.dispatcher.StandaloneDispatcher; +import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory; +import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist; +import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.minicluster.SessionDispatcherWithUUIDFactory; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcServiceResource; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import 
org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.runtime.util.BlobServerResource; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Integration tests for the {@link DefaultDispatcherRunner}. + */ +public class DefaultDispatcherRunnerITCase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunnerITCase.class); + + private static final Time TIMEOUT = Time.seconds(10L); + + private static final JobID TEST_JOB_ID = new JobID(); + + @ClassRule + public static TestingRpcServiceResource rpcServiceResource = new TestingRpcServiceResource(); + + @ClassRule + public static BlobServerResource blobServerResource = new BlobServerResource(); + + private JobGraph jobGraph; + + private TestingLeaderElectionService dispatcherLeaderElectionService; + + private TestingFatalErrorHandler fatalErrorHandler; + + private JobGraphStore jobGraphStore; + + private PartialDispatcherServices partialDispatcherServices; + + private DefaultDispatcherRunnerFactory dispatcherRunnerFactory; + + @Before + public void setup() { + dispatcherRunnerFactory =
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335507651 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java ## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.TriFunctionWithException; + +import org.hamcrest.core.Is; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link SessionDispatcherLeaderProcess}. 
+ */ +public class SessionDispatcherLeaderProcessTest extends TestLogger { + + private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph"); + + private static ExecutorService ioExecutor; + + private final UUID leaderSessionId = UUID.randomUUID(); + + private TestingFatalErrorHandler fatalErrorHandler; + + private JobGraphStore jobGraphStore; + + private TestingDispatcherServiceFactory dispatcherServiceFactory; + + @BeforeClass + public static void setupClass() { + ioExecutor = Executors.newSingleThreadExecutor(); + } + + @Before + public void setup() { + fatalErrorHandler = new TestingFatalErrorHandler(); + jobGraphStore = TestingJobGraphStore.newBuilder().build(); + dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder().build(); + } + + @After + public void teardown() throws Exception { + if (fatalErrorHandler != null) { + fatalErrorHandler.rethrowError(); + fatalErrorHandler = null; + } + } + + @AfterClass + public static void teardownClass() { + if (ioExecutor != null) { + ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, ioExecutor); + } + } + + @Test + public void start_afterClose_doesNotHaveAnEffect() throws Exception { + final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess(); + + dispatcherLeaderProcess.close(); + dispatcherLeaderProcess.start(); + + assertThat(dispatcherLeaderProcess.getState(), is(SessionDispatcherLeaderProcess.State.STOPPED)); + } + + @Test + public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception { +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335481458 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java ## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; +import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; +import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist; +import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder; +import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperRunningJobsRegistry; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.rpc.TestingRpcServiceResource; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import 
org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.ZooKeeperResource; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.framework.CuratorFramework; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +import static org.hamcrest.Matchers.emptyArray; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the interaction between the {@link DefaultDispatcherRunner} and ZooKeeper. + */ +public class ZooKeeperDefaultDispatcherRunnerTest extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDefaultDispatcherRunnerTest.class); + + private static final Time TESTING_TIMEOUT = Time.seconds(10L); + + private static final Duration VERIFICATION_TIMEOUT = Duration.ofSeconds(10L); + + @ClassRule + public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @ClassRule + public static TestingRpcServiceResource testingRpcServiceResource = new TestingRpcServiceResource(); + + private BlobServer blobServer; + + private TestingFatalErrorHandler
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335449903 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java ## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.FunctionUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * Process which encapsulates the job recovery logic and life cycle management of a + * {@link Dispatcher}. + */ +public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProcess implements JobGraphStore.JobGraphListener { Review comment: There is an extra space between `extends` and `AbstractDispatcherLeaderProcess`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335498065 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java ## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.TriFunctionWithException; + +import org.hamcrest.core.Is; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link SessionDispatcherLeaderProcess}. 
+ */ +public class SessionDispatcherLeaderProcessTest extends TestLogger { + + private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph"); + + private static ExecutorService ioExecutor; + + private final UUID leaderSessionId = UUID.randomUUID(); + + private TestingFatalErrorHandler fatalErrorHandler; + + private JobGraphStore jobGraphStore; + + private TestingDispatcherServiceFactory dispatcherServiceFactory; + + @BeforeClass + public static void setupClass() { + ioExecutor = Executors.newSingleThreadExecutor(); + } + + @Before + public void setup() { + fatalErrorHandler = new TestingFatalErrorHandler(); + jobGraphStore = TestingJobGraphStore.newBuilder().build(); + dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder().build(); + } + + @After + public void teardown() throws Exception { + if (fatalErrorHandler != null) { + fatalErrorHandler.rethrowError(); + fatalErrorHandler = null; + } + } + + @AfterClass + public static void teardownClass() { + if (ioExecutor != null) { + ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, ioExecutor); + } + } + + @Test + public void start_afterClose_doesNotHaveAnEffect() throws Exception { + final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess(); + + dispatcherLeaderProcess.close(); + dispatcherLeaderProcess.start(); + + assertThat(dispatcherLeaderProcess.getState(), is(SessionDispatcherLeaderProcess.State.STOPPED)); + } + + @Test + public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception { +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335498403 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java ## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.TriFunctionWithException; + +import org.hamcrest.core.Is; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link SessionDispatcherLeaderProcess}. 
+ */ +public class SessionDispatcherLeaderProcessTest extends TestLogger { + + private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph"); + + private static ExecutorService ioExecutor; + + private final UUID leaderSessionId = UUID.randomUUID(); + + private TestingFatalErrorHandler fatalErrorHandler; + + private JobGraphStore jobGraphStore; + + private TestingDispatcherServiceFactory dispatcherServiceFactory; + + @BeforeClass + public static void setupClass() { + ioExecutor = Executors.newSingleThreadExecutor(); + } + + @Before + public void setup() { + fatalErrorHandler = new TestingFatalErrorHandler(); + jobGraphStore = TestingJobGraphStore.newBuilder().build(); + dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder().build(); + } + + @After + public void teardown() throws Exception { + if (fatalErrorHandler != null) { + fatalErrorHandler.rethrowError(); + fatalErrorHandler = null; + } + } + + @AfterClass + public static void teardownClass() { + if (ioExecutor != null) { + ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, ioExecutor); + } + } + + @Test + public void start_afterClose_doesNotHaveAnEffect() throws Exception { + final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess(); + + dispatcherLeaderProcess.close(); + dispatcherLeaderProcess.start(); + + assertThat(dispatcherLeaderProcess.getState(), is(SessionDispatcherLeaderProcess.State.STOPPED)); + } + + @Test + public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception { +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335484914 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; + +/** + * {@link JobGraphWriter} implementation which does not allow to store + * {@link JobGraph}. + */ +public enum ThrowingJobGraphWriter implements JobGraphWriter { + INSTANCE; + + @Override + public void putJobGraph(JobGraph jobGraph) { + throw new UnsupportedOperationException( + String.format("Cannot store job graphs in the %s.", getClass().getSimpleName())); Review comment: nit: The class name will appear in the stack trace anyway. IMO, `"Storing job graphs is not supported"` is enough for an error message. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335451362 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java ## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.FunctionUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * Process which encapsulates the job recovery logic and life cycle management of a + * {@link Dispatcher}. 
+ */ +public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProcess implements JobGraphStore.JobGraphListener { + + private final DispatcherServiceFactory dispatcherFactory; + + private final JobGraphStore jobGraphStore; + + private final Executor ioExecutor; + + private CompletableFuture onGoingRecoveryOperation = FutureUtils.completedVoidFuture(); + + private SessionDispatcherLeaderProcess( + UUID leaderSessionId, + DispatcherServiceFactory dispatcherFactory, + JobGraphStore jobGraphStore, + Executor ioExecutor, + FatalErrorHandler fatalErrorHandler) { + super(leaderSessionId, fatalErrorHandler); + + this.dispatcherFactory = dispatcherFactory; + this.jobGraphStore = jobGraphStore; + this.ioExecutor = ioExecutor; + } + + @Override + protected void onStart() { + startServices(); + + onGoingRecoveryOperation = recoverJobsAsync() + .thenAccept(this::createDispatcherIfRunning) + .handle(this::onErrorIfRunning); + } + + private void startServices() { + try { + jobGraphStore.start(this); + } catch (Exception e) { + throw new FlinkRuntimeException( + String.format( + "Could not start %s when trying to start the %s.", + jobGraphStore.getClass().getSimpleName(), + getClass().getSimpleName()), + e); + } + } + + private void createDispatcherIfRunning(Collection jobGraphs) { + runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs)); + } + + private void createDispatcher(Collection jobGraphs) { + + final DispatcherService dispatcherService = dispatcherFactory.create( + DispatcherId.fromUuid(getLeaderSessionId()), + jobGraphs, + jobGraphStore); + + completeDispatcherSetup(dispatcherService); + } + + private CompletableFuture> recoverJobsAsync() { + return CompletableFuture.supplyAsync( + this::recoverJobsIfRunning, + ioExecutor); + } + + private Collection recoverJobsIfRunning() { +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335495000 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.util.LeaderConnectionInfo; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link DefaultDispatcherRunner}. 
+ */ +public class DefaultDispatcherRunnerTest extends TestLogger { + + private TestingLeaderElectionService testingLeaderElectionService; + private TestingFatalErrorHandler testingFatalErrorHandler; + private TestingDispatcherLeaderProcessFactory testingDispatcherLeaderProcessFactory; + + @Before + public void setup() { + testingLeaderElectionService = new TestingLeaderElectionService(); + testingFatalErrorHandler = new TestingFatalErrorHandler(); + testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.defaultValue(); + } + + @After + public void teardown() throws Exception { + if (testingLeaderElectionService != null) { + testingLeaderElectionService.stop(); + testingLeaderElectionService = null; + } + + if (testingFatalErrorHandler != null) { + testingFatalErrorHandler.rethrowError(); + testingFatalErrorHandler = null; + } + } + + @Test + public void closeAsync_doesNotCompleteUncompletedShutDownFuture() throws Exception { + final DefaultDispatcherRunner dispatcherRunner = createDispatcherRunner(); + + final CompletableFuture terminationFuture = dispatcherRunner.closeAsync(); + terminationFuture.get(); + + final CompletableFuture shutDownFuture = dispatcherRunner.getShutDownFuture(); + assertThat(shutDownFuture.isDone(), is(false)); + } + + @Test + public void getDispatcherGateway_beforeDispatcherLeaderProcessCompletes_returnsDispatcherGateway() throws Exception { + final UUID leaderSessionId = UUID.randomUUID(); + final TestingDispatcherGateway expectedDispatcherGateway = createDispatcherGateway(leaderSessionId); + final TestingDispatcherLeaderProcess testingDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(leaderSessionId) + .setDispatcherGatewayFuture(CompletableFuture.completedFuture(expectedDispatcherGateway)) + .build(); + + testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(testingDispatcherLeaderProcess); + try (final DefaultDispatcherRunner dispatcherRunner = 
createDispatcherRunner()) { + + final CompletableFuture dispatcherGatewayFuture = dispatcherRunner.getDispatcherGateway(); + + assertThat(dispatcherGatewayFuture.isDone(), is(false)); + +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335499341 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java ## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.TriFunctionWithException; + +import org.hamcrest.core.Is; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link SessionDispatcherLeaderProcess}. 
+ */ +public class SessionDispatcherLeaderProcessTest extends TestLogger { + + private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph"); + + private static ExecutorService ioExecutor; + + private final UUID leaderSessionId = UUID.randomUUID(); + + private TestingFatalErrorHandler fatalErrorHandler; + + private JobGraphStore jobGraphStore; + + private TestingDispatcherServiceFactory dispatcherServiceFactory; + + @BeforeClass + public static void setupClass() { + ioExecutor = Executors.newSingleThreadExecutor(); + } + + @Before + public void setup() { + fatalErrorHandler = new TestingFatalErrorHandler(); + jobGraphStore = TestingJobGraphStore.newBuilder().build(); + dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder().build(); + } + + @After + public void teardown() throws Exception { + if (fatalErrorHandler != null) { + fatalErrorHandler.rethrowError(); + fatalErrorHandler = null; + } + } + + @AfterClass + public static void teardownClass() { + if (ioExecutor != null) { + ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, ioExecutor); + } + } + + @Test + public void start_afterClose_doesNotHaveAnEffect() throws Exception { + final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess(); + + dispatcherLeaderProcess.close(); + dispatcherLeaderProcess.start(); + + assertThat(dispatcherLeaderProcess.getState(), is(SessionDispatcherLeaderProcess.State.STOPPED)); + } + + @Test + public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception { +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335493271 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java ## @@ -130,6 +135,8 @@ public DispatcherRunner getDispatcherRunner() { } private CompletableFuture closeAsyncInternal() { + LOG.info("Closing components of {}.", getClass().getSimpleName()); Review comment: I find it unnecessary to log the class name explicitly. There are a couple of occurrences of this pattern in this PR. The logger name is already the class name. Moreover, the class name of the caller can be [configured as a pattern](http://logback.qos.ch/manual/layouts.html#class). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335509782 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java ## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.TriFunctionWithException; + +import org.hamcrest.core.Is; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link SessionDispatcherLeaderProcess}. 
+ */ +public class SessionDispatcherLeaderProcessTest extends TestLogger { + + private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph"); + + private static ExecutorService ioExecutor; + + private final UUID leaderSessionId = UUID.randomUUID(); + + private TestingFatalErrorHandler fatalErrorHandler; + + private JobGraphStore jobGraphStore; + + private TestingDispatcherServiceFactory dispatcherServiceFactory; + + @BeforeClass + public static void setupClass() { + ioExecutor = Executors.newSingleThreadExecutor(); + } + + @Before + public void setup() { + fatalErrorHandler = new TestingFatalErrorHandler(); + jobGraphStore = TestingJobGraphStore.newBuilder().build(); + dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder().build(); + } + + @After + public void teardown() throws Exception { + if (fatalErrorHandler != null) { + fatalErrorHandler.rethrowError(); + fatalErrorHandler = null; + } + } + + @AfterClass + public static void teardownClass() { + if (ioExecutor != null) { + ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, ioExecutor); + } + } + + @Test + public void start_afterClose_doesNotHaveAnEffect() throws Exception { + final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess(); + + dispatcherLeaderProcess.close(); + dispatcherLeaderProcess.start(); + + assertThat(dispatcherLeaderProcess.getState(), is(SessionDispatcherLeaderProcess.State.STOPPED)); + } + + @Test + public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception { +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335503334 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java ## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.TriFunctionWithException; + +import org.hamcrest.core.Is; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link SessionDispatcherLeaderProcess}. 
+ */ +public class SessionDispatcherLeaderProcessTest extends TestLogger { + + private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph"); + + private static ExecutorService ioExecutor; + + private final UUID leaderSessionId = UUID.randomUUID(); + + private TestingFatalErrorHandler fatalErrorHandler; + + private JobGraphStore jobGraphStore; + + private TestingDispatcherServiceFactory dispatcherServiceFactory; + + @BeforeClass + public static void setupClass() { + ioExecutor = Executors.newSingleThreadExecutor(); + } + + @Before + public void setup() { + fatalErrorHandler = new TestingFatalErrorHandler(); + jobGraphStore = TestingJobGraphStore.newBuilder().build(); + dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder().build(); + } + + @After + public void teardown() throws Exception { + if (fatalErrorHandler != null) { + fatalErrorHandler.rethrowError(); + fatalErrorHandler = null; + } + } + + @AfterClass + public static void teardownClass() { + if (ioExecutor != null) { + ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, ioExecutor); + } + } + + @Test + public void start_afterClose_doesNotHaveAnEffect() throws Exception { + final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess(); + + dispatcherLeaderProcess.close(); + dispatcherLeaderProcess.start(); + + assertThat(dispatcherLeaderProcess.getState(), is(SessionDispatcherLeaderProcess.State.STOPPED)); + } + + @Test + public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception { +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335093669 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/StoppedDispatcherLeaderProcess.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * {@link DispatcherLeaderProcess} implementation which is stopped. This class + * is useful as the initial state of the {@link DefaultDispatcherRunner}. 
+ */ +public enum StoppedDispatcherLeaderProcess implements DispatcherLeaderProcess { + INSTANCE; + + private static final CompletableFuture TERMINATION_FUTURE = CompletableFuture.completedFuture(null); + private static final UUID LEADER_SESSION_ID = new UUID(0L, 0L); + private static final CompletableFuture NEVER_COMPLETED_LEADER_SESSION_FUTURE = new CompletableFuture<>(); + private static final CompletableFuture NEVER_COMPLETED_SHUTDOWN_FUTURE = new CompletableFuture<>(); + + @Override + public void start() { + + } + + @Override + public UUID getLeaderSessionId() { + return LEADER_SESSION_ID; + } + + @Override + public CompletableFuture getDispatcherGateway() { + return null; + } + + @Override + public CompletableFuture getConfirmLeaderSessionFuture() { + return NEVER_COMPLETED_LEADER_SESSION_FUTURE; Review comment: > Should this method be changed to throw `IllegalStateException`? Afaik `getConfirmLeaderSessionFuture()` shouldn't be called on `StoppedDispatcherLeaderProcess` Ok, I might be wrong about this one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335093669 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/StoppedDispatcherLeaderProcess.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * {@link DispatcherLeaderProcess} implementation which is stopped. This class + * is useful as the initial state of the {@link DefaultDispatcherRunner}. 
+ */ +public enum StoppedDispatcherLeaderProcess implements DispatcherLeaderProcess { + INSTANCE; + + private static final CompletableFuture TERMINATION_FUTURE = CompletableFuture.completedFuture(null); + private static final UUID LEADER_SESSION_ID = new UUID(0L, 0L); + private static final CompletableFuture NEVER_COMPLETED_LEADER_SESSION_FUTURE = new CompletableFuture<>(); + private static final CompletableFuture NEVER_COMPLETED_SHUTDOWN_FUTURE = new CompletableFuture<>(); + + @Override + public void start() { + + } + + @Override + public UUID getLeaderSessionId() { + return LEADER_SESSION_ID; + } + + @Override + public CompletableFuture getDispatcherGateway() { + return null; + } + + @Override + public CompletableFuture getConfirmLeaderSessionFuture() { + return NEVER_COMPLETED_LEADER_SESSION_FUTURE; Review comment: > Should this method be changed to throw `IllegalStateException`? Afaik `getConfirmLeaderSessionFuture()` shouldn't be called on `StoppedDispatcherLeaderProcess` Ok, I might be wrong about this one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r334991361 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunner.java ## @@ -30,11 +31,12 @@ public interface DispatcherRunner extends AutoCloseableAsync { /** -* Get the currently running {@link Dispatcher}. +* Return a future which is completed once the {@link Dispatcher} gains +* leadership. * -* @return the currently running dispatcher +* @return Future which is completed with the leader's gateway */ - Dispatcher getDispatcher(); + CompletableFuture getDispatcherGateway(); Review comment: This is only used in tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r334513335 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/StoppedDispatcherLeaderProcess.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * {@link DispatcherLeaderProcess} implementation which is stopped. This class + * is useful as the initial state of the {@link DefaultDispatcherRunner}. 
+ */ +public enum StoppedDispatcherLeaderProcess implements DispatcherLeaderProcess { + INSTANCE; + + private static final CompletableFuture TERMINATION_FUTURE = CompletableFuture.completedFuture(null); + private static final UUID LEADER_SESSION_ID = new UUID(0L, 0L); + private static final CompletableFuture NEVER_COMPLETED_LEADER_SESSION_FUTURE = new CompletableFuture<>(); + private static final CompletableFuture NEVER_COMPLETED_SHUTDOWN_FUTURE = new CompletableFuture<>(); + + @Override + public void start() { + + } + + @Override + public UUID getLeaderSessionId() { + return LEADER_SESSION_ID; + } + + @Override + public CompletableFuture getDispatcherGateway() { + return null; + } + + @Override + public CompletableFuture getConfirmLeaderSessionFuture() { + return NEVER_COMPLETED_LEADER_SESSION_FUTURE; Review comment: Should this method be changed to throw `IllegalStateException`? Afaik `getConfirmLeaderSessionFuture()` shouldn't be called on `StoppedDispatcherLeaderProcess`. Same with `getShutDownFuture()` Also, strictly speaking the state of this class is mutable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r334866031 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java ## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * Runner for the {@link org.apache.flink.runtime.dispatcher.Dispatcher} which is responsible for the + * leader election. 
+ */ +public class DefaultDispatcherRunner implements DispatcherRunner, LeaderContender { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunner.class); + + private final Object lock = new Object(); + + private final LeaderElectionService leaderElectionService; + + private final FatalErrorHandler fatalErrorHandler; + + private final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory; + + private final CompletableFuture terminationFuture; + + private final CompletableFuture shutDownFuture; + + private boolean isRunning; + + private DispatcherLeaderProcess dispatcherLeaderProcess; + + private CompletableFuture previousDispatcherLeaderProcessTerminationFuture; + + private CompletableFuture dispatcherGatewayFuture; + + DefaultDispatcherRunner( + LeaderElectionService leaderElectionService, + FatalErrorHandler fatalErrorHandler, + DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception { + this.leaderElectionService = leaderElectionService; + this.fatalErrorHandler = fatalErrorHandler; + this.dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactory; + this.terminationFuture = new CompletableFuture<>(); + this.shutDownFuture = new CompletableFuture<>(); + + this.isRunning = true; + this.dispatcherLeaderProcess = StoppedDispatcherLeaderProcess.INSTANCE; + this.previousDispatcherLeaderProcessTerminationFuture = CompletableFuture.completedFuture(null); + this.dispatcherGatewayFuture = new CompletableFuture<>(); + + startDispatcherRunner(leaderElectionService); Review comment: This is not safe with respect to memory visibility: https://stackoverflow.com/questions/2513597/what-is-an-incompletely-constructed-object We are leaking an instance of `this` before the object is fully constructed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335019838 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; +import java.util.function.Supplier; + +abstract class AbstractDispatcherLeaderProcess implements DispatcherLeaderProcess { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + private final Object lock = new Object(); + + private final UUID leaderSessionId; + + private final FatalErrorHandler fatalErrorHandler; + + private final CompletableFuture dispatcherGatewayFuture; + + private final CompletableFuture confirmLeaderSessionFuture; + + private final CompletableFuture terminationFuture; + + private final CompletableFuture shutDownFuture; + + private State state; + + @Nullable + private DispatcherService dispatcherService; + + AbstractDispatcherLeaderProcess(UUID leaderSessionId, FatalErrorHandler fatalErrorHandler) { + this.leaderSessionId = leaderSessionId; + this.fatalErrorHandler = fatalErrorHandler; + + this.dispatcherGatewayFuture = new CompletableFuture<>(); + this.confirmLeaderSessionFuture = dispatcherGatewayFuture.thenApply(RestfulGateway::getAddress); + 
this.terminationFuture = new CompletableFuture<>(); + this.shutDownFuture = new CompletableFuture<>(); + + this.state = State.CREATED; + this.dispatcherService = null; + } + + @VisibleForTesting + State getState() { + synchronized (lock) { + return state; + } + } + + @Override + public final void start() { + runIfStateIs( + State.CREATED, + this::startInternal); + } + + private void startInternal() { + log.info("Start {}.", getClass().getSimpleName()); + state = State.RUNNING; + onStart(); + } + + @Override + public final UUID getLeaderSessionId() { + return leaderSessionId; + } + + @Override + public final CompletableFuture getDispatcherGateway() { + return dispatcherGatewayFuture; + } + + @Override + public final CompletableFuture getConfirmLeaderSessionFuture() { + return confirmLeaderSessionFuture; + } + + @Override + public CompletableFuture getShutDownFuture() { + return shutDownFuture; + } + + protected final Optional getDispatcherService() { + return Optional.ofNullable(dispatcherService); + } + + @Override + public final CompletableFuture closeAsync() { + runIfStateIsNot( + State.STOPPED, + this::closeInternal); + + return terminationFuture; + } + + private void
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r334539341 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.util.LeaderConnectionInfo; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link DefaultDispatcherRunner}. 
+ */ +public class DefaultDispatcherRunnerTest extends TestLogger { + + private TestingLeaderElectionService testingLeaderElectionService; + private TestingFatalErrorHandler testingFatalErrorHandler; + private TestingDispatcherLeaderProcessFactory testingDispatcherLeaderProcessFactory; + + @Before + public void setup() { + testingLeaderElectionService = new TestingLeaderElectionService(); + testingFatalErrorHandler = new TestingFatalErrorHandler(); + testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.defaultValue(); + } + + @After + public void teardown() throws Exception { + if (testingLeaderElectionService != null) { + testingLeaderElectionService.stop(); + testingLeaderElectionService = null; + } + + if (testingFatalErrorHandler != null) { + testingFatalErrorHandler.rethrowError(); + testingFatalErrorHandler = null; + } + } + + @Test + public void closeAsync_doesNotCompleteUncompletedShutDownFuture() throws Exception { + final DefaultDispatcherRunner dispatcherRunner = createDispatcherRunner(); + + final CompletableFuture terminationFuture = dispatcherRunner.closeAsync(); + terminationFuture.get(); + + final CompletableFuture shutDownFuture = dispatcherRunner.getShutDownFuture(); + assertThat(shutDownFuture.isDone(), is(false)); + } + + @Test + public void getDispatcherGateway_beforeDispatcherLeaderProcessCompletes_returnsDispatcherGateway() throws Exception { + final UUID leaderSessionId = UUID.randomUUID(); + final TestingDispatcherGateway expectedDispatcherGateway = createDispatcherGateway(leaderSessionId); + final TestingDispatcherLeaderProcess testingDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(leaderSessionId) + .setDispatcherGatewayFuture(CompletableFuture.completedFuture(expectedDispatcherGateway)) + .build(); + + testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(testingDispatcherLeaderProcess); + try (final DefaultDispatcherRunner dispatcherRunner = 
createDispatcherRunner()) { + + final CompletableFuture dispatcherGatewayFuture = dispatcherRunner.getDispatcherGateway(); + + assertThat(dispatcherGatewayFuture.isDone(), is(false)); + +
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r334849344 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java ## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * Runner for the {@link org.apache.flink.runtime.dispatcher.Dispatcher} which is responsible for the + * leader election. 
+ */ +public class DefaultDispatcherRunner implements DispatcherRunner, LeaderContender { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunner.class); + + private final Object lock = new Object(); + + private final LeaderElectionService leaderElectionService; + + private final FatalErrorHandler fatalErrorHandler; + + private final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory; + + private final CompletableFuture terminationFuture; + + private final CompletableFuture shutDownFuture; + + private boolean isRunning; Review comment: General consensus is that fields do not have the `is` prefix: https://stackoverflow.com/questions/5322648/for-a-boolean-field-what-is-the-naming-convention-for-its-getter-setter This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r334994343 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherLeaderProcess.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.util.AutoCloseableAsync; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * Leader process which encapsulates the lifecycle of the {@link Dispatcher} component. + */ +interface DispatcherLeaderProcess extends AutoCloseableAsync { + + void start(); + + UUID getLeaderSessionId(); + + CompletableFuture<DispatcherGateway> getDispatcherGateway(); + + CompletableFuture<String> getConfirmLeaderSessionFuture(); Review comment: I think the name is not optimal. It's not clear that `String` is the leader address. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
GJL commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r335019283 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.runner; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; +import java.util.function.Supplier; + +abstract class AbstractDispatcherLeaderProcess implements DispatcherLeaderProcess { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + private final Object lock = new Object(); + + private final UUID leaderSessionId; + + private final FatalErrorHandler fatalErrorHandler; + + private final CompletableFuture dispatcherGatewayFuture; + + private final CompletableFuture confirmLeaderSessionFuture; + + private final CompletableFuture terminationFuture; + + private final CompletableFuture shutDownFuture; + + private State state; + + @Nullable + private DispatcherService dispatcherService; + + AbstractDispatcherLeaderProcess(UUID leaderSessionId, FatalErrorHandler fatalErrorHandler) { + this.leaderSessionId = leaderSessionId; + this.fatalErrorHandler = fatalErrorHandler; + + this.dispatcherGatewayFuture = new CompletableFuture<>(); + this.confirmLeaderSessionFuture = dispatcherGatewayFuture.thenApply(RestfulGateway::getAddress); + 
this.terminationFuture = new CompletableFuture<>(); + this.shutDownFuture = new CompletableFuture<>(); + + this.state = State.CREATED; + this.dispatcherService = null; Review comment: nit: this assignment is not needed; the default value is already null. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services