[FLINK-1554] [runtime] Allows the LocalFlinkMiniCluster to start multiple TaskManagers in the same ActorSystem.
This closes #403. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/649f1583 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/649f1583 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/649f1583 Branch: refs/heads/master Commit: 649f158391aebcefcaabcdaa02d5c8b95be45777 Parents: 447ce0a Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Feb 16 15:04:42 2015 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Feb 17 11:53:36 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 2 +- .../runtime/minicluster/FlinkMiniCluster.scala | 11 +++ .../minicluster/LocalFlinkMiniCluster.scala | 7 ++ .../flink/runtime/taskmanager/TaskManager.scala | 31 ++++--- .../runtime/testingUtils/TestingCluster.scala | 3 +- flink-tests/pom.xml | 5 ++ .../LocalFlinkMiniClusterITCase.java | 88 ++++++++++++++++++++ .../scala/org/apache/flink/yarn/YarnUtils.scala | 5 +- 8 files changed, 136 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index b490b32..bfce7a2 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -664,7 +664,7 @@ object JobManager { if(executionMode.equals(LOCAL)){ LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution") - TaskManager.startActorWithConfiguration("", configuration, + 
TaskManager.startActorWithConfiguration("", TaskManager.TASK_MANAGER_NAME, configuration, localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem) } http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 6eea21c..3eb1d1e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -19,6 +19,7 @@ package org.apache.flink.runtime.minicluster import java.net.InetAddress +import akka.pattern.Patterns.gracefulStop import akka.pattern.ask import akka.actor.{ActorRef, ActorSystem} @@ -131,6 +132,16 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration, } def shutdown(): Unit = { + val futures = taskManagerActors map { + gracefulStop(_, timeout) + } + + val future = gracefulStop(jobManagerActor, timeout) + + implicit val executionContext = AkkaUtils.globalExecutionContext + + Await.ready(Future.sequence(future +: futures), timeout) + if(!singleActorSystem){ taskManagerActorSystems foreach { _.shutdown() http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 8b16969..88006ac 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -90,7 +90,14 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: val localExecution = numTaskManagers == 1 + val taskManagerName = if(singleActorSystem) { + TaskManager.TASK_MANAGER_NAME + "_" + (index + 1) + } else { + TaskManager.TASK_MANAGER_NAME + } + TaskManager.startActorWithConfiguration(HOSTNAME, + taskManagerName, config, singleActorSystem, localExecution)(system) http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 5a647d1..6d610a4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -711,18 +711,19 @@ object TaskManager { LOG.info("Security is enabled. 
Starting secure TaskManager.") SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] { override def run(): Unit = { - startActor(hostname, port, configuration) + startActor(hostname, port, configuration, TaskManager.TASK_MANAGER_NAME) } }) } else { - startActor(hostname, port, configuration) + startActor(hostname, port, configuration, TaskManager.TASK_MANAGER_NAME) } } - def startActor(hostname: String, port: Int, configuration: Configuration) : Unit = { + def startActor(hostname: String, port: Int, configuration: Configuration, + taskManagerName: String) : Unit = { val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, configuration, - localAkkaCommunication = false, localTaskManagerCommunication = false) + taskManagerName, localAkkaCommunication = false, localTaskManagerCommunication = false) taskManagerSystem.awaitTermination() } @@ -780,6 +781,7 @@ object TaskManager { } def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration, + taskManagerName: String, localAkkaCommunication: Boolean, localTaskManagerCommunication: Boolean): (ActorSystem, ActorRef) = { implicit val actorSystem = AkkaUtils.createActorSystem(configuration, Some((hostname, port))) @@ -788,7 +790,7 @@ object TaskManager { parseConfiguration(hostname, configuration, localAkkaCommunication, localTaskManagerCommunication) - (actorSystem, startActor(connectionInfo, jobManagerURL, taskManagerConfig, + (actorSystem, startActor(taskManagerName, connectionInfo, jobManagerURL, taskManagerConfig, networkConfig)) } @@ -916,19 +918,23 @@ object TaskManager { (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig) } - def startActor(connectionInfo: InstanceConnectionInfo, jobManagerURL: String, + def startActor(taskManagerName: String, + connectionInfo: InstanceConnectionInfo, + jobManagerURL: String, taskManagerConfig: TaskManagerConfiguration, networkConfig: NetworkEnvironmentConfiguration) (implicit actorSystem: ActorSystem): ActorRef = { - 
startActor(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, - networkConfig))) + startActor(taskManagerName, + Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, networkConfig))) } - def startActor(props: Props)(implicit actorSystem: ActorSystem): ActorRef = { - actorSystem.actorOf(props, TASK_MANAGER_NAME) + def startActor(taskManagerName: String, props: Props) + (implicit actorSystem: ActorSystem): ActorRef = { + actorSystem.actorOf(props, taskManagerName) } - def startActorWithConfiguration(hostname: String, configuration: Configuration, + def startActorWithConfiguration(hostname: String, taskManagerName: String, + configuration: Configuration, localAkkaCommunication: Boolean, localTaskManagerCommunication: Boolean) (implicit system: ActorSystem) = { @@ -936,7 +942,8 @@ object TaskManager { parseConfiguration(hostname, configuration, localAkkaCommunication, localTaskManagerCommunication) - startActor(connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) + startActor(taskManagerName, connectionInfo, jobManagerURL, taskManagerConfig, + networkConnectionConfiguration) } def startProfiler(instancePath: String, reportInterval: Long)(implicit system: ActorSystem): http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index e2660d5..9d132b6 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -68,7 +68,8 @@ FlinkMiniCluster(userConfiguration, singleActorSystem) { localAkkaCommunication = singleActorSystem, 
localTaskManagerCommunication = true) system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, - networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index) + networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + "_" + + (index + 1)) } def restartJobManager(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 985c675..33ca642 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -135,6 +135,11 @@ under the License. <artifactId>scalatest_2.10</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-testkit_2.10</artifactId> + </dependency> <dependency> <groupId>joda-time</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java new file mode 100644 index 0000000..7933932 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.runtime.minicluster; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class LocalFlinkMiniClusterITCase { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testLocalFlinkMiniClusterWithMultipleTaskManagers() { + LocalFlinkMiniCluster miniCluster = null; + + final int numTMs = 3; + final int numSlots = 14; + + try{ + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots); + miniCluster = new LocalFlinkMiniCluster(config, true); + + final ActorRef jm = miniCluster.getJobManager(); + + new JavaTestKit(system) {{ + new Within(TestingUtils.TESTING_DURATION()) { + + @Override + protected void run() { + 
jm.tell(JobManagerMessages.getRequestNumberRegisteredTaskManager(), + getRef()); + + expectMsgEquals(TestingUtils.TESTING_DURATION(), numTMs); + + jm.tell(JobManagerMessages.getRequestTotalNumberOfSlots(), getRef()); + + expectMsgEquals(TestingUtils.TESTING_DURATION(), numTMs*numSlots); + } + }; + }}; + + + } finally { + if (miniCluster != null) { + miniCluster.stop(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala index 775fcd0..66da8ec 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala @@ -38,7 +38,8 @@ object YarnUtils { TaskManager.parseConfiguration(hostname, config, localAkkaCommunication = false, localTaskManagerCommunication = false) - (actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, jobManagerURL, - taskManagerConfig, networkConnectionConfiguration) with YarnTaskManager))(actorSystem)) + (actorSystem, TaskManager.startActor(TaskManager.TASK_MANAGER_NAME, + Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, + networkConnectionConfiguration) with YarnTaskManager))(actorSystem)) } }