Repository: flink
Updated Branches:
  refs/heads/master 743399a41 -> c1e326707


[FLINK-1557] Move JobManager web frontend server out of JobManager actor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1e32670
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1e32670
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1e32670

Branch: refs/heads/master
Commit: c1e326707a6c5b3cc550945f010eb4cabea24ad3
Parents: 743399a
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 16 16:34:13 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 17 09:33:57 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      |  6 +-
 .../runtime/jobmanager/web/WebInfoServer.java   | 11 ++-
 .../flink/runtime/jobmanager/JobManager.scala   | 43 +++++++----
 .../runtime/jobmanager/WithWebServer.scala      | 40 ----------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  3 +-
 .../minicluster/LocalFlinkMiniCluster.scala     | 24 +++---
 .../runtime/testingUtils/TestingCluster.scala   |  2 +-
 .../apache/flink/yarn/ApplicationMaster.scala   | 78 ++++++++++++--------
 .../scala/org/apache/flink/yarn/Messages.scala  |  7 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  | 10 +--
 10 files changed, 112 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java 
b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index a1515d0..62598a4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Lightweight configuration object which can store key/value pairs.
  */
+@SuppressWarnings("EqualsBetweenInconvertibleTypes")
 public class Configuration implements IOReadableWritable, 
java.io.Serializable, Cloneable {
 
        private static final long serialVersionUID = 1L;
@@ -584,6 +585,7 @@ public class Configuration implements IOReadableWritable, 
java.io.Serializable,
                return hash;
        }
 
+       @SuppressWarnings("EqualsBetweenInconvertibleTypes")
        @Override
        public boolean equals(Object obj) {
                if (this == obj) {
@@ -596,11 +598,11 @@ public class Configuration implements IOReadableWritable, 
java.io.Serializable,
                                Object thisVal = e.getValue();
                                Object otherVal = otherConf.get(e.getKey());
                                
-                               if (thisVal.getClass() != byte[].class) {
+                               if (!thisVal.getClass().equals(byte[].class)) {
                                        if (!thisVal.equals(otherVal)) {
                                                return false;
                                        }
-                               } else if (otherVal.getClass() == byte[].class) 
{
+                               } else if 
(otherVal.getClass().equals(byte[].class)) {
                                        if (!Arrays.equals((byte[]) thisVal, 
(byte[]) otherVal)) {
                                                return false;
                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 1166d70..02714b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.URL;
 
 import akka.actor.ActorRef;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,13 +78,15 @@ public class WebInfoServer {
         * @throws IOException
         *         Thrown, if the server setup failed for an I/O related reason.
         */
-       public WebInfoServer(Configuration config, ActorRef jobmanager,
-                                               ActorRef archive, 
FiniteDuration timeout) throws IOException {
-               
-               // if no explicit configuration is given, use the global 
configuration
+       public WebInfoServer(Configuration config, ActorRef jobmanager, 
ActorRef archive) throws IOException {
                if (config == null) {
                        throw new IllegalArgumentException("No Configuration 
has been passed to the web server");
                }
+               if (jobmanager == null || archive == null) {
+                       throw new NullPointerException();
+               }
+
+               final FiniteDuration timeout = AkkaUtils.getTimeout(config);
                
                this.port = 
config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
                                
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);

http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d522d2d..4fe0ea6 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -25,7 +25,8 @@ import akka.actor.Status.Failure
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, 
Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
-import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, 
ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, 
ExecutionGraph}
+import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.Acknowledge
@@ -42,7 +43,7 @@ import 
org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, 
StackTrace, NextInputSplit, Heartbeat}
+import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, 
NextInputSplit, Heartbeat}
 import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.apache.flink.util.InstantiationUtil
 
@@ -656,8 +657,10 @@ object JobManager {
     try {
       LOG.debug("Starting JobManager actor")
 
-      startActor(configuration, jobManagerSystem, true)
+      // bring up the job manager actor
+      val (jobManager, archiver) = startJobManagerActors(configuration, 
jobManagerSystem)
 
+      // bring up a local task manager, if needed
       if(executionMode.equals(LOCAL)){
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode 
execution")
 
@@ -665,10 +668,14 @@ object JobManager {
           localAkkaCommunication = false, localTaskManagerCommunication = 
true)(jobManagerSystem)
       }
 
-      jobManagerSystem.awaitTermination()
+      // start the job manager web frontend
+      LOG.info("Starting JobManger web frontend")
+      val webServer = new WebInfoServer(configuration, jobManager, archiver)
+      webServer.start()
     }
     catch {
       case t: Throwable => {
+        LOG.error("Error while starting up JobManager", t)
         try {
           jobManagerSystem.shutdown()
         } catch {
@@ -677,6 +684,9 @@ object JobManager {
         throw t
       }
     }
+
+    // block until everything is shut down
+    jobManagerSystem.awaitTermination()
   }
 
   /**
@@ -814,9 +824,16 @@ object JobManager {
       profilerProps, executionRetries, delayBetweenRetries, timeout, 
archiveCount)
   }
 
-  def startActor(configuration: Configuration,
-                 actorSystem: ActorSystem,
-                 withWebServer: Boolean): ActorRef = {
+  /**
+   * Starts the JobManager and job archiver based on the given configuration, 
in the
+   * given actor system.
+   *
+   * @param configuration
+   * @param actorSystem
+   * @return A tuple of references (JobManager Ref, Archiver Ref)
+   */
+  def startJobManagerActors(configuration: Configuration,
+                            actorSystem: ActorSystem): (ActorRef, ActorRef) = {
 
     val (instanceManager, scheduler, libraryCacheManager, archiveProps, 
accumulatorManager,
       profilerProps, executionRetries, delayBetweenRetries,
@@ -827,17 +844,13 @@ object JobManager {
 
     val archiver: ActorRef = actorSystem.actorOf(archiveProps, 
JobManager.ARCHIVE_NAME)
 
-    val jobManagerProps = if (withWebServer) {
-      Props(new JobManager(configuration, instanceManager, scheduler,
-        libraryCacheManager, archiver, accumulatorManager, profiler, 
executionRetries,
-        delayBetweenRetries, timeout) with WithWebServer)
-    } else {
-      Props(classOf[JobManager], configuration, instanceManager, scheduler,
+    val jobManagerProps = Props(classOf[JobManager], configuration, 
instanceManager, scheduler,
         libraryCacheManager, archiver, accumulatorManager, profiler, 
executionRetries,
         delayBetweenRetries, timeout)
-    }
 
-    startActor(jobManagerProps, actorSystem)
+    val jobManager = startActor(jobManagerProps, actorSystem)
+
+    (jobManager, archiver)
   }
 
   def startActor(props: Props, actorSystem: ActorSystem): ActorRef = {

http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
deleted file mode 100644
index bc83b9f..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import akka.actor.Actor
-import org.apache.flink.runtime.jobmanager.web.WebInfoServer
-
-/**
- * Mixin for the [[JobManager]] which starts a [[WebInfoServer]] for the 
JobManager.
- */
-trait WithWebServer extends Actor {
-  that: JobManager =>
-
-  val webServer = new WebInfoServer(configuration, self, archive, timeout)
-  webServer.start()
-
-  abstract override def postStop(): Unit = {
-    log.info("Stopping webserver.")
-    webServer.stop()
-    log.info("Stopped webserver.")
-
-    super.postStop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 8f79003..6eea21c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -75,7 +75,8 @@ abstract class FlinkMiniCluster(userConfiguration: 
Configuration,
 
   def generateConfiguration(userConfiguration: Configuration): Configuration
 
-  def startJobManager(implicit system: ActorSystem): ActorRef
+  def startJobManager(system: ActorSystem): ActorRef
+
   def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef
 
   def getJobManagerAkkaConfig: Config = {

http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index c24d96a..8b16969 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -64,33 +64,31 @@ class LocalFlinkMiniCluster(userConfiguration: 
Configuration, singleActorSystem:
     config
   }
 
-  override def startJobManager(implicit system: ActorSystem):
-  ActorRef = {
+  override def startJobManager(system: ActorSystem): ActorRef = {
     val config = configuration.clone()
-    JobManager.startActor(config, system, false)
+    val (jobManager, _) = JobManager.startJobManagerActors(config, system)
+    jobManager
   }
 
   override def startTaskManager(index: Int)(implicit system: ActorSystem): 
ActorRef = {
     val config = configuration.clone()
 
-    val rpcPort = config.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 
ConfigConstants
-      .DEFAULT_TASK_MANAGER_IPC_PORT)
-    val dataPort = 
config.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
-      .DEFAULT_TASK_MANAGER_DATA_PORT)
+    val rpcPort = config.getInteger(
+      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+
+    val dataPort = config.getInteger(
+      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
 
     if(rpcPort > 0){
       config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + 
index)
     }
-
     if(dataPort > 0){
       config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + 
index)
     }
 
-    val localExecution = if(numTaskManagers == 1){
-      true
-    } else {
-      false
-    }
+    val localExecution = numTaskManagers == 1
 
     TaskManager.startActorWithConfiguration(HOSTNAME,
       config,

http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 6ae943d..e2660d5 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -46,7 +46,7 @@ FlinkMiniCluster(userConfiguration, singleActorSystem) {
     cfg
   }
 
-  override def startJobManager(implicit actorSystem: ActorSystem): ActorRef = {
+  override def startJobManager(actorSystem: ActorSystem): ActorRef = {
 
     val (instanceManager, scheduler, libraryCacheManager, _, 
accumulatorManager, _ ,
         executionRetries, delayBetweenRetries,

http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 62b7caf..8ba4408 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -23,9 +23,10 @@ import java.security.PrivilegedAction
 
 import akka.actor._
 import org.apache.flink.client.CliFrontend
-import org.apache.flink.configuration.ConfigConstants
+import org.apache.flink.configuration.{Configuration, ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager}
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.yarn.Messages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -56,15 +57,16 @@ object ApplicationMaster {
 
     ugi.doAs(new PrivilegedAction[Object] {
       override def run(): Object = {
+
         var actorSystem: ActorSystem = null
-        var jobManager: ActorRef = ActorRef.noSender
+        var webserver: WebInfoServer = null
 
         try {
           val conf = new YarnConfiguration()
 
           val env = System.getenv()
 
-          if(LOG.isDebugEnabled) {
+          if (LOG.isDebugEnabled) {
             LOG.debug("All environment variables: " + env.toString)
           }
 
@@ -74,48 +76,55 @@ object ApplicationMaster {
           val logDirs = env.get(Environment.LOG_DIRS.key())
 
           // Note that we use the "ownHostname" given by YARN here, to make 
sure
-          // we use the hostnames given by YARN consitently throuout akka.
+          // we use the hostnames given by YARN consistently throughout akka.
           // for akka "localhost" and "localhost.localdomain" are different 
actors.
           val ownHostname = env.get(Environment.NM_HOST.key())
-          require(ownHostname != null, s"Own hostname not set.")
+          require(ownHostname != null, "Own hostname in YARN not set.")
 
           val taskManagerCount = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
           val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt
           val dynamicPropertiesEncodedString = 
env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES)
 
-          val jobManagerWebPort = 0 // automatic assignment.
-
-          val (system, actor) = startJobManager(currDir, 
ownHostname,dynamicPropertiesEncodedString,
-            jobManagerWebPort, logDirs)
+          val (config, system, jobManager, archiver) = 
startJobManager(currDir, ownHostname,
+                                                      
dynamicPropertiesEncodedString, logDirs)
 
           actorSystem = system
-          jobManager = actor
           val extActor = system.asInstanceOf[ExtendedActorSystem]
           val jobManagerPort = extActor.provider.getDefaultAddress.port.get
 
+          // start the web info server
+          LOG.info("Starting Job Manger web frontend.")
+          webserver = new WebInfoServer(config, jobManager, archiver)
+
+          val jobManagerWebPort = 
webserver.getServer.getConnectors()(0).getLocalPort
+
           // generate configuration file for TaskManagers
           generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, 
ownHostname,
             jobManagerPort, jobManagerWebPort, logDirs, slots, 
taskManagerCount,
             dynamicPropertiesEncodedString)
 
-
           // send "start yarn session" message to YarnJobManager.
-          LOG.info("Start yarn session on job manager.")
-          jobManager ! StartYarnSession(conf, jobManagerPort)
+          LOG.info("Starting YARN session on Job Manager.")
+          jobManager ! StartYarnSession(conf, jobManagerPort, 
jobManagerWebPort)
 
-          LOG.info("Application Master properly initiated. Await termination 
of actor system.")
+          LOG.info("Application Master properly initiated. Awaiting 
termination of actor system.")
           actorSystem.awaitTermination()
-        }catch{
+        }
+        catch {
           case t: Throwable =>
             LOG.error("Error while running the application master.", t)
 
-            if(actorSystem != null){
+            if (actorSystem != null) {
               actorSystem.shutdown()
               actorSystem.awaitTermination()
-
-              actorSystem = null
             }
         }
+        finally {
+          if (webserver != null) {
+            LOG.debug("Stopping Job Manager web frontend.")
+            webserver.stop()
+          }
+        }
 
         null
       }
@@ -166,10 +175,22 @@ object ApplicationMaster {
     output.close()
   }
 
-  def startJobManager(currDir: String, hostname: String, 
dynamicPropertiesEncodedString: String,
-                       jobManagerWebPort: Int, logDirs: String): (ActorSystem, 
ActorRef) = {
-
-    LOG.info("Start job manager for yarn")
+  /**
+   * Starts the JobManager and all its components.
+   *
+   * @param currDir
+   * @param hostname
+   * @param dynamicPropertiesEncodedString
+   * @param logDirs
+   *
+   * @return (Configuration, JobManager ActorSystem, JobManager ActorRef, 
Archiver ActorRef)
+   */
+  def startJobManager(currDir: String,
+                      hostname: String,
+                      dynamicPropertiesEncodedString: String,
+                      logDirs: String): (Configuration, ActorSystem, ActorRef, 
ActorRef) = {
+
+    LOG.info("Starting JobManager for YARN")
     val args = Array[String]("--configDir", currDir)
 
     LOG.info(s"Config path: $currDir.")
@@ -181,15 +202,13 @@ object ApplicationMaster {
     for(property <- dynamicProperties.asScala){
       configuration.setString(property.f0, property.f1)
     }
-    configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
jobManagerWebPort)
-    configuration.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logDirs)
 
     // set port to 0 to let Akka automatically determine the port.
+    LOG.debug("Starting JobManager actor system")
     val jobManagerSystem = AkkaUtils.createActorSystem(configuration, 
Some((hostname, 0)))
 
-    LOG.info("Start job manager actor");
-
     // start all the components inside the job manager
+    LOG.debug("Starting JobManager components")
     val (instanceManager, scheduler, libraryCacheManager, archiveProps, 
accumulatorManager,
                    profilerProps, executionRetries, delayBetweenRetries,
                    timeout, _) = 
JobManager.createJobManagerComponents(configuration)
@@ -203,10 +222,11 @@ object ApplicationMaster {
 
     val jobManagerProps = Props(new JobManager(configuration, instanceManager, 
scheduler,
       libraryCacheManager, archiver, accumulatorManager, profiler, 
executionRetries,
-      delayBetweenRetries, timeout) with WithWebServer with YarnJobManager)
+      delayBetweenRetries, timeout) with YarnJobManager)
 
+    LOG.debug("Starting JobManager actor")
     val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)
 
-    (jobManagerSystem, jobManager)
+    (configuration, jobManagerSystem, jobManager, archiver)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
index 0ac135d..e880fdf 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
@@ -26,13 +26,18 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 
 object Messages {
+
   case class YarnMessage(message: String, date: Date = new Date())
   case class ApplicationMasterStatus(numTaskManagers: Int, numSlots: Int)
   case object RegisterClient
 
   case class StopYarnSession(status: FinalApplicationStatus)
+
   case object JobManagerStopped
-  case class StartYarnSession(configuration: Configuration, actorSystemPort: 
Int)
+
+  case class StartYarnSession(configuration: Configuration,
+                              actorSystemPort: Int,
+                              webServerport: Int)
 
   case class JobManagerActorRef(jobManager: ActorRef)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index c8dbbf1..a37c8b4 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -25,7 +25,7 @@ import java.util.Collections
 import akka.actor.ActorRef
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager}
+import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
 import org.apache.flink.yarn.Messages._
 import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
@@ -45,7 +45,7 @@ import scala.language.postfixOps
 
 
 trait YarnJobManager extends ActorLogMessages {
-  that: JobManager with WithWebServer =>
+  that: JobManager =>
 
   import context._
   import scala.collection.JavaConverters._
@@ -101,7 +101,7 @@ trait YarnJobManager extends ActorLogMessages {
       sender() ! new 
FlinkYarnClusterStatus(instanceManager.getNumberOfRegisteredTaskManagers,
         instanceManager.getTotalNumberOfSlots)
 
-    case StartYarnSession(conf, actorSystemPort: Int) =>
+    case StartYarnSession(conf, actorSystemPort, webServerport) =>
       log.info("Start yarn session.")
       val memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt
       val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager)
@@ -120,8 +120,6 @@ trait YarnJobManager extends ActorLogMessages {
       val shipListString = env.get(FlinkYarnClient.ENV_CLIENT_SHIP_FILES)
       val yarnClientUsername = env.get(FlinkYarnClient.ENV_CLIENT_USERNAME)
 
-      val jobManagerWebPort = 
that.webServer.getServer.getConnectors()(0).getLocalPort
-
       val rm = AMRMClient.createAMRMClient[ContainerRequest]()
       rm.init(conf)
       rm.start()
@@ -136,7 +134,7 @@ trait YarnJobManager extends ActorLogMessages {
       nmClientOption = Some(nm)
 
       // Register with ResourceManager
-      val url = s"http://$applicationMasterHost:$jobManagerWebPort";
+      val url = s"http://$applicationMasterHost:$webServerport";
       log.info(s"Registering ApplicationMaster with tracking url $url.")
       rm.registerApplicationMaster(applicationMasterHost, actorSystemPort, url)
 

Reply via email to