asfgit closed pull request #23415: [SPARK-26445][CORE] Use ConfigEntry for
hardcoded configs for driver/executor categories.
URL: https://github.com/apache/spark/pull/23415
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
Because this pull request was opened from a fork, GitHub does not display its
diff once it has been merged, so the complete diff is reproduced below:
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 3f0b71bbe17f1..d966582295b37 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -127,7 +127,7 @@ private[spark] class ExecutorAllocationManager(
// allocation is only supported for YARN and the default number of cores per
executor in YARN is
// 1, but it might need to be attained differently for different cluster
managers
private val tasksPerExecutorForFullParallelism =
- conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
+ conf.get(EXECUTOR_CORES) / conf.getInt("spark.task.cpus", 1)
private val executorAllocationRatio =
conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
@@ -223,7 +223,7 @@ private[spark] class ExecutorAllocationManager(
"shuffle service. You may enable this through
spark.shuffle.service.enabled.")
}
if (tasksPerExecutorForFullParallelism == 0) {
- throw new SparkException("spark.executor.cores must not be <
spark.task.cpus.")
+ throw new SparkException(s"${EXECUTOR_CORES.key} must not be <
spark.task.cpus.")
}
if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0b47da12b5b42..681e4378a4dd5 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -503,12 +503,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable
with Logging with Seria
logWarning(msg)
}
- val executorOptsKey = "spark.executor.extraJavaOptions"
- val executorClasspathKey = "spark.executor.extraClassPath"
- val driverOptsKey = "spark.driver.extraJavaOptions"
- val driverClassPathKey = "spark.driver.extraClassPath"
- val driverLibraryPathKey = "spark.driver.extraLibraryPath"
- val sparkExecutorInstances = "spark.executor.instances"
+ val executorOptsKey = EXECUTOR_JAVA_OPTIONS.key
// Used by Yarn in 1.1 and before
sys.props.get("spark.driver.libraryPath").foreach { value =>
@@ -517,7 +512,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable
with Logging with Seria
|spark.driver.libraryPath was detected (set to '$value').
|This is deprecated in Spark 1.2+.
|
- |Please instead use: $driverLibraryPathKey
+ |Please instead use: ${DRIVER_LIBRARY_PATH.key}
""".stripMargin
logWarning(warning)
}
@@ -594,9 +589,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable
with Logging with Seria
}
}
- if (contains("spark.cores.max") && contains("spark.executor.cores")) {
- val totalCores = getInt("spark.cores.max", 1)
- val executorCores = getInt("spark.executor.cores", 1)
+ if (contains(CORES_MAX) && contains(EXECUTOR_CORES)) {
+ val totalCores = getInt(CORES_MAX.key, 1)
+ val executorCores = get(EXECUTOR_CORES)
val leftCores = totalCores % executorCores
if (leftCores != 0) {
logWarning(s"Total executor cores: ${totalCores} is not " +
@@ -605,12 +600,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable
with Logging with Seria
}
}
- if (contains("spark.executor.cores") && contains("spark.task.cpus")) {
- val executorCores = getInt("spark.executor.cores", 1)
+ if (contains(EXECUTOR_CORES) && contains("spark.task.cpus")) {
+ val executorCores = get(EXECUTOR_CORES)
val taskCpus = getInt("spark.task.cpus", 1)
if (executorCores < taskCpus) {
- throw new SparkException("spark.executor.cores must not be less than
spark.task.cpus.")
+ throw new SparkException(s"${EXECUTOR_CORES.key} must not be less than
spark.task.cpus.")
}
}
@@ -680,7 +675,7 @@ private[spark] object SparkConf extends Logging {
* TODO: consolidate it with `ConfigBuilder.withAlternative`.
*/
private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
- "spark.executor.userClassPathFirst" -> Seq(
+ EXECUTOR_USER_CLASS_PATH_FIRST.key -> Seq(
AlternateConfig("spark.files.userClassPathFirst", "1.3")),
UPDATE_INTERVAL_S.key -> Seq(
AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
@@ -703,7 +698,7 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
"spark.shuffle.file.buffer" -> Seq(
AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
- "spark.executor.logs.rolling.maxSize" -> Seq(
+ EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq(
AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
"spark.io.compression.snappy.blockSize" -> Seq(
AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3475859c3ed69..89be9de083075 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -386,9 +386,9 @@ class SparkContext(config: SparkConf) extends Logging {
// Set Spark driver host and port system properties. This explicitly sets
the configuration
// instead of relying on the default value of the config constant.
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
- _conf.setIfMissing("spark.driver.port", "0")
+ _conf.setIfMissing(DRIVER_PORT, 0)
- _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
+ _conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER)
_jars = Utils.getUserJars(_conf)
_files =
_conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
@@ -461,7 +461,7 @@ class SparkContext(config: SparkConf) extends Logging {
files.foreach(addFile)
}
- _executorMemory = _conf.getOption("spark.executor.memory")
+ _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
@@ -2639,7 +2639,7 @@ object SparkContext extends Logging {
case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) =>
convertToInt(threads)
case "yarn" =>
if (conf != null &&
conf.getOption("spark.submit.deployMode").contains("cluster")) {
- conf.getInt("spark.driver.cores", 0)
+ conf.getInt(DRIVER_CORES.key, 0)
} else {
0
}
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index de0c8579d9acc..9222781fa0833 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -163,10 +163,10 @@ object SparkEnv extends Logging {
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None):
SparkEnv = {
assert(conf.contains(DRIVER_HOST_ADDRESS),
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
- assert(conf.contains("spark.driver.port"), "spark.driver.port is not set
on the driver!")
+ assert(conf.contains(DRIVER_PORT), s"${DRIVER_PORT.key} is not set on the
driver!")
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
- val port = conf.get("spark.driver.port").toInt
+ val port = conf.get(DRIVER_PORT)
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(conf))
} else {
@@ -251,7 +251,7 @@ object SparkEnv extends Logging {
// Figure out which port RpcEnv actually bound to in case the original
port is 0 or occupied.
if (isDriver) {
- conf.set("spark.driver.port", rpcEnv.address.port.toString)
+ conf.set(DRIVER_PORT, rpcEnv.address.port)
}
// Create an instance of the class with the given name, possibly
initializing it with our conf
@@ -359,7 +359,7 @@ object SparkEnv extends Logging {
// We need to set the executor ID before the MetricsSystem is created
because sources and
// sinks specified in the metrics configuration file will want to
incorporate this executor's
// ID into the metrics they report.
- conf.set("spark.executor.id", executorId)
+ conf.set(EXECUTOR_ID, executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf,
securityManager)
ms.start()
ms
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 6b748c825d293..5168e9330965d 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
import org.apache.spark._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.EXECUTOR_CORES
import org.apache.spark.internal.config.Python._
import org.apache.spark.security.SocketAuthHelper
import org.apache.spark.util._
@@ -74,8 +75,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
// each python worker gets an equal part of the allocation. the worker pool
will grow to the
// number of concurrent tasks, which is determined by the number of cores in
this executor.
- private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
- .map(_ / conf.getInt("spark.executor.cores", 1))
+ private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ /
conf.get(EXECUTOR_CORES))
// All the Python functions should have the same exec, version and envvars.
protected val envVars = funcs.head.funcs.head.envVars
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index d5145094ec079..d94b174d8d868 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -27,7 +27,7 @@ import org.apache.log4j.Logger
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv,
ThreadSafeRpcEndpoint}
import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils}
@@ -68,17 +68,17 @@ private class ClientEndpoint(
// people call `addJar` assuming the jar is in the same
directory.
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
- val classPathConf = "spark.driver.extraClassPath"
+ val classPathConf = config.DRIVER_CLASS_PATH.key
val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp
=>
cp.split(java.io.File.pathSeparator)
}
- val libraryPathConf = "spark.driver.extraLibraryPath"
+ val libraryPathConf = config.DRIVER_LIBRARY_PATH.key
val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap
{ cp =>
cp.split(java.io.File.pathSeparator)
}
- val extraJavaOptsConf = "spark.driver.extraJavaOptions"
+ val extraJavaOptsConf = config.DRIVER_JAVA_OPTIONS.key
val extraJavaOpts = sys.props.get(extraJavaOptsConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index c6307da61c7eb..0679bdf7c7075 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -34,7 +34,7 @@ import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.master.RecoveryState
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{ThreadUtils, Utils}
/**
@@ -77,7 +77,7 @@ private object FaultToleranceTest extends App with Logging {
private val containerSparkHome = "/opt/spark"
private val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
- System.setProperty("spark.driver.host", "172.17.42.1") // default docker
host ip
+ System.setProperty(config.DRIVER_HOST_ADDRESS.key, "172.17.42.1") // default
docker host ip
private def afterEach() {
if (sc != null) {
@@ -216,7 +216,7 @@ private object FaultToleranceTest extends App with Logging {
if (sc != null) { sc.stop() }
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
- System.setProperty("spark.driver.port", "0")
+ System.setProperty(config.DRIVER_PORT.key, "0")
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance",
containerSparkHome)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 763bd0a70a035..a4c65aeaae3f6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -514,13 +514,13 @@ private[spark] class SparkSubmit extends Logging {
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey =
"spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey =
"spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
- confKey = "spark.driver.memory"),
+ confKey = DRIVER_MEMORY.key),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS,
ALL_DEPLOY_MODES,
- confKey = "spark.driver.extraClassPath"),
+ confKey = DRIVER_CLASS_PATH.key),
OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS,
ALL_DEPLOY_MODES,
- confKey = "spark.driver.extraJavaOptions"),
+ confKey = DRIVER_JAVA_OPTIONS.key),
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS,
ALL_DEPLOY_MODES,
- confKey = "spark.driver.extraLibraryPath"),
+ confKey = DRIVER_LIBRARY_PATH.key),
OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = PRINCIPAL.key),
OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -537,7 +537,7 @@ private[spark] class SparkSubmit extends Logging {
// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey =
"spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
- confKey = "spark.executor.instances"),
+ confKey = EXECUTOR_INSTANCES.key),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey =
"spark.yarn.dist.pyFiles"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey =
"spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey =
"spark.yarn.dist.files"),
@@ -545,22 +545,22 @@ private[spark] class SparkSubmit extends Logging {
// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES,
ALL_DEPLOY_MODES,
- confKey = "spark.executor.cores"),
+ confKey = EXECUTOR_CORES.key),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN |
KUBERNETES, ALL_DEPLOY_MODES,
- confKey = "spark.executor.memory"),
+ confKey = EXECUTOR_MEMORY.key),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES,
ALL_DEPLOY_MODES,
- confKey = "spark.cores.max"),
+ confKey = CORES_MAX.key),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES,
ALL_DEPLOY_MODES,
confKey = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES,
ALL_DEPLOY_MODES,
confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN |
KUBERNETES, CLUSTER,
- confKey = "spark.driver.memory"),
+ confKey = DRIVER_MEMORY.key),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES,
CLUSTER,
- confKey = "spark.driver.cores"),
+ confKey = DRIVER_CORES.key),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
- confKey = "spark.driver.supervise"),
+ confKey = DRIVER_SUPERVISE.key),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey =
"spark.jars.ivy"),
// An internal option used only for spark-shell to add user jars to
repl's classloader,
@@ -727,7 +727,7 @@ private[spark] class SparkSubmit extends Logging {
// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
- sparkConf.remove("spark.driver.host")
+ sparkConf.remove(DRIVER_HOST_ADDRESS)
}
// Resolve paths in certain spark properties
diff --git
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 4cf08a7980f55..34facd5a58c40 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -31,7 +31,7 @@ import scala.util.Try
import org.apache.spark.{SparkException, SparkUserAppException}
import org.apache.spark.deploy.SparkSubmitAction._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
@@ -155,31 +155,31 @@ private[deploy] class SparkSubmitArguments(args:
Seq[String], env: Map[String, S
.orElse(env.get("MASTER"))
.orNull
driverExtraClassPath = Option(driverExtraClassPath)
- .orElse(sparkProperties.get("spark.driver.extraClassPath"))
+ .orElse(sparkProperties.get(config.DRIVER_CLASS_PATH.key))
.orNull
driverExtraJavaOptions = Option(driverExtraJavaOptions)
- .orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
+ .orElse(sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key))
.orNull
driverExtraLibraryPath = Option(driverExtraLibraryPath)
- .orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
+ .orElse(sparkProperties.get(config.DRIVER_LIBRARY_PATH.key))
.orNull
driverMemory = Option(driverMemory)
- .orElse(sparkProperties.get("spark.driver.memory"))
+ .orElse(sparkProperties.get(config.DRIVER_MEMORY.key))
.orElse(env.get("SPARK_DRIVER_MEMORY"))
.orNull
driverCores = Option(driverCores)
- .orElse(sparkProperties.get("spark.driver.cores"))
+ .orElse(sparkProperties.get(config.DRIVER_CORES.key))
.orNull
executorMemory = Option(executorMemory)
- .orElse(sparkProperties.get("spark.executor.memory"))
+ .orElse(sparkProperties.get(config.EXECUTOR_MEMORY.key))
.orElse(env.get("SPARK_EXECUTOR_MEMORY"))
.orNull
executorCores = Option(executorCores)
- .orElse(sparkProperties.get("spark.executor.cores"))
+ .orElse(sparkProperties.get(config.EXECUTOR_CORES.key))
.orElse(env.get("SPARK_EXECUTOR_CORES"))
.orNull
totalExecutorCores = Option(totalExecutorCores)
- .orElse(sparkProperties.get("spark.cores.max"))
+ .orElse(sparkProperties.get(config.CORES_MAX.key))
.orNull
name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
@@ -197,7 +197,7 @@ private[deploy] class SparkSubmitArguments(args:
Seq[String], env: Map[String, S
.orElse(env.get("DEPLOY_MODE"))
.orNull
numExecutors = Option(numExecutors)
- .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
+ .getOrElse(sparkProperties.get(config.EXECUTOR_INSTANCES.key).orNull)
queue =
Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull
keytab = Option(keytab)
.orElse(sparkProperties.get("spark.kerberos.keytab"))
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index afa1a5fbba792..c75e684df2264 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletResponse
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
+import org.apache.spark.internal.config
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
@@ -132,12 +133,12 @@ private[rest] class StandaloneSubmitRequestServlet(
// Optional fields
val sparkProperties = request.sparkProperties
- val driverMemory = sparkProperties.get("spark.driver.memory")
- val driverCores = sparkProperties.get("spark.driver.cores")
- val driverExtraJavaOptions =
sparkProperties.get("spark.driver.extraJavaOptions")
- val driverExtraClassPath =
sparkProperties.get("spark.driver.extraClassPath")
- val driverExtraLibraryPath =
sparkProperties.get("spark.driver.extraLibraryPath")
- val superviseDriver = sparkProperties.get("spark.driver.supervise")
+ val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
+ val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
+ val driverExtraJavaOptions =
sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
+ val driverExtraClassPath =
sparkProperties.get(config.DRIVER_CLASS_PATH.key)
+ val driverExtraLibraryPath =
sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
+ val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
// The semantics of "spark.master" and the masterUrl are different. While
the
// property "spark.master" could contain all registered masters, masterUrl
// contains only the active master. To make sure a Spark driver can recover
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
index 86ddf954ca128..7f462148c71a1 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest
import scala.util.Try
+import org.apache.spark.internal.config
import org.apache.spark.util.Utils
/**
@@ -49,11 +50,11 @@ private[rest] class CreateSubmissionRequest extends
SubmitRestProtocolRequest {
assertFieldIsSet(appArgs, "appArgs")
assertFieldIsSet(environmentVariables, "environmentVariables")
assertPropertyIsSet("spark.app.name")
- assertPropertyIsBoolean("spark.driver.supervise")
- assertPropertyIsNumeric("spark.driver.cores")
- assertPropertyIsNumeric("spark.cores.max")
- assertPropertyIsMemory("spark.driver.memory")
- assertPropertyIsMemory("spark.executor.memory")
+ assertPropertyIsBoolean(config.DRIVER_SUPERVISE.key)
+ assertPropertyIsNumeric(config.DRIVER_CORES.key)
+ assertPropertyIsNumeric(config.CORES_MAX.key)
+ assertPropertyIsMemory(config.DRIVER_MEMORY.key)
+ assertPropertyIsMemory(config.EXECUTOR_MEMORY.key)
}
private def assertPropertyIsSet(key: String): Unit =
diff --git
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 8d6a2b80ef5f2..1e8ad0b6af6a6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util._
@@ -43,7 +43,7 @@ object DriverWrapper extends Logging {
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val host: String = Utils.localHostName()
- val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt
+ val port: Int = sys.props.getOrElse(config.DRIVER_PORT.key, "0").toInt
val rpcEnv = RpcEnv.create("Driver", host, port, conf, new
SecurityManager(conf))
logInfo(s"Driver address: ${rpcEnv.address}")
rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv,
workerUrl))
@@ -51,7 +51,7 @@ object DriverWrapper extends Logging {
val currentLoader = Thread.currentThread.getContextClassLoader
val userJarUrl = new File(userJar).toURI().toURL()
val loader =
- if (sys.props.getOrElse("spark.driver.userClassPathFirst",
"false").toBoolean) {
+ if (sys.props.getOrElse(config.DRIVER_USER_CLASS_PATH_FIRST.key,
"false").toBoolean) {
new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
} else {
new MutableURLClassLoader(Array(userJarUrl), currentLoader)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index da8060459477f..8caaa73b02273 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -39,7 +39,12 @@ package object config {
private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)
- private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
+ private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
+ .doc("Number of cores to use for the driver process, only in cluster
mode.")
+ .intConf
+ .createWithDefault(1)
+
+ private[spark] val DRIVER_MEMORY = ConfigBuilder(SparkLauncher.DRIVER_MEMORY)
.doc("Amount of memory to use for the driver process, in MiB unless
otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")
@@ -113,6 +118,9 @@ package object config {
private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
+ private[spark] val EXECUTOR_ID =
+ ConfigBuilder("spark.executor.id").stringConf.createOptional
+
private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
@@ -139,7 +147,11 @@ package object config {
private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)
- private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
+ private[spark] val EXECUTOR_CORES =
ConfigBuilder(SparkLauncher.EXECUTOR_CORES)
+ .intConf
+ .createWithDefault(1)
+
+ private[spark] val EXECUTOR_MEMORY =
ConfigBuilder(SparkLauncher.EXECUTOR_MEMORY)
.doc("Amount of memory to use per executor process, in MiB unless
otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")
@@ -150,6 +162,15 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createOptional
+ private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max")
+ .doc("When running on a standalone deploy cluster or a Mesos cluster in
coarse-grained " +
+ "sharing mode, the maximum amount of CPU cores to request for the
application from across " +
+ "the cluster (not from each machine). If not set, the default will be " +
+ "`spark.deploy.defaultCores` on Spark's standalone cluster manager, or
infinite " +
+ "(all available cores) on Mesos.")
+ .intConf
+ .createOptional
+
private[spark] val MEMORY_OFFHEAP_ENABLED =
ConfigBuilder("spark.memory.offHeap.enabled")
.doc("If true, Spark will attempt to use off-heap memory for certain
operations. " +
"If off-heap memory use is enabled, then spark.memory.offHeap.size must
be positive.")
@@ -347,6 +368,17 @@ package object config {
.stringConf
.createWithDefault(Utils.localCanonicalHostName())
+ private[spark] val DRIVER_PORT = ConfigBuilder("spark.driver.port")
+ .doc("Port of driver endpoints.")
+ .intConf
+ .createWithDefault(0)
+
+ private[spark] val DRIVER_SUPERVISE = ConfigBuilder("spark.driver.supervise")
+ .doc("If true, restarts the driver automatically if it fails with a
non-zero exit status. " +
+ "Only has effect in Spark standalone mode or Mesos cluster deploy mode.")
+ .booleanConf
+ .createWithDefault(false)
+
private[spark] val DRIVER_BIND_ADDRESS =
ConfigBuilder("spark.driver.bindAddress")
.doc("Address where to bind network listen sockets on the driver.")
.fallbackConf(DRIVER_HOST_ADDRESS)
@@ -729,4 +761,23 @@ package object config {
.stringConf
.toSequence
.createWithDefault(Nil)
+
+ private[spark] val EXECUTOR_LOGS_ROLLING_STRATEGY =
+
ConfigBuilder("spark.executor.logs.rolling.strategy").stringConf.createWithDefault("")
+
+ private[spark] val EXECUTOR_LOGS_ROLLING_TIME_INTERVAL =
+
ConfigBuilder("spark.executor.logs.rolling.time.interval").stringConf.createWithDefault("daily")
+
+ private[spark] val EXECUTOR_LOGS_ROLLING_MAX_SIZE =
+ ConfigBuilder("spark.executor.logs.rolling.maxSize")
+ .stringConf
+ .createWithDefault((1024 * 1024).toString)
+
+ private[spark] val EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES =
+
ConfigBuilder("spark.executor.logs.rolling.maxRetainedFiles").intConf.createWithDefault(-1)
+
+ private[spark] val EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION =
+ ConfigBuilder("spark.executor.logs.rolling.enableCompression")
+ .booleanConf
+ .createWithDefault(false)
}
diff --git
a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index a6f7db0600e60..8286087042741 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -18,6 +18,7 @@
package org.apache.spark.memory
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
import org.apache.spark.storage.BlockId
/**
@@ -127,14 +128,14 @@ private[spark] object StaticMemoryManager {
if (systemMaxMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"System memory $systemMaxMemory must
" +
s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the
--driver-memory " +
- s"option or spark.driver.memory in Spark configuration.")
+ s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
}
- if (conf.contains("spark.executor.memory")) {
- val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
+ if (conf.contains(config.EXECUTOR_MEMORY)) {
+ val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
if (executorMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"Executor memory $executorMemory
must be at least " +
s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
- s"--executor-memory option or spark.executor.memory in Spark
configuration.")
+ s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark
configuration.")
}
}
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
diff --git
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 78edd2c4d7faa..9260fd3a6fb34 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -18,6 +18,7 @@
package org.apache.spark.memory
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
import org.apache.spark.storage.BlockId
/**
@@ -216,15 +217,15 @@ object UnifiedMemoryManager {
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please increase heap size using the
--driver-memory " +
- s"option or spark.driver.memory in Spark configuration.")
+ s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
}
// SPARK-12759 Check executor memory to fail fast if memory is insufficient
- if (conf.contains("spark.executor.memory")) {
- val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
+ if (conf.contains(config.EXECUTOR_MEMORY)) {
+ val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
if (executorMemory < minSystemMemory) {
throw new IllegalArgumentException(s"Executor memory $executorMemory
must be at least " +
s"$minSystemMemory. Please increase executor memory using the " +
- s"--executor-memory option or spark.executor.memory in Spark
configuration.")
+ s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark
configuration.")
}
}
val usableMemory = systemMemory - reservedMemory
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 301317a79dfcf..b1e311ada4599 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -130,7 +130,7 @@ private[spark] class MetricsSystem private (
private[spark] def buildRegistryName(source: Source): String = {
val metricsNamespace =
conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
- val executorId = conf.getOption("spark.executor.id")
+ val executorId = conf.get(EXECUTOR_ID)
val defaultName = MetricRegistry.name(source.sourceName)
if (instance == "driver" || instance == "executor") {
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6bf60dd8e9dfa..41f032ccf82bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -717,7 +717,7 @@ private[spark] class TaskSetManager(
calculatedTasks += 1
if (maxResultSize > 0 && totalResultSize > maxResultSize) {
val msg = s"Total size of serialized results of ${calculatedTasks} tasks
" +
- s"(${Utils.bytesToString(totalResultSize)}) is bigger than
spark.driver.maxResultSize " +
+ s"(${Utils.bytesToString(totalResultSize)}) is bigger than
${config.MAX_RESULT_SIZE.key} " +
s"(${Utils.bytesToString(maxResultSize)})"
logError(msg)
abort(msg)
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index f73a58ff5d48c..adef20d3077d8 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -25,7 +25,7 @@ import scala.concurrent.Future
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{StandaloneAppClient,
StandaloneAppClientListener}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler._
@@ -54,7 +54,7 @@ private[spark] class StandaloneSchedulerBackend(
private val registrationBarrier = new Semaphore(0)
- private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+ private val maxCores = conf.get(config.CORES_MAX)
private val totalExpectedCores = maxCores.getOrElse(0)
override def start() {
@@ -69,8 +69,8 @@ private[spark] class StandaloneSchedulerBackend(
// The endpoint for executors to talk to us
val driverUrl = RpcEndpointAddress(
- sc.conf.get("spark.driver.host"),
- sc.conf.get("spark.driver.port").toInt,
+ sc.conf.get(config.DRIVER_HOST_ADDRESS),
+ sc.conf.get(config.DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val args = Seq(
"--driver-url", driverUrl,
@@ -79,11 +79,11 @@ private[spark] class StandaloneSchedulerBackend(
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
- val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
+ val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
- val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
+ val classPathEntries = sc.conf.get(config.EXECUTOR_CLASS_PATH)
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
- val libraryPathEntries =
sc.conf.getOption("spark.executor.extraLibraryPath")
+ val libraryPathEntries = sc.conf.get(config.EXECUTOR_LIBRARY_PATH)
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// When testing, expose the parent class path to the child. This is
processed by
@@ -102,7 +102,7 @@ private[spark] class StandaloneSchedulerBackend(
val command =
Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath,
libraryPathEntries, javaOpts)
val webUrl = sc.ui.map(_.webUrl).getOrElse("")
- val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
+ val coresPerExecutor =
conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
// If we're using dynamic allocation, set our initial executor limit to 0
for now.
// ExecutorAllocationManager will send the real initial limit to the
Master later.
val initialExecutorLimit =
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 0de57fbd5600c..6ff8bf29b006a 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv,
ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
@@ -116,7 +116,7 @@ private[spark] class LocalSchedulerBackend(
* @param conf Spark configuration.
*/
def getUserClasspath(conf: SparkConf): Seq[URL] = {
- val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
+ val userClassPathStr = conf.get(config.EXECUTOR_CLASS_PATH)
userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new
File(_).toURI.toURL)
}
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index e5cccf39f9455..902e48fed3916 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
private[spark] object RpcUtils {
@@ -26,8 +27,8 @@ private[spark] object RpcUtils {
* Retrieve a `RpcEndpointRef` which is located in the driver via its name.
*/
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv):
RpcEndpointRef = {
- val driverHost: String = conf.get("spark.driver.host", "localhost")
- val driverPort: Int = conf.getInt("spark.driver.port", 7077)
+ val driverHost: String = conf.get(config.DRIVER_HOST_ADDRESS.key,
"localhost")
+ val driverPort: Int = conf.getInt(config.DRIVER_PORT.key, 7077)
Utils.checkHost(driverHost)
rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 22f074cf98971..3527fee68939d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2231,7 +2231,7 @@ private[spark] object Utils extends Logging {
s"${e.getMessage}: Service$serviceString failed after " +
s"$maxRetries retries (on a random free port)! " +
s"Consider explicitly setting the appropriate binding address
for " +
- s"the service$serviceString (for example
spark.driver.bindAddress " +
+ s"the service$serviceString (for example
${DRIVER_BIND_ADDRESS.key} " +
s"for SparkDriver) to the correct binding address."
} else {
s"${e.getMessage}: Service$serviceString failed after " +
diff --git
a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 2f9ad4c8cc3e1..3188e0bd2b70d 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util.logging
import java.io.{File, FileOutputStream, InputStream, IOException}
import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{IntParam, Utils}
/**
@@ -115,11 +115,9 @@ private[spark] object FileAppender extends Logging {
/** Create the right appender based on Spark configuration */
def apply(inputStream: InputStream, file: File, conf: SparkConf):
FileAppender = {
- import RollingFileAppender._
-
- val rollingStrategy = conf.get(STRATEGY_PROPERTY, STRATEGY_DEFAULT)
- val rollingSizeBytes = conf.get(SIZE_PROPERTY, STRATEGY_DEFAULT)
- val rollingInterval = conf.get(INTERVAL_PROPERTY, INTERVAL_DEFAULT)
+ val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
+ val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)
+ val rollingInterval = conf.get(config.EXECUTOR_LOGS_ROLLING_TIME_INTERVAL)
def createTimeBasedAppender(): FileAppender = {
val validatedParams: Option[(Long, String)] = rollingInterval match {
diff --git
a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 5d8cec8447b53..59439b68792e5 100644
---
a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++
b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -24,6 +24,7 @@ import com.google.common.io.Files
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
/**
* Continuously appends data from input stream into the given file, and rolls
@@ -44,10 +45,8 @@ private[spark] class RollingFileAppender(
bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
) extends FileAppender(inputStream, activeFile, bufferSize) {
- import RollingFileAppender._
-
- private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
- private val enableCompression = conf.getBoolean(ENABLE_COMPRESSION, false)
+ private val maxRetainedFiles =
conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES)
+ private val enableCompression =
conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION)
/** Stop the appender */
override def stop() {
@@ -82,7 +81,7 @@ private[spark] class RollingFileAppender(
// Roll the log file and compress if enableCompression is true.
private def rotateFile(activeFile: File, rolloverFile: File): Unit = {
if (enableCompression) {
- val gzFile = new File(rolloverFile.getAbsolutePath + GZIP_LOG_SUFFIX)
+ val gzFile = new File(rolloverFile.getAbsolutePath +
RollingFileAppender.GZIP_LOG_SUFFIX)
var gzOutputStream: GZIPOutputStream = null
var inputStream: InputStream = null
try {
@@ -103,7 +102,7 @@ private[spark] class RollingFileAppender(
// Check if the rollover file already exists.
private def rolloverFileExist(file: File): Boolean = {
- file.exists || new File(file.getAbsolutePath + GZIP_LOG_SUFFIX).exists
+ file.exists || new File(file.getAbsolutePath +
RollingFileAppender.GZIP_LOG_SUFFIX).exists
}
/** Move the active log file to a new rollover file */
@@ -164,15 +163,7 @@ private[spark] class RollingFileAppender(
* names of configurations that configure rolling file appenders.
*/
private[spark] object RollingFileAppender {
- val STRATEGY_PROPERTY = "spark.executor.logs.rolling.strategy"
- val STRATEGY_DEFAULT = ""
- val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval"
- val INTERVAL_DEFAULT = "daily"
- val SIZE_PROPERTY = "spark.executor.logs.rolling.maxSize"
- val SIZE_DEFAULT = (1024 * 1024).toString
- val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
val DEFAULT_BUFFER_SIZE = 8192
- val ENABLE_COMPRESSION = "spark.executor.logs.rolling.enableCompression"
val GZIP_LOG_SUFFIX = ".gz"
diff --git
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 5c718cb654ce8..d0389235cb724 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -155,7 +155,7 @@ class ExecutorAllocationManagerSuite
.set("spark.dynamicAllocation.maxExecutors", "15")
.set("spark.dynamicAllocation.minExecutors", "3")
.set("spark.dynamicAllocation.executorAllocationRatio", divisor.toString)
- .set("spark.executor.cores", cores.toString)
+ .set(config.EXECUTOR_CORES, cores)
val sc = new SparkContext(conf)
contexts += sc
var manager = sc.executorAllocationManager.get
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index e14a5dcb5ef84..9a6abbdb0a46f 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -140,7 +140,7 @@ class SparkConfSuite extends SparkFunSuite with
LocalSparkContext with ResetSyst
test("creating SparkContext with cpus per tasks bigger than cores per
executors") {
val conf = new SparkConf(false)
- .set("spark.executor.cores", "1")
+ .set(EXECUTOR_CORES, 1)
.set("spark.task.cpus", "2")
intercept[SparkException] { sc = new SparkContext(conf) }
}
diff --git
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index a1d2a1283db14..8567dd1f08233 100644
---
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -243,7 +243,7 @@ class StandaloneDynamicAllocationSuite
}
test("dynamic allocation with cores per executor") {
- sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
+ sc = new SparkContext(appConf.set(config.EXECUTOR_CORES, 2))
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
@@ -296,7 +296,7 @@ class StandaloneDynamicAllocationSuite
test("dynamic allocation with cores per executor AND max cores") {
sc = new SparkContext(appConf
- .set("spark.executor.cores", "2")
+ .set(config.EXECUTOR_CORES, 2)
.set("spark.cores.max", "8"))
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
@@ -526,7 +526,7 @@ class StandaloneDynamicAllocationSuite
new SparkConf()
.setMaster(masterRpcEnv.address.toSparkURL)
.setAppName("test")
- .set("spark.executor.memory", "256m")
+ .set(config.EXECUTOR_MEMORY.key, "256m")
}
/** Make a master to which our application will send executor requests. */
diff --git
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index d56cfc183d921..5ce3453b682fe 100644
---
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -248,7 +248,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite
with PrivateMethodTes
val mm = UnifiedMemoryManager(conf, numCores = 1)
// Try using an executor memory that's too small
- val conf2 = conf.clone().set("spark.executor.memory", (reservedMemory /
2).toString)
+ val conf2 = conf.clone().set(EXECUTOR_MEMORY.key, (reservedMemory /
2).toString)
val exception = intercept[IllegalArgumentException] {
UnifiedMemoryManager(conf2, numCores = 1)
}
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index d264adaef90a5..f73ff67837c6d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -655,7 +655,7 @@ class TaskSetManagerSuite extends SparkFunSuite with
LocalSparkContext with Logg
}
test("abort the job if total size of results is too large") {
- val conf = new SparkConf().set("spark.driver.maxResultSize", "2m")
+ val conf = new SparkConf().set(config.MAX_RESULT_SIZE.key, "2m")
sc = new SparkContext("local", "test", conf)
def genBytes(size: Int): (Int) => Array[Byte] = { (x: Int) =>
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 3962bdc27d22c..19116cf22d2f8 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
+import org.apache.spark.internal.config.{DRIVER_PORT, MEMORY_OFFHEAP_SIZE}
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -86,7 +86,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.authenticate", "false")
- conf.set("spark.driver.port", rpcEnv.address.port.toString)
+ conf.set(DRIVER_PORT, rpcEnv.address.port)
conf.set("spark.testing", "true")
conf.set("spark.memory.fraction", "1")
conf.set("spark.memory.storageFraction", "1")
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cf00c1c3aad39..e866342e4472c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -124,7 +124,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers
with BeforeAndAfterE
.set("spark.storage.unrollMemoryThreshold", "512")
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
- conf.set("spark.driver.port", rpcEnv.address.port.toString)
+ conf.set(DRIVER_PORT, rpcEnv.address.port)
// Mock SparkContext to reduce the memory usage of tests. It's fine since
the only reason we
// need to create a SparkContext is to initialize LiveListenerBus.
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 52cd5378bc715..242163931f7ac 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -34,7 +34,7 @@ import org.mockito.Mockito.{atLeast, mock, verify}
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.logging.{FileAppender, RollingFileAppender,
SizeBasedRollingPolicy, TimeBasedRollingPolicy}
class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging
{
@@ -136,7 +136,7 @@ class FileAppenderSuite extends SparkFunSuite with
BeforeAndAfter with Logging {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
- val conf = new
SparkConf().set(RollingFileAppender.RETAINED_FILES_PROPERTY, "10")
+ val conf = new
SparkConf().set(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES, 10)
val appender = new RollingFileAppender(testInputStream, testFile,
new SizeBasedRollingPolicy(1000, false), conf, 10)
@@ -200,13 +200,12 @@ class FileAppenderSuite extends SparkFunSuite with
BeforeAndAfter with Logging {
appender.awaitTermination()
}
- import RollingFileAppender._
-
def rollingStrategy(strategy: String): Seq[(String, String)] =
- Seq(STRATEGY_PROPERTY -> strategy)
- def rollingSize(size: String): Seq[(String, String)] = Seq(SIZE_PROPERTY
-> size)
+ Seq(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key -> strategy)
+ def rollingSize(size: String): Seq[(String, String)] =
+ Seq(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> size)
def rollingInterval(interval: String): Seq[(String, String)] =
- Seq(INTERVAL_PROPERTY -> interval)
+ Seq(config.EXECUTOR_LOGS_ROLLING_TIME_INTERVAL.key -> interval)
val msInDay = 24 * 60 * 60 * 1000L
val msInHour = 60 * 60 * 1000L
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 8362c14fb289d..d52988df58d66 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -42,7 +42,7 @@ private[spark] class BasicDriverFeatureStep(conf:
KubernetesDriverConf)
.getOrElse(throw new SparkException("Must specify the driver container
image"))
// CPU settings
- private val driverCpuCores = conf.get("spark.driver.cores", "1")
+ private val driverCpuCores = conf.get(DRIVER_CORES.key, "1")
private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)
// Memory settings
@@ -85,7 +85,7 @@ private[spark] class BasicDriverFeatureStep(conf:
KubernetesDriverConf)
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
}
- val driverPort = conf.sparkConf.getInt("spark.driver.port",
DEFAULT_DRIVER_PORT)
+ val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key,
DEFAULT_DRIVER_PORT)
val driverBlockManagerPort = conf.sparkConf.getInt(
DRIVER_BLOCK_MANAGER_PORT.key,
DEFAULT_BLOCKMANAGER_PORT
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index dd73a5e52281c..6c3a6b39fa5cb 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -46,8 +46,8 @@ private[spark] class BasicExecutorFeatureStep(
private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix
private val driverUrl = RpcEndpointAddress(
- kubernetesConf.get("spark.driver.host"),
- kubernetesConf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+ kubernetesConf.get(DRIVER_HOST_ADDRESS),
+ kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
private val executorMemoryString = kubernetesConf.get(
@@ -67,7 +67,7 @@ private[spark] class BasicExecutorFeatureStep(
executorMemoryWithOverhead
}
- private val executorCores =
kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
+ private val executorCores = kubernetesConf.sparkConf.get(EXECUTOR_CORES)
private val executorCoresRequest =
if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index 42305457f4fff..15671179b18b3 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{HasMetadata,
ServiceBuilder}
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{Clock, SystemClock}
private[spark] class DriverServiceFeatureStep(
@@ -51,18 +51,17 @@ private[spark] class DriverServiceFeatureStep(
}
private val driverPort = kubernetesConf.sparkConf.getInt(
- "spark.driver.port", DEFAULT_DRIVER_PORT)
+ config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt(
- org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key,
DEFAULT_BLOCKMANAGER_PORT)
+ config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
override def configurePod(pod: SparkPod): SparkPod = pod
override def getAdditionalPodSystemProperties(): Map[String, String] = {
val driverHostname =
s"$resolvedServiceName.${kubernetesConf.namespace}.svc"
Map(DRIVER_HOST_KEY -> driverHostname,
- "spark.driver.port" -> driverPort.toString,
- org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key ->
- driverBlockManagerPort.toString)
+ config.DRIVER_PORT.key -> driverPort.toString,
+ config.DRIVER_BLOCK_MANAGER_PORT.key -> driverBlockManagerPort.toString)
}
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
@@ -90,8 +89,8 @@ private[spark] class DriverServiceFeatureStep(
}
private[spark] object DriverServiceFeatureStep {
- val DRIVER_BIND_ADDRESS_KEY =
org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
- val DRIVER_HOST_KEY =
org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
+ val DRIVER_BIND_ADDRESS_KEY = config.DRIVER_BIND_ADDRESS.key
+ val DRIVER_HOST_KEY = config.DRIVER_HOST_ADDRESS.key
val DRIVER_SVC_POSTFIX = "-driver-svc"
val MAX_SERVICE_NAME_LENGTH = 63
}
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 5ceb9d6d6fcd0..27d59dd7f3e5b 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -46,7 +46,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
test("Check the pod respects all configurations from the user.") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
- .set("spark.driver.cores", "2")
+ .set(DRIVER_CORES, 2)
.set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
.set(DRIVER_MEMORY.key, "256M")
.set(DRIVER_MEMORY_OVERHEAD, 200L)
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index e28c650a571ed..36bfb7d41ec39 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.{SecurityManager, SparkConf,
SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf,
KubernetesTestConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.config._
+import org.apache.spark.internal.config
import org.apache.spark.internal.config.Python._
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -74,8 +74,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite
with BeforeAndAfter {
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX)
.set(CONTAINER_IMAGE, EXECUTOR_IMAGE)
.set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
- .set(DRIVER_HOST_ADDRESS, DRIVER_HOSTNAME)
- .set("spark.driver.port", DRIVER_PORT.toString)
+ .set(config.DRIVER_HOST_ADDRESS, DRIVER_HOSTNAME)
+ .set(config.DRIVER_PORT, DRIVER_PORT)
.set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
.set("spark.kubernetes.resource.type", "java")
}
@@ -125,8 +125,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite
with BeforeAndAfter {
}
test("classpath and extra java options get translated into environment
variables") {
- baseConf.set(EXECUTOR_JAVA_OPTIONS, "foo=bar")
- baseConf.set(EXECUTOR_CLASS_PATH, "bar=baz")
+ baseConf.set(config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
+ baseConf.set(config.EXECUTOR_CLASS_PATH, "bar=baz")
val kconf = newExecutorConf(environment = Map("qux" -> "quux"))
val step = new BasicExecutorFeatureStep(kconf, new
SecurityManager(baseConf))
val executor = step.configurePod(SparkPod.initialPod())
@@ -150,7 +150,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite
with BeforeAndAfter {
test("auth secret propagation") {
val conf = baseConf.clone()
- .set(NETWORK_AUTH_ENABLED, true)
+ .set(config.NETWORK_AUTH_ENABLED, true)
.set("spark.master", "k8s://127.0.0.1")
val secMgr = new SecurityManager(conf)
@@ -168,8 +168,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite
with BeforeAndAfter {
val secretFile = new File(secretDir, "secret-file.txt")
Files.write(secretFile.toPath,
"some-secret".getBytes(StandardCharsets.UTF_8))
val conf = baseConf.clone()
- .set(NETWORK_AUTH_ENABLED, true)
- .set(AUTH_SECRET_FILE, secretFile.getAbsolutePath)
+ .set(config.NETWORK_AUTH_ENABLED, true)
+ .set(config.AUTH_SECRET_FILE, secretFile.getAbsolutePath)
.set("spark.master", "k8s://127.0.0.1")
val secMgr = new SecurityManager(conf)
secMgr.initializeAuth()
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 045278939dfff..822f1e32968c2 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -39,7 +39,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
test("Headless service has a port for the driver RPC and the block
manager.") {
val sparkConf = new SparkConf(false)
- .set("spark.driver.port", "9000")
+ .set(DRIVER_PORT, 9000)
.set(DRIVER_BLOCK_MANAGER_PORT, 8080)
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
@@ -61,7 +61,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
test("Hostname and ports are set according to the service name.") {
val sparkConf = new SparkConf(false)
- .set("spark.driver.port", "9000")
+ .set(DRIVER_PORT, 9000)
.set(DRIVER_BLOCK_MANAGER_PORT, 8080)
.set(KUBERNETES_NAMESPACE, "my-namespace")
val kconf = KubernetesTestConf.createDriverConf(
@@ -87,7 +87,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
resolvedService)
val additionalProps = configurationStep.getAdditionalPodSystemProperties()
- assert(additionalProps("spark.driver.port") ===
DEFAULT_DRIVER_PORT.toString)
+ assert(additionalProps(DRIVER_PORT.key) === DEFAULT_DRIVER_PORT.toString)
assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) ===
DEFAULT_BLOCKMANAGER_PORT.toString)
}
diff --git
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index d134847dc74d2..dd0b2bad1ecb2 100644
---
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -129,4 +129,7 @@ package object config {
"when launching drivers. Default is to accept all offers with
sufficient resources.")
.stringConf
.createWithDefault("")
+
+ private[spark] val EXECUTOR_URI =
+ ConfigBuilder("spark.executor.uri").stringConf.createOptional
}
diff --git
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 68f6921153d89..a4aba3e9c0d05 100644
---
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion,
SparkConf}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest._
+import org.apache.spark.internal.config
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.util.Utils
@@ -92,12 +93,12 @@ private[mesos] class MesosSubmitRequestServlet(
// Optional fields
val sparkProperties = request.sparkProperties
- val driverExtraJavaOptions =
sparkProperties.get("spark.driver.extraJavaOptions")
- val driverExtraClassPath =
sparkProperties.get("spark.driver.extraClassPath")
- val driverExtraLibraryPath =
sparkProperties.get("spark.driver.extraLibraryPath")
- val superviseDriver = sparkProperties.get("spark.driver.supervise")
- val driverMemory = sparkProperties.get("spark.driver.memory")
- val driverCores = sparkProperties.get("spark.driver.cores")
+ val driverExtraJavaOptions =
sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
+ val driverExtraClassPath =
sparkProperties.get(config.DRIVER_CLASS_PATH.key)
+ val driverExtraLibraryPath =
sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
+ val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
+ val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
+ val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
// Construct driver description
diff --git
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index cb1bcba651be6..021b1ac84805e 100644
---
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -32,6 +32,7 @@ import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
import org.apache.spark.deploy.rest.{CreateSubmissionResponse,
KillSubmissionResponse, SubmissionStatusResponse}
+import org.apache.spark.internal.config.{CORES_MAX, EXECUTOR_LIBRARY_PATH,
EXECUTOR_MEMORY}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.Utils
@@ -365,8 +366,7 @@ private[spark] class MesosClusterScheduler(
}
private def getDriverExecutorURI(desc: MesosDriverDescription):
Option[String] = {
- desc.conf.getOption("spark.executor.uri")
- .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+
desc.conf.get(config.EXECUTOR_URI).orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
}
private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
@@ -474,7 +474,7 @@ private[spark] class MesosClusterScheduler(
} else if (executorUri.isDefined) {
val folderBasename = executorUri.get.split('/').last.split('.').head
- val entries = conf.getOption("spark.executor.extraLibraryPath")
+ val entries = conf.get(EXECUTOR_LIBRARY_PATH)
.map(path => Seq(path) ++ desc.command.libraryPathEntries)
.getOrElse(desc.command.libraryPathEntries)
@@ -528,10 +528,10 @@ private[spark] class MesosClusterScheduler(
options ++= Seq("--class", desc.command.mainClass)
}
- desc.conf.getOption("spark.executor.memory").foreach { v =>
+ desc.conf.getOption(EXECUTOR_MEMORY.key).foreach { v =>
options ++= Seq("--executor-memory", v)
}
- desc.conf.getOption("spark.cores.max").foreach { v =>
+ desc.conf.getOption(CORES_MAX.key).foreach { v =>
options ++= Seq("--total-executor-cores", v)
}
desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
diff --git
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index f5866651dc90b..d0174516c2361 100644
---
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -33,7 +33,6 @@ import org.apache.spark.{SecurityManager, SparkConf,
SparkContext, SparkExceptio
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
-import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -63,9 +62,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2
- private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
+ private val maxCoresOption = conf.get(config.CORES_MAX)
- private val executorCoresOption =
conf.getOption("spark.executor.cores").map(_.toInt)
+ private val executorCoresOption =
conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
private val minCoresPerExecutor = executorCoresOption.getOrElse(1)
@@ -220,18 +219,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo
= {
val environment = Environment.newBuilder()
- val extraClassPath = conf.getOption("spark.executor.extraClassPath")
+ val extraClassPath = conf.get(config.EXECUTOR_CLASS_PATH)
extraClassPath.foreach { cp =>
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build())
}
- val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions").map {
+ val extraJavaOpts = conf.get(config.EXECUTOR_JAVA_OPTIONS).map {
Utils.substituteAppNExecIds(_, appId, taskId)
}.getOrElse("")
// Set the environment variable through a command prefix
// to append to the existing value of the variable
- val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p
=>
+ val prefixEnv = conf.get(config.EXECUTOR_LIBRARY_PATH).map { p =>
Utils.libraryPathEnvPrefix(Seq(p))
}.getOrElse("")
@@ -261,8 +260,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val uri = conf.getOption("spark.executor.uri")
- .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+ val uri =
conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
if (uri.isEmpty) {
val executorSparkHome = conf.getOption("spark.mesos.executor.home")
@@ -304,8 +302,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
"driverURL"
} else {
RpcEndpointAddress(
- conf.get("spark.driver.host"),
- conf.get("spark.driver.port").toInt,
+ conf.get(config.DRIVER_HOST_ADDRESS),
+ conf.get(config.DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
}
}
@@ -633,7 +631,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
externalShufflePort,
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsSeconds("spark.network.timeout",
"120s")}s"),
- sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
+ sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
slave.shuffleRegistered = true
}
diff --git
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 0bb6fe0fa4bdf..192f9407a1ba4 100644
---
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -28,8 +28,9 @@ import org.apache.mesos.SchedulerDriver
import org.apache.mesos.protobuf.ByteString
import org.apache.spark.{SparkContext, SparkException, TaskState}
-import org.apache.spark.deploy.mesos.config
+import org.apache.spark.deploy.mesos.config.EXECUTOR_URI
import org.apache.spark.executor.MesosExecutorBackend
+import org.apache.spark.internal.config
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
@@ -107,15 +108,15 @@ private[spark] class MesosFineGrainedSchedulerBackend(
throw new SparkException("Executor Spark home
`spark.mesos.executor.home` is not set!")
}
val environment = Environment.newBuilder()
- sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
+ sc.conf.get(config.EXECUTOR_CLASS_PATH).foreach { cp =>
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build())
}
- val extraJavaOpts =
sc.conf.getOption("spark.executor.extraJavaOptions").map {
+ val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS).map {
Utils.substituteAppNExecIds(_, appId, execId)
}.getOrElse("")
- val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map {
p =>
+ val prefixEnv = sc.conf.get(config.EXECUTOR_LIBRARY_PATH).map { p =>
Utils.libraryPathEnvPrefix(Seq(p))
}.getOrElse("")
@@ -132,8 +133,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val uri = sc.conf.getOption("spark.executor.uri")
- .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+ val uri =
sc.conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
val executorBackendName = classOf[MesosExecutorBackend].getName
if (uri.isEmpty) {
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e46c4f970c4a3..8dbdac168f701 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -470,8 +470,8 @@ private[spark] class ApplicationMaster(args:
ApplicationMasterArguments) extends
rpcEnv = sc.env.rpcEnv
val userConf = sc.getConf
- val host = userConf.get("spark.driver.host")
- val port = userConf.get("spark.driver.port").toInt
+ val host = userConf.get(DRIVER_HOST_ADDRESS)
+ val port = userConf.get(DRIVER_PORT)
registerAM(host, port, userConf, sc.ui.map(_.webUrl))
val driverRef = rpcEnv.setupEndpointRef(
@@ -505,7 +505,7 @@ private[spark] class ApplicationMaster(args:
ApplicationMasterArguments) extends
amCores, true)
// The client-mode AM doesn't listen for incoming connections, so report
an invalid port.
- registerAM(hostname, -1, sparkConf,
sparkConf.getOption("spark.driver.appUIAddress"))
+ registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS))
// The driver should be up and listening, so unlike cluster mode, just try
to connect to it
// with no waiting or retrying.
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index b257d8fdd3b1a..7e9cd409daf36 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -224,16 +224,12 @@ package object config {
/* Driver configuration. */
- private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
- .intConf
- .createWithDefault(1)
+ private[spark] val DRIVER_APP_UI_ADDRESS =
ConfigBuilder("spark.driver.appUIAddress")
+ .stringConf
+ .createOptional
/* Executor configuration. */
- private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
- .intConf
- .createWithDefault(1)
-
private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
.doc("Node label expression for executors.")
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 9397a1e3de9ac..167eef19ed856 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.launcher.SparkAppHandle
import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -42,10 +42,10 @@ private[spark] class YarnClientSchedulerBackend(
* This waits until the application is running.
*/
override def start() {
- val driverHost = conf.get("spark.driver.host")
- val driverPort = conf.get("spark.driver.port")
+ val driverHost = conf.get(config.DRIVER_HOST_ADDRESS)
+ val driverPort = conf.get(config.DRIVER_PORT)
val hostport = driverHost + ":" + driverPort
- sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }
+ sc.ui.foreach { ui => conf.set(DRIVER_APP_UI_ADDRESS, ui.webUrl) }
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += ("--arg", hostport)
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
index 8032213602c95..9e3cc6ec01dfd 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.Matchers
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import
org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY,
EXECUTOR_CORES, EXECUTOR_MEMORY}
class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org