[FLINK-1554] [runtime] Allows the LocalFlinkMiniCluster to start multiple 
TaskManagers in the same ActorSystem.

This closes #403.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/649f1583
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/649f1583
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/649f1583

Branch: refs/heads/master
Commit: 649f158391aebcefcaabcdaa02d5c8b95be45777
Parents: 447ce0a
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Feb 16 15:04:42 2015 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Feb 17 11:53:36 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  2 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  | 11 +++
 .../minicluster/LocalFlinkMiniCluster.scala     |  7 ++
 .../flink/runtime/taskmanager/TaskManager.scala | 31 ++++---
 .../runtime/testingUtils/TestingCluster.scala   |  3 +-
 flink-tests/pom.xml                             |  5 ++
 .../LocalFlinkMiniClusterITCase.java            | 88 ++++++++++++++++++++
 .../scala/org/apache/flink/yarn/YarnUtils.scala |  5 +-
 8 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b490b32..bfce7a2 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -664,7 +664,7 @@ object JobManager {
       if(executionMode.equals(LOCAL)){
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode 
execution")
 
-        TaskManager.startActorWithConfiguration("", configuration,
+        TaskManager.startActorWithConfiguration("", 
TaskManager.TASK_MANAGER_NAME, configuration,
           localAkkaCommunication = false, localTaskManagerCommunication = 
true)(jobManagerSystem)
       }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 6eea21c..3eb1d1e 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
+import akka.pattern.Patterns.gracefulStop
 
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
@@ -131,6 +132,16 @@ abstract class FlinkMiniCluster(userConfiguration: 
Configuration,
   }
 
   def shutdown(): Unit = {
+    val futures = taskManagerActors map {
+        gracefulStop(_, timeout)
+    }
+
+    val future = gracefulStop(jobManagerActor, timeout)
+
+    implicit val executionContext = AkkaUtils.globalExecutionContext
+
+    Await.ready(Future.sequence(future +: futures), timeout)
+
     if(!singleActorSystem){
       taskManagerActorSystems foreach {
         _.shutdown()

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 8b16969..88006ac 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -90,7 +90,14 @@ class LocalFlinkMiniCluster(userConfiguration: 
Configuration, singleActorSystem:
 
     val localExecution = numTaskManagers == 1
 
+    val taskManagerName = if(singleActorSystem) {
+      TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
+    } else {
+      TaskManager.TASK_MANAGER_NAME
+    }
+
     TaskManager.startActorWithConfiguration(HOSTNAME,
+      taskManagerName,
       config,
       singleActorSystem,
       localExecution)(system)

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5a647d1..6d610a4 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -711,18 +711,19 @@ object TaskManager {
       LOG.info("Security is enabled. Starting secure TaskManager.")
       SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
         override def run(): Unit = {
-          startActor(hostname, port, configuration)
+          startActor(hostname, port, configuration, 
TaskManager.TASK_MANAGER_NAME)
         }
       })
     } else {
-      startActor(hostname, port, configuration)
+      startActor(hostname, port, configuration, TaskManager.TASK_MANAGER_NAME)
     }
   }
 
-  def startActor(hostname: String, port: Int, configuration: Configuration) : 
Unit = {
+  def startActor(hostname: String, port: Int, configuration: Configuration,
+                 taskManagerName: String) : Unit = {
 
     val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, 
configuration,
-      localAkkaCommunication = false, localTaskManagerCommunication = false)
+      taskManagerName, localAkkaCommunication = false, 
localTaskManagerCommunication = false)
 
     taskManagerSystem.awaitTermination()
   }
@@ -780,6 +781,7 @@ object TaskManager {
   }
 
   def startActorSystemAndActor(hostname: String, port: Int, configuration: 
Configuration,
+                               taskManagerName: String,
                                localAkkaCommunication: Boolean,
                                localTaskManagerCommunication: Boolean): 
(ActorSystem, ActorRef) = {
     implicit val actorSystem = AkkaUtils.createActorSystem(configuration, 
Some((hostname, port)))
@@ -788,7 +790,7 @@ object TaskManager {
       parseConfiguration(hostname, configuration, localAkkaCommunication,
         localTaskManagerCommunication)
 
-    (actorSystem, startActor(connectionInfo, jobManagerURL, taskManagerConfig,
+    (actorSystem, startActor(taskManagerName, connectionInfo, jobManagerURL, 
taskManagerConfig,
       networkConfig))
   }
 
@@ -916,19 +918,23 @@ object TaskManager {
     (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig)
   }
 
-  def startActor(connectionInfo: InstanceConnectionInfo, jobManagerURL: String,
+  def startActor(taskManagerName: String,
+                 connectionInfo: InstanceConnectionInfo,
+                 jobManagerURL: String,
                  taskManagerConfig: TaskManagerConfiguration,
                  networkConfig: NetworkEnvironmentConfiguration)
                 (implicit actorSystem: ActorSystem): ActorRef = {
-    startActor(Props(new TaskManager(connectionInfo, jobManagerURL, 
taskManagerConfig,
-      networkConfig)))
+    startActor(taskManagerName,
+      Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, 
networkConfig)))
   }
 
-  def startActor(props: Props)(implicit actorSystem: ActorSystem): ActorRef = {
-    actorSystem.actorOf(props, TASK_MANAGER_NAME)
+  def startActor(taskManagerName: String, props: Props)
+                (implicit actorSystem: ActorSystem): ActorRef = {
+    actorSystem.actorOf(props, taskManagerName)
   }
 
-  def startActorWithConfiguration(hostname: String, configuration: 
Configuration,
+  def startActorWithConfiguration(hostname: String, taskManagerName: String,
+                                  configuration: Configuration,
                                   localAkkaCommunication: Boolean,
                                   localTaskManagerCommunication: Boolean)
                                  (implicit system: ActorSystem) = {
@@ -936,7 +942,8 @@ object TaskManager {
       parseConfiguration(hostname, configuration, localAkkaCommunication,
         localTaskManagerCommunication)
 
-    startActor(connectionInfo, jobManagerURL, taskManagerConfig, 
networkConnectionConfiguration)
+    startActor(taskManagerName, connectionInfo, jobManagerURL, 
taskManagerConfig,
+      networkConnectionConfiguration)
   }
 
   def startProfiler(instancePath: String, reportInterval: Long)(implicit 
system: ActorSystem):

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index e2660d5..9d132b6 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -68,7 +68,8 @@ FlinkMiniCluster(userConfiguration, singleActorSystem) {
         localAkkaCommunication = singleActorSystem, 
localTaskManagerCommunication = true)
 
     system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, 
taskManagerConfig,
-      networkConnectionConfig) with TestingTaskManager), 
TaskManager.TASK_MANAGER_NAME + index)
+      networkConnectionConfig) with TestingTaskManager), 
TaskManager.TASK_MANAGER_NAME + "_" +
+      (index + 1))
   }
 
   def restartJobManager(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 985c675..33ca642 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -135,6 +135,11 @@ under the License.
                        <artifactId>scalatest_2.10</artifactId>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>com.typesafe.akka</groupId>
+                       <artifactId>akka-testkit_2.10</artifactId>
+               </dependency>
                
                <dependency>
                        <groupId>joda-time</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
new file mode 100644
index 0000000..7933932
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime.minicluster;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LocalFlinkMiniClusterITCase {
+
+       static ActorSystem system;
+
+       @BeforeClass
+       public static void setup() {
+               system = ActorSystem.create("Testkit", 
AkkaUtils.getDefaultAkkaConfig());
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(system);
+               system = null;
+       }
+
+       @Test
+       public void testLocalFlinkMiniClusterWithMultipleTaskManagers() {
+               LocalFlinkMiniCluster miniCluster = null;
+
+               final int numTMs = 3;
+               final int numSlots = 14;
+
+               try{
+                       Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
numTMs);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+                       miniCluster = new LocalFlinkMiniCluster(config, true);
+
+                       final ActorRef jm = miniCluster.getJobManager();
+
+                       new JavaTestKit(system) {{
+                               new Within(TestingUtils.TESTING_DURATION()) {
+
+                                       @Override
+                                       protected void run() {
+                                               
jm.tell(JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+                                                               getRef());
+
+                                               
expectMsgEquals(TestingUtils.TESTING_DURATION(), numTMs);
+
+                                               
jm.tell(JobManagerMessages.getRequestTotalNumberOfSlots(), getRef());
+
+                                               
expectMsgEquals(TestingUtils.TESTING_DURATION(), numTMs*numSlots);
+                                       }
+                               };
+                       }};
+
+
+               } finally {
+                       if (miniCluster != null) {
+                               miniCluster.stop();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
index 775fcd0..66da8ec 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
@@ -38,7 +38,8 @@ object YarnUtils {
       TaskManager.parseConfiguration(hostname, config, localAkkaCommunication 
= false,
         localTaskManagerCommunication = false)
 
-    (actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, 
jobManagerURL,
-      taskManagerConfig, networkConnectionConfiguration) with 
YarnTaskManager))(actorSystem))
+    (actorSystem, TaskManager.startActor(TaskManager.TASK_MANAGER_NAME,
+      Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
+        networkConnectionConfiguration) with YarnTaskManager))(actorSystem))
   }
 }

Reply via email to