Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5144#discussion_r28451221
  
    --- Diff: 
core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala 
---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.mesos
    +
    +import java.util.concurrent.CountDownLatch
    +
    +import org.apache.spark
    +import org.apache.spark.deploy.mesos.ui.MesosClusterUI
    +import org.apache.spark.deploy.rest.mesos.MesosRestServer
    +import org.apache.spark.scheduler.cluster.mesos._
    +import org.apache.spark.util.SignalLogger
    +import org.apache.spark.{Logging, SecurityManager, SparkConf}
    +
    +/*
    + * A dispatcher that is responsible for managing and launching drivers, 
and is intended to be
    + * used for Mesos cluster mode. The dispatcher is a long-running process 
started by the user in
    + * the cluster independently of Spark applications.
    + * It contains a [[MesosRestServer]] that listens for requests to submit 
drivers and a
    + * [[MesosClusterScheduler]] that processes these requests by negotiating 
with the Mesos master
    + * for resources.
    + *
    + * A typical new driver lifecycle is the following:
    + * - Driver submitted via spark-submit talking to the [[MesosRestServer]]
    + * - [[MesosRestServer]] queues the driver request to 
[[MesosClusterScheduler]]
    + * - [[MesosClusterScheduler]] gets resource offers and launches the 
drivers that are in queue
    + *
    + * This dispatcher supports both Mesos fine-grained and coarse-grained modes, as 
the mode is configurable
    + * per driver launched.
    + * This class is needed since Mesos doesn't manage frameworks, so the 
dispatcher acts as
    + * a daemon to launch drivers as Mesos frameworks upon request. The 
dispatcher is also started and
    + * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher 
respectively.
    + */
    +private[mesos] class MesosClusterDispatcher(
    +    args: MesosClusterDispatcherArguments,
    +    conf: SparkConf)
    +  extends Logging {
    +
    +  // Returns the value of SPARK_PUBLIC_DNS when set in the environment,
    +  // otherwise falls back to the supplied default address.
    +  private def publicAddress(conf: SparkConf, defaultAddress: String): 
String = {
    +    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
    +    if (envVar != null) envVar else defaultAddress
    +  }
    +
    +  // Recovery mode controls how submitted-driver state is persisted across
    +  // dispatcher restarts; normalized to upper case so config is case-insensitive.
    +  private val recoveryMode = conf.get("spark.mesos.deploy.recoveryMode", 
"NONE").toUpperCase()
    +  logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
    +
    +  // NOTE(review): any value other than "NONE" or "ZOOKEEPER" throws a bare
    +  // MatchError here — a clearer error message for misconfiguration may help.
    +  private val engineFactory = recoveryMode match {
    +    case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
    +    case "ZOOKEEPER" => new 
ZookeeperMesosClusterPersistenceEngineFactory(conf)
    +  }
    +
    +  private val scheduler = new MesosClusterScheduler(engineFactory, conf)
    +
    +  // REST endpoint for spark-submit driver submissions, backed by the scheduler.
    +  private val server = new MesosRestServer(args.host, args.port, conf, 
scheduler)
    +  private val webUi = new MesosClusterUI(
    +    new SecurityManager(conf),
    +    args.webUiPort,
    +    conf,
    +    publicAddress(conf, args.host),
    +    scheduler)
    +
    +  // Counted down by stop() so that awaitShutdown() callers unblock.
    +  private val shutdownLatch = new CountDownLatch(1)
    +
    +  // Binds the web UI first so its active URL can be advertised as the
    +  // scheduler's framework URL before the scheduler and REST server start.
    +  def start(): Unit = {
    +    webUi.bind()
    +    scheduler.frameworkUrl = webUi.activeWebUiUrl
    +    scheduler.start()
    +    server.start()
    +  }
    +
    +  // Blocks the calling thread until stop() is invoked.
    +  def awaitShutdown(): Unit = {
    +    shutdownLatch.await()
    +  }
    +
    +  // Stops all components, then releases the shutdown latch.
    +  def stop(): Unit = {
    +    webUi.stop()
    +    server.stop()
    +    scheduler.stop()
    +    shutdownLatch.countDown()
    +  }
    +}
    +
    +private[mesos] object MesosClusterDispatcher extends spark.Logging {
    +  def main(args: Array[String]) {
    +    SignalLogger.register(log)
    +    val conf = new SparkConf
    +    val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
    +    conf.setMaster(dispatcherArgs.masterUrl)
    +    conf.setAppName(dispatcherArgs.name)
    +    dispatcherArgs.zookeeperUrl.foreach { z =>
    +      conf.set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
    +      conf.set("spark.mesos.deploy.zookeeper.url", z)
    +    }
    +    val dispatcher = new MesosClusterDispatcher(
    +      dispatcherArgs,
    +      conf)
    --- End diff --
    
    bump up to L108


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to