[FLINK-4836] [cluster management] Start ResourceManager in MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dc228fc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dc228fc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dc228fc Branch: refs/heads/flip-6 Commit: 6dc228fcf5d33536176beb15338982c7bb5a2f56 Parents: 6c04166 Author: Stephan Ewen <se...@apache.org> Authored: Mon Oct 17 17:02:33 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Oct 17 17:36:56 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/minicluster/MiniCluster.java | 126 +++++++++++++++---- .../minicluster/MiniClusterJobDispatcher.java | 4 +- 2 files changed, 102 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6dc228fc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 1ffcd12..d63f9a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -32,11 +32,18 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.concurrent.GuardedBy; import java.util.UUID; @@ -48,13 +55,15 @@ import static org.apache.flink.util.Preconditions.checkState; public class MiniCluster { + private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class); + /** The lock to guard startup / shutdown / manipulation methods */ private final Object lock = new Object(); /** The configuration for this mini cluster */ private final MiniClusterConfiguration config; - @GuardedBy("lock") + @GuardedBy("lock") private MetricRegistry metricRegistry; @GuardedBy("lock") @@ -73,6 +82,9 @@ public class MiniCluster { private HighAvailabilityServices haServices; @GuardedBy("lock") + private ResourceManager<?>[] resourceManagers; + + @GuardedBy("lock") private TaskManagerRunner[] taskManagerRunners; @GuardedBy("lock") @@ -98,6 +110,7 @@ public class MiniCluster { } /** + * Creates a new Flink mini cluster based on the given configuration. * * @param config The configuration for the mini cluster */ @@ -149,6 +162,9 @@ public class MiniCluster { synchronized (lock) { checkState(!running, "FlinkMiniCluster is already running"); + LOG.info("Starting Flink Mini Cluster"); + LOG.debug("Using configuration {}", config); + final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration()); final Time rpcTimeout = config.getRpcTimeout(); final int numJobManagers = config.getNumJobManagers(); @@ -210,13 +226,21 @@ public class MiniCluster { } // create the high-availability services + LOG.info("Starting high-availability services"); haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration); - // bring up the task managers for the mini cluster + // bring up the ResourceManager(s) + LOG.info("Starting {} ResourceManger(s)", numResourceManagers); + resourceManagers = startResourceManagers( + configuration, haServices, metricRegistry, numResourceManagers, resourceManagerRpcServices); + + // bring up the TaskManager(s) for the mini cluster + LOG.info("Starting {} TaskManger(s)", numTaskManagers); taskManagerRunners = startTaskManagers( configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices); // bring up the dispatcher that launches JobManagers when jobs submitted + LOG.info("Starting job dispatcher for {} JobManger(s)", numJobManagers); jobDispatcher = new MiniClusterJobDispatcher( configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices); } @@ -232,6 +256,8 @@ public class MiniCluster { // now officially mark this as running running = true; + + LOG.info("Flink Mini Cluster started successfully"); } } @@ -247,11 +273,13 @@ public class MiniCluster { public void shutdown() throws Exception { synchronized (lock) { if (running) { + LOG.info("Shutting down Flink Mini Cluster"); try { shutdownInternally(); } finally { running = false; } + LOG.info("Flink Mini Cluster is shut down"); } } } @@ -270,11 +298,34 @@ public class MiniCluster { try { jobDispatcher.shutdown(); } catch (Exception e) { - exception = firstOrSuppressed(e, exception); + exception = e; } jobDispatcher = null; } + if (resourceManagers != null) { + for (ResourceManager<?> rm : resourceManagers) { + if (rm != null) { + try { + rm.shutDown(); + } catch (Throwable t) { + exception = firstOrSuppressed(t, exception); + } + } + } + resourceManagers = null; + } + + // shut down the RpcServices + exception = shutDownRpc(commonRpcService, exception); + exception = shutDownRpcs(jobManagerRpcServices, exception); + exception = shutDownRpcs(taskManagerRpcServices, exception); + exception = shutDownRpcs(resourceManagerRpcServices, exception); + commonRpcService = null; + jobManagerRpcServices = null; + taskManagerRpcServices = null; + resourceManagerRpcServices = null; + // shut down high-availability services if (haServices != null) { try { @@ -285,24 +336,6 @@ public class MiniCluster { haServices = null; } - // shut down the RpcServices - if (commonRpcService != null) { - exception = shutDownRpc(commonRpcService, exception); - commonRpcService = null; - } - if (jobManagerRpcServices != null) { - for (RpcService service : jobManagerRpcServices) { - exception = shutDownRpc(service, exception); - } - jobManagerRpcServices = null; - } - if (taskManagerRpcServices != null) { - for (RpcService service : taskManagerRpcServices) { - exception = shutDownRpc(service, exception); - } - taskManagerRpcServices = null; - } - // metrics shutdown if (metricRegistry != null) { metricRegistry.shutdown(); @@ -402,6 +435,28 @@ public class MiniCluster { return new AkkaRpcService(actorSystem, askTimeout); } + protected ResourceManager<?>[] startResourceManagers( + Configuration configuration, + HighAvailabilityServices haServices, + MetricRegistry metricRegistry, + int numResourceManagers, + RpcService[] resourceManagerRpcServices) throws Exception { + + final StandaloneResourceManager[] resourceManagers = new StandaloneResourceManager[numResourceManagers]; + final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); + + for (int i = 0; i < numResourceManagers; i++) { + resourceManagers[i] = new StandaloneResourceManager( + resourceManagerRpcServices[i], + haServices, + slotManagerFactory); + + resourceManagers[i].start(); + } + + return resourceManagers; + } + protected TaskManagerRunner[] startTaskManagers( Configuration configuration, HighAvailabilityServices haServices, @@ -429,15 +484,34 @@ public class MiniCluster { // ------------------------------------------------------------------------ private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) { - try { - if (rpcService != null) { + if (rpcService != null) { + try { rpcService.stopService(); } - return priorException; + catch (Throwable t) { + return firstOrSuppressed(t, priorException); + } } - catch (Throwable t) { - return firstOrSuppressed(t, priorException); + + return priorException; + } + + private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorException) { + if (rpcServices != null) { + Throwable exception = priorException; + + for (RpcService service : rpcServices) { + try { + if (service != null) { + service.stopService(); + } + } + catch (Throwable t) { + exception = firstOrSuppressed(t, exception); + } + } } + return priorException; } private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) { http://git-wip-us.apache.org/repos/asf/flink/blob/6dc228fc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index d0df293..8ac8eba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -176,7 +176,7 @@ public class MiniClusterJobDispatcher { public void runDetached(JobGraph job) throws JobExecutionException { checkNotNull(job); - LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID()); + LOG.info("Received job for detached execution: {} ({})", job.getName(), job.getJobID()); synchronized (lock) { checkState(!shutdown, "mini cluster is shut down"); @@ -201,7 +201,7 @@ public class MiniClusterJobDispatcher { public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException { checkNotNull(job); - LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID()); + LOG.info("Received job for blocking execution: {} ({})", job.getName(), job.getJobID()); final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers); synchronized (lock) {