Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/2350#discussion_r17396989
--- Diff:
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
---
@@ -35,28 +34,57 @@ class ClientArguments(val args: Array[String], val
sparkConf: SparkConf) {
var executorMemory = 1024 // MB
var executorCores = 1
var numExecutors = 2
- var amQueue = sparkConf.get("QUEUE", "default")
+ var amQueue = sparkConf.get("spark.yarn.queue", "default")
var amMemory: Int = 512 // MB
var appName: String = "Spark"
var priority = 0
- parseArgs(args.toList)
+ // Additional memory to allocate to containers
+ // For now, use driver's memory overhead as our AM container's memory
overhead
+ val amMemoryOverhead = sparkConf.getInt(
+ "spark.yarn.driver.memoryOverhead",
YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+ val executorMemoryOverhead = sparkConf.getInt(
+ "spark.yarn.executor.memoryOverhead",
YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
- // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in
yarn-client then
- // it should default to hdfs://
- files =
Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
- archives =
Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+ parseArgs(args.toList)
+ loadDefaultArgs()
+ validateArgs()
+
+ /** Load any default arguments provided through environment variables
and Spark properties. */
+ private def loadDefaultArgs(): Unit = {
+ // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should
be resolved to hdfs://,
+ // while spark.yarn.dist.{archives/files} should be resolved to
file:// (SPARK-2051).
+ files =
Option(files).orElse(sys.env.get("SPARK_YARN_DIST_FILES")).orNull
+ files = Option(files)
+ .orElse(sparkConf.getOption("spark.yarn.dist.files"))
+ .map(p => Utils.resolveURIs(p))
+ .orNull
+ archives =
Option(archives).orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")).orNull
+ archives = Option(archives)
+ .orElse(sparkConf.getOption("spark.yarn.dist.archives"))
+ .map(p => Utils.resolveURIs(p))
+ .orNull
+ }
- // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use
file:// if not specified,
- // for both yarn-client and yarn-cluster
- files =
Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
- map(p => Utils.resolveURIs(p)).orNull)
- archives =
Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
- map(p => Utils.resolveURIs(p)).orNull)
+ /**
+ * Fail fast if any arguments provided are invalid.
+ * This is intended to be called only after the provided arguments have
been parsed.
+ */
+ private def validateArgs(): Unit = {
+ Map[Boolean, String](
+ (numExecutors <= 0) -> "You must specify at least 1 executor!",
+ (amMemory <= amMemoryOverhead) -> s"AM memory must be >
$amMemoryOverhead MB",
+ (executorMemory <= executorMemoryOverhead) ->
+ s"Executor memory must be > $executorMemoryOverhead MB"
--- End diff --
@tgraves You had a few comments about a few of these checks being outdated
in a separate PR. Which ones should I remove?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]