[FLINK-4836] [cluster management] Start ResourceManager in MiniCluster

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dc228fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dc228fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dc228fc

Branch: refs/heads/flip-6
Commit: 6dc228fcf5d33536176beb15338982c7bb5a2f56
Parents: 6c04166
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 17 17:02:33 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 17 17:36:56 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  | 126 +++++++++++++++----
 .../minicluster/MiniClusterJobDispatcher.java   |   4 +-
 2 files changed, 102 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6dc228fc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ffcd12..d63f9a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,11 +32,18 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.concurrent.GuardedBy;
 
 import java.util.UUID;
@@ -48,13 +55,15 @@ import static 
org.apache.flink.util.Preconditions.checkState;
 
 public class MiniCluster {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(MiniCluster.class);
+
        /** The lock to guard startup / shutdown / manipulation methods */
        private final Object lock = new Object();
 
        /** The configuration for this mini cluster */
        private final MiniClusterConfiguration config;
 
-       @GuardedBy("lock")
+       @GuardedBy("lock") 
        private MetricRegistry metricRegistry;
 
        @GuardedBy("lock")
@@ -73,6 +82,9 @@ public class MiniCluster {
        private HighAvailabilityServices haServices;
 
        @GuardedBy("lock")
+       private ResourceManager<?>[] resourceManagers;
+
+       @GuardedBy("lock")
        private TaskManagerRunner[] taskManagerRunners;
 
        @GuardedBy("lock")
@@ -98,6 +110,7 @@ public class MiniCluster {
        }
 
        /**
+        * Creates a new Flink mini cluster based on the given configuration.
         * 
         * @param config The configuration for the mini cluster
         */
@@ -149,6 +162,9 @@ public class MiniCluster {
                synchronized (lock) {
                        checkState(!running, "FlinkMiniCluster is already 
running");
 
+                       LOG.info("Starting Flink Mini Cluster");
+                       LOG.debug("Using configuration {}", config);
+
                        final Configuration configuration = new 
UnmodifiableConfiguration(config.getConfiguration());
                        final Time rpcTimeout = config.getRpcTimeout();
                        final int numJobManagers = config.getNumJobManagers();
@@ -210,13 +226,21 @@ public class MiniCluster {
                                }
 
                                // create the high-availability services
+                               LOG.info("Starting high-availability services");
                                haServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
 
-                               // bring up the task managers for the mini 
cluster
+                               // bring up the ResourceManager(s)
+                               LOG.info("Starting {} ResourceManger(s)", 
numResourceManagers);
+                               resourceManagers = startResourceManagers(
+                                               configuration, haServices, 
metricRegistry, numResourceManagers, resourceManagerRpcServices);
+
+                               // bring up the TaskManager(s) for the mini 
cluster
+                               LOG.info("Starting {} TaskManger(s)", 
numTaskManagers);
                                taskManagerRunners = startTaskManagers(
                                                configuration, haServices, 
metricRegistry, numTaskManagers, taskManagerRpcServices);
 
                                // bring up the dispatcher that launches 
JobManagers when jobs submitted
+                               LOG.info("Starting job dispatcher for {} 
JobManger(s)", numJobManagers);
                                jobDispatcher = new MiniClusterJobDispatcher(
                                                configuration, haServices, 
metricRegistry, numJobManagers, jobManagerRpcServices);
                        }
@@ -232,6 +256,8 @@ public class MiniCluster {
 
                        // now officially mark this as running
                        running = true;
+
+                       LOG.info("Flink Mini Cluster started successfully");
                }
        }
 
@@ -247,11 +273,13 @@ public class MiniCluster {
        public void shutdown() throws Exception {
                synchronized (lock) {
                        if (running) {
+                               LOG.info("Shutting down Flink Mini Cluster");
                                try {
                                        shutdownInternally();
                                } finally {
                                        running = false;
                                }
+                               LOG.info("Flink Mini Cluster is shut down");
                        }
                }
        }
@@ -270,11 +298,34 @@ public class MiniCluster {
                        try {
                                jobDispatcher.shutdown();
                        } catch (Exception e) {
-                               exception = firstOrSuppressed(e, exception);
+                               exception = e;
                        }
                        jobDispatcher = null;
                }
 
+               if (resourceManagers != null) {
+                       for (ResourceManager<?> rm : resourceManagers) {
+                               if (rm != null) {
+                                       try {
+                                               rm.shutDown();
+                                       } catch (Throwable t) {
+                                               exception = 
firstOrSuppressed(t, exception);
+                                       }
+                               }
+                       }
+                       resourceManagers = null;
+               }
+
+               // shut down the RpcServices
+               exception = shutDownRpc(commonRpcService, exception);
+               exception = shutDownRpcs(jobManagerRpcServices, exception);
+               exception = shutDownRpcs(taskManagerRpcServices, exception);
+               exception = shutDownRpcs(resourceManagerRpcServices, exception);
+               commonRpcService = null;
+               jobManagerRpcServices = null;
+               taskManagerRpcServices = null;
+               resourceManagerRpcServices = null;
+
                // shut down high-availability services
                if (haServices != null) {
                        try {
@@ -285,24 +336,6 @@ public class MiniCluster {
                        haServices = null;
                }
 
-               // shut down the RpcServices
-               if (commonRpcService != null) {
-                       exception = shutDownRpc(commonRpcService, exception);
-                       commonRpcService = null;
-               }
-               if (jobManagerRpcServices != null) {
-                       for (RpcService service : jobManagerRpcServices) {
-                               exception = shutDownRpc(service, exception);
-                       }
-                       jobManagerRpcServices = null;
-               }
-               if (taskManagerRpcServices != null) {
-                       for (RpcService service : taskManagerRpcServices) {
-                               exception = shutDownRpc(service, exception);
-                       }
-                       taskManagerRpcServices = null;
-               }
-
                // metrics shutdown
                if (metricRegistry != null) {
                        metricRegistry.shutdown();
@@ -402,6 +435,28 @@ public class MiniCluster {
                return new AkkaRpcService(actorSystem, askTimeout);
        }
 
+       /** Creates and starts {@code numResourceManagers} standalone ResourceManagers, one per given RpcService. */
+       protected ResourceManager<?>[] startResourceManagers(
+                       Configuration configuration,
+                       HighAvailabilityServices haServices,
+                       MetricRegistry metricRegistry,
+                       int numResourceManagers,
+                       RpcService[] resourceManagerRpcServices) throws Exception {
+
+               // every ResourceManager is constructed with the same slot manager factory
+               final StandaloneResourceManager[] started = new StandaloneResourceManager[numResourceManagers];
+               final SlotManagerFactory factory = new DefaultSlotManager.Factory();
+               int index = 0;
+               while (index < numResourceManagers) {
+                       final StandaloneResourceManager rm = new StandaloneResourceManager(
+                                       resourceManagerRpcServices[index], haServices, factory);
+                       started[index++] = rm;
+                       rm.start();
+               }
+
+               return started;
+       }
+
        protected TaskManagerRunner[] startTaskManagers(
                        Configuration configuration,
                        HighAvailabilityServices haServices,
@@ -429,15 +484,34 @@ public class MiniCluster {
        // 
------------------------------------------------------------------------
 
        private static Throwable shutDownRpc(RpcService rpcService, Throwable 
priorException) {
-               try {
-                       if (rpcService != null) {
+               if (rpcService != null) {
+                       try {
                                rpcService.stopService();
                        }
-                       return priorException;
+                       catch (Throwable t) {
+                               return firstOrSuppressed(t, priorException);
+                       }
                }
-               catch (Throwable t) {
-                       return firstOrSuppressed(t, priorException);
+
+               return priorException;
+       }
+
+       /**
+        * Stops all given RPC services, continuing with the remaining ones when one fails.
+        *
+        * @param rpcServices The services to stop; may be null and may contain null entries
+        * @param priorException An exception from an earlier shutdown step, or null
+        * @return The prior exception combined with every failure reported by shutDownRpc
+        */
+       private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorException) {
+               Throwable exception = priorException;
+               if (rpcServices != null) {
+                       for (RpcService service : rpcServices) {
+                               exception = shutDownRpc(service, exception);
+                       }
                }
+               // hand back the accumulated failures, not just the exception passed in
+               return exception;
        }
 
        private static MiniClusterConfiguration createConfig(Configuration cfg, 
boolean singleRpcService) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6dc228fc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index d0df293..8ac8eba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -176,7 +176,7 @@ public class MiniClusterJobDispatcher {
        public void runDetached(JobGraph job) throws JobExecutionException {
                checkNotNull(job);
 
-               LOG.info("Received job for detached execution {} ({})", 
job.getName(), job.getJobID());
+               LOG.info("Received job for detached execution: {} ({})", 
job.getName(), job.getJobID());
 
                synchronized (lock) {
                        checkState(!shutdown, "mini cluster is shut down");
@@ -201,7 +201,7 @@ public class MiniClusterJobDispatcher {
        public JobExecutionResult runJobBlocking(JobGraph job) throws 
JobExecutionException, InterruptedException {
                checkNotNull(job);
                
-               LOG.info("Received job for blocking execution {} ({})", 
job.getName(), job.getJobID());
+               LOG.info("Received job for blocking execution: {} ({})", 
job.getName(), job.getJobID());
                final BlockingJobSync sync = new 
BlockingJobSync(job.getJobID(), numJobManagers);
 
                synchronized (lock) {

Reply via email to