Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3696#discussion_r21844488
  
    --- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala ---
    @@ -110,4 +113,165 @@ class ExecutorRunnable(
         nmClient.startContainer(container, ctx)
       }
     
    +  private def prepareCommand(
    +      masterAddress: String,
    +      slaveId: String,
    +      hostname: String,
    +      executorMemory: Int,
    +      executorCores: Int,
    +      appId: String,
    +      localResources: HashMap[String, LocalResource]): List[String] = {
    +    // Extra options for the JVM
    +    val javaOpts = ListBuffer[String]()
    +
    +    // Set the environment variable through a command prefix
    +    // to append to the existing value of the variable
    +    var prefixEnv: Option[String] = None
    +
    +    // Set the JVM memory
    +    val executorMemoryString = executorMemory + "m"
    +    javaOpts += "-Xms" + executorMemoryString + " -Xmx" + 
executorMemoryString + " "
    +
    +    // Set extra Java options for the executor, if defined
    +    sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
    +      javaOpts += opts
    +    }
    +    sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
    +      javaOpts += opts
    +    }
    +    sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
    +      prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
    +    }
    +
    +    javaOpts += "-Djava.io.tmpdir=" +
    +      new Path(Environment.PWD.$(), 
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
    +
    +    // Certain configs need to be passed here because they are needed 
before the Executor
    +    // registers with the Scheduler and transfers the spark configs. Since 
the Executor backend
    +    // uses Akka to connect to the scheduler, the akka settings are needed 
as well as the
    +    // authentication settings.
    +    sparkConf.getAll.
    +      filter { case (k, v) => k.startsWith("spark.auth") || 
k.startsWith("spark.akka") }.
    +      foreach { case (k, v) => javaOpts += 
YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
    +
    +    sparkConf.getAkkaConf.
    +      foreach { case (k, v) => javaOpts += 
YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
    +
    +    // Commenting it out for now - so that people can refer to the 
properties if required. Remove
    +    // it once cpuset version is pushed out.
    +    // The context is, default gc for server class machines end up using 
all cores to do gc - hence
    +    // if there are multiple containers in same node, spark gc affects all 
other containers
    +    // performance (which can also be other spark containers)
    +    // Instead of using this, rely on cpusets by YARN to enforce spark 
behaves 'properly' in
    +    // multi-tenant environments. Not sure how default java gc behaves if 
it is limited to subset
    +    // of cores on a node.
    +    /*
    +        else {
    +          // If no java_opts specified, default to using 
-XX:+CMSIncrementalMode
    +          // It might be possible that other modes/config is being done in
    +          // spark.executor.extraJavaOptions, so we don't want to mess with 
it.
    +          // In our expts, using (default) throughput collector has severe 
perf ramifications in
    +          // multi-tenant machines
    +          // The options are based on
    +          // 
http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use
    +          // %20the%20Concurrent%20Low%20Pause%20Collector|outline
    +          javaOpts += " -XX:+UseConcMarkSweepGC "
    +          javaOpts += " -XX:+CMSIncrementalMode "
    +          javaOpts += " -XX:+CMSIncrementalPacing "
    +          javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
    +          javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
    +        }
    +    */
    +
    +    // For log4j configuration to reference
    +    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + 
ApplicationConstants.LOG_DIR_EXPANSION_VAR)
    +
    +    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + 
"/bin/java",
    +      "-server",
    +      // Kill if OOM is raised - leverage yarn's failure handling to cause 
rescheduling.
    +      // Not killing the task leaves various aspects of the executor and 
(to some extent) the jvm in
    +      // an inconsistent state.
    +      // TODO: If the OOM is not recoverable by rescheduling it on 
different node, then do
    +      // 'something' to fail job ... akin to blacklisting trackers in 
mapred ?
    +      "-XX:OnOutOfMemoryError='kill %p'") ++
    +      javaOpts ++
    +      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    +        masterAddress.toString,
    +        slaveId.toString,
    +        hostname.toString,
    +        executorCores.toString,
    +        appId,
    +        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    +        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
    +
    +    // TODO: it would be nicer to just make sure there are no null 
commands here
    +    commands.map(s => if (s == null) "null" else s).toList
    +  }
    +
    +  private def setupDistributedCache(
    +                                     file: String,
    --- End diff --
    
    Indentation is wrong here. tabs vs. spaces?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to