Repository: spark Updated Branches: refs/heads/master 87250580f -> e99d01709
[SPARK-13220][CORE] deprecate yarn-client and yarn-cluster mode Author: jerryshao <ss...@hortonworks.com> Closes #11229 from jerryshao/SPARK-13220. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e99d0170 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e99d0170 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e99d0170 Branch: refs/heads/master Commit: e99d0170982b06676110906db4de6196586829f6 Parents: 8725058 Author: jerryshao <ss...@hortonworks.com> Authored: Tue Feb 23 12:30:57 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Tue Feb 23 12:30:57 2016 +0000 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/SparkConf.scala | 25 ++++++++++++++++ .../scala/org/apache/spark/SparkContext.scala | 30 ++++++++------------ .../org/apache/spark/deploy/SparkSubmit.scala | 21 ++++++++------ .../SparkContextSchedulerCreationSuite.scala | 28 +++++++++--------- .../apache/spark/deploy/SparkSubmitSuite.scala | 7 +++-- docs/submitting-applications.md | 6 ---- project/MimaExcludes.scala | 8 ++++-- .../spark/deploy/yarn/ApplicationMaster.scala | 5 ++-- .../deploy/yarn/BaseYarnClusterSuite.scala | 5 ++-- .../spark/deploy/yarn/YarnClusterSuite.scala | 3 +- 10 files changed, 82 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e99d0170/core/src/main/scala/org/apache/spark/SparkConf.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 36e240e..b81bfb3 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -503,6 +503,31 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { set("spark.executor.instances", value) } } + + if (contains("spark.master") && get("spark.master").startsWith("yarn-")) { + val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " + + "instead use \"yarn\" with specified deploy mode." + + get("spark.master") match { + case "yarn-cluster" => + logWarning(warning) + set("spark.master", "yarn") + set("spark.submit.deployMode", "cluster") + case "yarn-client" => + logWarning(warning) + set("spark.master", "yarn") + set("spark.submit.deployMode", "client") + case _ => // Any other unexpected master will be checked when creating scheduler backend. + } + } + + if (contains("spark.submit.deployMode")) { + get("spark.submit.deployMode") match { + case "cluster" | "client" => + case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " + + "\"client\".") + } + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/e99d0170/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cd7eed3..a1fa266 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -237,6 +237,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def jars: Seq[String] = _jars def files: Seq[String] = _files def master: String = _conf.get("spark.master") + def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client") def appName: String = _conf.get("spark.app.name") private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false) @@ -375,10 +376,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster - // yarn-standalone is deprecated, but still supported - if ((master == "yarn-cluster" || master == "yarn-standalone") && - !_conf.contains("spark.yarn.app.id")) { - throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " + + if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) { + throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " + "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.") } @@ -414,7 +413,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") + if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true") // "_jobProgressListener" should be set up before creating SparkEnv because when creating // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them. @@ -491,7 +490,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this)) // Create and start the scheduler - val (sched, ts) = SparkContext.createTaskScheduler(this, master) + val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) _schedulerBackend = sched _taskScheduler = ts _dagScheduler = new DAGScheduler(this) @@ -1590,10 +1589,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => - // yarn-standalone is deprecated, but still supported - if (SparkHadoopUtil.get.isYarnMode() && - (master == "yarn-standalone" || master == "yarn-cluster")) { - // In order for this to work in yarn-cluster mode the user must specify the + if (master == "yarn" && deployMode == "cluster") { + // In order for this to work in yarn cluster mode the user must specify the // --addJars option to the client to upload the file into the distributed cache // of the AM to make it show up in the current working directory. val fileName = new Path(uri.getPath).getName() @@ -2319,7 +2316,8 @@ object SparkContext extends Logging { */ private def createTaskScheduler( sc: SparkContext, - master: String): (SchedulerBackend, TaskScheduler) = { + master: String, + deployMode: String): (SchedulerBackend, TaskScheduler) = { import SparkMasterRegex._ // When running locally, don't try to re-execute tasks on failure. @@ -2381,11 +2379,7 @@ object SparkContext extends Logging { } (backend, scheduler) - case "yarn-standalone" | "yarn-cluster" => - if (master == "yarn-standalone") { - logWarning( - "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.") - } + case "yarn" if deployMode == "cluster" => val scheduler = try { val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) @@ -2410,7 +2404,7 @@ object SparkContext extends Logging { scheduler.initialize(backend) (backend, scheduler) - case "yarn-client" => + case "yarn" if deployMode == "client" => val scheduler = try { val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) @@ -2451,7 +2445,7 @@ object SparkContext extends Logging { case zkUrl if zkUrl.startsWith("zk://") => logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " + "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.") - createTaskScheduler(sc, "mesos://" + zkUrl) + createTaskScheduler(sc, "mesos://" + zkUrl, deployMode) case _ => throw new SparkException("Could not parse Master URL: '" + master + "'") http://git-wip-us.apache.org/repos/asf/spark/blob/e99d0170/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index a6749f7..d5a3383 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -226,11 +226,17 @@ object SparkSubmit { // Set the cluster manager val clusterManager: Int = args.master match { - case m if m.startsWith("yarn") => YARN + case "yarn" => YARN + case "yarn-client" | "yarn-cluster" => + printWarning(s"Master ${args.master} is deprecated since 2.0." + + " Please use master \"yarn\" with specified deploy mode instead.") + YARN case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("mesos") => MESOS case m if m.startsWith("local") => LOCAL - case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1 + case _ => + printErrorAndExit("Master must either be yarn or start with spark, mesos, local") + -1 } // Set the deploy mode; default is client mode @@ -240,23 +246,20 @@ object SparkSubmit { case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1 } - // Because "yarn-cluster" and "yarn-client" encapsulate both the master - // and deploy mode, we have some logic to infer the master and deploy mode + // Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both + // the master and deploy mode, we have some logic to infer the master and deploy mode // from each other if only one is specified, or exit early if they are at odds. if (clusterManager == YARN) { - if (args.master == "yarn-standalone") { - printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.") - args.master = "yarn-cluster" - } (args.master, args.deployMode) match { case ("yarn-cluster", null) => deployMode = CLUSTER + args.master = "yarn" case ("yarn-cluster", "client") => printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"") case ("yarn-client", "cluster") => printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"") case (_, mode) => - args.master = "yarn-" + Option(mode).getOrElse("client") + args.master = "yarn" } // Make sure YARN is included in our build if we're trying to use it http://git-wip-us.apache.org/repos/asf/spark/blob/e99d0170/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index b96c937..9b6ab7b 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -29,15 +29,21 @@ class SparkContextSchedulerCreationSuite extends SparkFunSuite with LocalSparkContext with PrivateMethodTester with Logging { def createTaskScheduler(master: String): TaskSchedulerImpl = - createTaskScheduler(master, new SparkConf()) + createTaskScheduler(master, "client") - def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl = { + def createTaskScheduler(master: String, deployMode: String): TaskSchedulerImpl = + createTaskScheduler(master, deployMode, new SparkConf()) + + def createTaskScheduler( + master: String, + deployMode: String, + conf: SparkConf): TaskSchedulerImpl = { // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the // real schedulers, so we don't want to create a full SparkContext with the desired scheduler. sc = new SparkContext("local", "test", conf) val createTaskSchedulerMethod = PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler) - val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master) + val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, deployMode) sched.asInstanceOf[TaskSchedulerImpl] } @@ -107,7 +113,7 @@ class SparkContextSchedulerCreationSuite test("local-default-parallelism") { val conf = new SparkConf().set("spark.default.parallelism", "16") - val sched = createTaskScheduler("local", conf) + val sched = createTaskScheduler("local", "client", conf) sched.backend match { case s: LocalBackend => assert(s.defaultParallelism() === 16) @@ -122,9 +128,9 @@ class SparkContextSchedulerCreationSuite } } - def testYarn(master: String, expectedClassName: String) { + def testYarn(master: String, deployMode: String, expectedClassName: String) { try { - val sched = createTaskScheduler(master) + val sched = createTaskScheduler(master, deployMode) assert(sched.getClass === Utils.classForName(expectedClassName)) } catch { case e: SparkException => @@ -135,21 +141,17 @@ class SparkContextSchedulerCreationSuite } test("yarn-cluster") { - testYarn("yarn-cluster", "org.apache.spark.scheduler.cluster.YarnClusterScheduler") - } - - test("yarn-standalone") { - testYarn("yarn-standalone", "org.apache.spark.scheduler.cluster.YarnClusterScheduler") + testYarn("yarn", "cluster", "org.apache.spark.scheduler.cluster.YarnClusterScheduler") } test("yarn-client") { - testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnScheduler") + testYarn("yarn", "client", "org.apache.spark.scheduler.cluster.YarnScheduler") } def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) { val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString) try { - val sched = createTaskScheduler(master, conf) + val sched = createTaskScheduler(master, "client", conf) assert(sched.backend.getClass === expectedClass) } catch { case e: UnsatisfiedLinkError => http://git-wip-us.apache.org/repos/asf/spark/blob/e99d0170/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index fe2c829..41ac60e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -358,7 +358,8 @@ class SparkSubmitSuite val appArgs = new SparkSubmitArguments(clArgs) val (_, _, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) sysProps("spark.executor.memory") should be ("5g") - sysProps("spark.master") should be ("yarn-cluster") + sysProps("spark.master") should be ("yarn") + sysProps("spark.submit.deployMode") should be ("cluster") mainClass should be ("org.apache.spark.deploy.yarn.Client") } @@ -454,7 +455,7 @@ class SparkSubmitSuite // Test files and archives (Yarn) val clArgs2 = Seq( - "--master", "yarn-client", + "--master", "yarn", "--class", "org.SomeClass", "--files", files, "--archives", archives, @@ -512,7 +513,7 @@ class SparkSubmitSuite writer2.println("spark.yarn.dist.archives " + archives) writer2.close() val clArgs2 = Seq( - "--master", "yarn-client", + "--master", "yarn", "--class", "org.SomeClass", "--properties-file", f2.getPath, "thejar.jar" http://git-wip-us.apache.org/repos/asf/spark/blob/e99d0170/docs/submitting-applications.md ---------------------------------------------------------------------- diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index 413532f..cebdb6d 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -150,12 +150,6 @@ The master URL passed to Spark can be in one of the following formats: <code>client</code> or <code>cluster</code> mode depending on the value of <code>--deploy-mode</code>. The cluster location will be found based on the <code>HADOOP_CONF_DIR</code> or <code>YARN_CONF_DIR</code> variable. </td></tr> -<tr><td> <code>yarn-client</code> </td><td> Equivalent to <code>yarn</code> with <code>--deploy-mode client</code>, - which is preferred to `yarn-client` -</td></tr> -<tr><td> <code>yarn-cluster</code> </td><td> Equivalent to <code>yarn</code> with <code>--deploy-mode cluster</code>, - which is preferred to `yarn-cluster` -</td></tr> </table> http://git-wip-us.apache.org/repos/asf/spark/blob/e99d0170/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 746223f..47693aa 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -262,12 +262,16 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets$default$3"), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.impl.GraphImpl.mapReduceTriplets") ) ++ Seq( - // SPARK-13426 Remove the support of SIMR - ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkMasterRegex.SIMR_REGEX") + // SPARK-13426 Remove the support of SIMR + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkMasterRegex.SIMR_REGEX") ) ++ Seq( // SPARK-13413 Remove SparkContext.metricsSystem/schedulerBackend_ setter ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.metricsSystem"), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.schedulerBackend_=") + ) ++ Seq( + // SPARK-13220 Deprecate yarn-client and yarn-cluster mode + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler") ) case v if v.startsWith("1.6") => Seq( http://git-wip-us.apache.org/repos/asf/spark/blob/e99d0170/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index cccc061..9f586bf 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -129,8 +129,9 @@ private[spark] class ApplicationMaster( // other spark processes running on the same box System.setProperty("spark.ui.port", "0") - // Set the master property to match the requested mode. - System.setProperty("spark.master", "yarn-cluster") + // Set the master and deploy mode property to match the requested mode. + System.setProperty("spark.master", "yarn") + System.setProperty("spark.submit.deployMode", "cluster") // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) http://git-wip-us.apache.org/repos/asf/spark/blob/e99d0170/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index cd24c70..272e245 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -136,7 +136,7 @@ abstract class BaseYarnClusterSuite extraJars: Seq[String] = Nil, extraConf: Map[String, String] = Map(), extraEnv: Map[String, String] = Map()): SparkAppHandle.State = { - val master = if (clientMode) "yarn-client" else "yarn-cluster" + val deployMode = if (clientMode) "client" else "cluster" val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf) val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv @@ -148,7 +148,8 @@ abstract class BaseYarnClusterSuite launcher.setAppResource(fakeSparkJar.getAbsolutePath()) } launcher.setSparkHome(sys.props("spark.test.home")) - .setMaster(master) + .setMaster("yarn") + .setDeployMode(deployMode) .setConf("spark.executor.instances", "1") .setPropertiesFile(propsFile) .addAppArgs(appArgs.toArray: _*) http://git-wip-us.apache.org/repos/asf/spark/blob/e99d0170/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index b91c4be..60d35d3 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -115,7 +115,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite { .setSparkHome(sys.props("spark.test.home")) .setConf("spark.ui.enabled", "false") .setPropertiesFile(propsFile) - .setMaster("yarn-client") + .setMaster("yarn") + .setDeployMode("client") .setAppResource("spark-internal") .setMainClass(mainClassName(YarnLauncherTestApp.getClass)) .startApplication() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org