[SPARK-25875][K8S] Merge code to set up driver command into a single step. Right now there are 3 different classes dealing with building the driver command to run inside the pod, one for each "binding" supported by Spark. This has two main shortcomings:
- the code in the 3 classes is very similar; changing things in one place would probably mean making a similar change in the others. - it gives the false impression that the step implementation is the only place where binding-specific logic is needed. That is not true; there was code in KubernetesConf that was binding-specific, and there's also code in the executor-specific config step. So the 3 classes weren't really working as a language-specific abstraction. On top of that, the current code was propagating command line parameters in a different way depending on the binding. That doesn't seem necessary, and in fact using environment variables for command line parameters is in general a really bad idea, since you can't handle special characters (e.g. spaces) that way. This change merges the 3 different code paths for Java, Python and R into a single step, and also merges the 3 code paths to start the Spark driver in the k8s entry point script. This increases the amount of shared code, and also moves more feature logic into the step itself, so it doesn't live in KubernetesConf. Note that not all logic related to setting up the driver lives in that step. For example, the memory overhead calculation still lives separately, except it now happens in the driver config step instead of outside the step hierarchy altogether. Some of the noise in the diff is because of changes to KubernetesConf, which will be addressed in a separate change. Tested with new and updated unit tests + integration tests. Author: Marcelo Vanzin <van...@cloudera.com> Closes #22897 from vanzin/SPARK-25875. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3404a73f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3404a73f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3404a73f Branch: refs/heads/master Commit: 3404a73f4cf7be37e574026d08ad5cf82cfac871 Parents: 7ea594e Author: Marcelo Vanzin <van...@cloudera.com> Authored: Fri Nov 2 13:58:08 2018 -0700 Committer: mcheah <mch...@palantir.com> Committed: Fri Nov 2 13:58:08 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/k8s/Config.scala | 30 +--- .../org/apache/spark/deploy/k8s/Constants.scala | 10 +- .../spark/deploy/k8s/KubernetesConf.scala | 70 ++------- .../k8s/features/BasicDriverFeatureStep.scala | 42 ++++-- .../k8s/features/BasicExecutorFeatureStep.scala | 21 +-- .../k8s/features/DriverCommandFeatureStep.scala | 134 +++++++++++++++++ .../features/KubernetesFeatureConfigStep.scala | 4 +- .../k8s/features/PodTemplateConfigMapStep.scala | 4 +- .../bindings/JavaDriverFeatureStep.scala | 46 ------ .../bindings/PythonDriverFeatureStep.scala | 76 ---------- .../features/bindings/RDriverFeatureStep.scala | 62 -------- .../submit/KubernetesClientApplication.scala | 10 +- .../k8s/submit/KubernetesDriverBuilder.scala | 26 +--- .../deploy/k8s/submit/MainAppResource.scala | 3 +- .../spark/deploy/k8s/KubernetesConfSuite.scala | 103 +------------ .../features/BasicDriverFeatureStepSuite.scala | 69 +++++++-- .../BasicExecutorFeatureStepSuite.scala | 4 - .../DriverCommandFeatureStepSuite.scala | 144 +++++++++++++++++++ ...rKubernetesCredentialsFeatureStepSuite.scala | 3 - .../DriverServiceFeatureStepSuite.scala | 19 +-- .../features/EnvSecretsFeatureStepSuite.scala | 1 - .../features/LocalDirsFeatureStepSuite.scala | 4 +- .../features/MountSecretsFeatureStepSuite.scala | 1 - .../features/MountVolumesFeatureStepSuite.scala | 4 +- .../PodTemplateConfigMapStepSuite.scala | 4 +- .../bindings/JavaDriverFeatureStepSuite.scala | 61 -------- .../bindings/PythonDriverFeatureStepSuite.scala | 112 --------------- .../bindings/RDriverFeatureStepSuite.scala | 64 --------- .../spark/deploy/k8s/submit/ClientSuite.scala | 4 +- .../submit/KubernetesDriverBuilderSuite.scala | 136 +++--------------- .../k8s/KubernetesExecutorBuilderSuite.scala | 6 - .../src/main/dockerfiles/spark/entrypoint.sh | 16 --- 32 files changed, 438 insertions(+), 855 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 862f1d6..a32bd93 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s import java.util.concurrent.TimeUnit +import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.ConfigBuilder @@ -125,34 +126,6 @@ private[spark] object Config extends Logging { .stringConf .createOptional - val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE = - ConfigBuilder("spark.kubernetes.python.mainAppResource") - .doc("The main app resource for pyspark jobs") - .internal() - .stringConf - .createOptional - - val KUBERNETES_PYSPARK_APP_ARGS = - ConfigBuilder("spark.kubernetes.python.appArgs") - .doc("The app arguments for PySpark Jobs") - .internal() - .stringConf - .createOptional - - val KUBERNETES_R_MAIN_APP_RESOURCE = - ConfigBuilder("spark.kubernetes.r.mainAppResource") - .doc("The main app resource for SparkR jobs") - .internal() - .stringConf - .createOptional - - val KUBERNETES_R_APP_ARGS = - ConfigBuilder("spark.kubernetes.r.appArgs") - .doc("The app arguments for SparkR Jobs") - .internal() - .stringConf - .createOptional - val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") @@ -267,6 +240,7 @@ private[spark] object Config extends Logging { .doc("This sets the resource type internally") .internal() .stringConf + .checkValues(Set(APP_RESOURCE_TYPE_JAVA, APP_RESOURCE_TYPE_PYTHON, APP_RESOURCE_TYPE_R)) .createOptional val KUBERNETES_LOCAL_DIRS_TMPFS = http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 1c6d53c..85917b8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -69,12 +69,8 @@ private[spark] object Constants { val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION" // BINDINGS - val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY" val ENV_PYSPARK_FILES = "PYSPARK_FILES" - val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS" val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION" - val ENV_R_PRIMARY = "R_PRIMARY" - val ENV_R_ARGS = "R_APP_ARGS" // Pod spec templates val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml" @@ -88,6 +84,7 @@ private[spark] object Constants { val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor" val MEMORY_OVERHEAD_MIN_MIB = 384L + val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d // Hadoop Configuration val HADOOP_FILE_VOLUME = "hadoop-properties" @@ -113,4 +110,9 @@ private[spark] object Constants { // Hadoop credentials secrets for the Spark app. val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials" val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret" + + // Application resource types. + val APP_RESOURCE_TYPE_JAVA = "java" + val APP_RESOURCE_TYPE_PYTHON = "python" + val APP_RESOURCE_TYPE_R = "r" } http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 066547d..ebb8154 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManag import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._ import org.apache.spark.internal.config.ConfigEntry +import org.apache.spark.util.Utils private[spark] sealed trait KubernetesRoleSpecificConf @@ -36,10 +37,15 @@ private[spark] sealed trait KubernetesRoleSpecificConf * Structure containing metadata for Kubernetes logic that builds a Spark driver. */ private[spark] case class KubernetesDriverSpecificConf( - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, appName: String, - appArgs: Seq[String]) extends KubernetesRoleSpecificConf + appArgs: Seq[String], + pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf { + + require(mainAppResource != null, "Main resource must be provided.") + +} /* * Structure containing metadata for Kubernetes logic that builds a Spark executor. @@ -70,7 +76,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( roleSecretEnvNamesToKeyRefs: Map[String, String], roleEnvs: Map[String, String], roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]], - sparkFiles: Seq[String], hadoopConfSpec: Option[HadoopConfSpec]) { def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config" @@ -82,23 +87,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) - def sparkJars(): Seq[String] = sparkConf - .getOption("spark.jars") - .map(str => str.split(",").toSeq) - .getOrElse(Seq.empty[String]) - - def pyFiles(): Option[String] = sparkConf - .get(KUBERNETES_PYSPARK_PY_FILES) - - def pySparkMainResource(): Option[String] = sparkConf - .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE) - - def pySparkPythonVersion(): String = sparkConf - .get(PYSPARK_MAJOR_PYTHON_VERSION) - - def sparkRMainResource(): Option[String] = sparkConf - .get(KUBERNETES_R_MAIN_APP_RESOURCE) - def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) def imagePullSecrets(): Seq[LocalObjectReference] = { @@ -130,38 +118,11 @@ private[spark] object KubernetesConf { appName: String, appResourceNamePrefix: String, appId: String, - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, appArgs: Array[String], maybePyFiles: Option[String], hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { - val sparkConfWithMainAppJar = sparkConf.clone() - val additionalFiles = mutable.ArrayBuffer.empty[String] - mainAppResource.foreach { - case JavaMainAppResource(res) => - val previousJars = sparkConf - .getOption("spark.jars") - .map(_.split(",")) - .getOrElse(Array.empty) - if (!previousJars.contains(res)) { - sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) - } - // The function of this outer match is to account for multiple nonJVM - // bindings that will all have increased default MEMORY_OVERHEAD_FACTOR to 0.4 - case nonJVM: NonJVMResource => - nonJVM match { - case PythonMainAppResource(res) => - additionalFiles += res - maybePyFiles.foreach{maybePyFiles => - additionalFiles.appendAll(maybePyFiles.split(","))} - sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res) - case RMainAppResource(res) => - additionalFiles += res - sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res) - } - sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4) - } - val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + @@ -188,11 +149,6 @@ private[spark] object KubernetesConf { KubernetesVolumeUtils.parseVolumesWithPrefix( sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) - val sparkFiles = sparkConf - .getOption("spark.files") - .map(str => str.split(",").toSeq) - .getOrElse(Seq.empty[String]) ++ additionalFiles - val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP) KubernetesUtils.requireNandDefined( hadoopConfDir, @@ -205,10 +161,12 @@ private[spark] object KubernetesConf { } else { None } + val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil) + KubernetesConf( - sparkConfWithMainAppJar, - KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs), + sparkConf.clone(), + KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles), appResourceNamePrefix, appId, driverLabels, @@ -217,7 +175,6 @@ private[spark] object KubernetesConf { driverSecretEnvNamesToKeyRefs, driverEnvs, driverVolumes, - sparkFiles, hadoopConfSpec) } @@ -274,7 +231,6 @@ private[spark] object KubernetesConf { executorEnvSecrets, executorEnv, executorVolumes, - Seq.empty[String], None) } } http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 96b14a0..5ddf73c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils private[spark] class BasicDriverFeatureStep( conf: KubernetesConf[KubernetesDriverSpecificConf]) @@ -47,10 +48,23 @@ private[spark] class BasicDriverFeatureStep( // Memory settings private val driverMemoryMiB = conf.get(DRIVER_MEMORY) + + // The memory overhead factor to use. If the user has not set it, then use a different + // value for non-JVM apps. This value is propagated to executors. + private val overheadFactor = + if (conf.roleSpecificConf.mainAppResource.isInstanceOf[NonJVMResource]) { + if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) { + conf.get(MEMORY_OVERHEAD_FACTOR) + } else { + NON_JVM_MEMORY_OVERHEAD_FACTOR + } + } else { + conf.get(MEMORY_OVERHEAD_FACTOR) + } + private val memoryOverheadMiB = conf .get(DRIVER_MEMORY_OVERHEAD) - .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt, - MEMORY_OVERHEAD_MIN_MIB)) + .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB override def configurePod(pod: SparkPod): SparkPod = { @@ -134,20 +148,18 @@ private[spark] class BasicDriverFeatureStep( KUBERNETES_DRIVER_POD_NAME.key -> driverPodName, "spark.app.id" -> conf.appId, KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix, - KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true") - - val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath( - conf.sparkJars()) - val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath( - conf.sparkFiles) - if (resolvedSparkJars.nonEmpty) { - additionalProps.put("spark.jars", resolvedSparkJars.mkString(",")) - } - if (resolvedSparkFiles.nonEmpty) { - additionalProps.put("spark.files", resolvedSparkFiles.mkString(",")) + KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true", + MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) + + Seq("spark.jars", "spark.files").foreach { key => + conf.getOption(key).foreach { value => + val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value)) + if (resolved.nonEmpty) { + additionalProps.put(key, resolved.mkString(",")) + } + } } + additionalProps.toMap } - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 1dab2a8..7f397e6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -58,16 +58,13 @@ private[spark] class BasicExecutorFeatureStep( (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB - private val executorMemoryTotal = kubernetesConf.sparkConf - .getOption(APP_RESOURCE_TYPE.key).map{ res => - val additionalPySparkMemory = res match { - case "python" => - kubernetesConf.sparkConf - .get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) - case _ => 0 - } - executorMemoryWithOverhead + additionalPySparkMemory - }.getOrElse(executorMemoryWithOverhead) + private val executorMemoryTotal = + if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)) { + executorMemoryWithOverhead + + kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) + } else { + executorMemoryWithOverhead + } private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1) private val executorCoresRequest = @@ -187,8 +184,4 @@ private[spark] class BasicExecutorFeatureStep( SparkPod(executorPod, containerWithLimitCores) } - - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala new file mode 100644 index 0000000..8b8f0d0 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder} + +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.util.Utils + +/** + * Creates the driver command for running the user app, and propagates needed configuration so + * executors can also find the app code. + */ +private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDriverSpecificConf]) + extends KubernetesFeatureConfigStep { + + private val driverConf = conf.roleSpecificConf + + override def configurePod(pod: SparkPod): SparkPod = { + driverConf.mainAppResource match { + case JavaMainAppResource(_) => + configureForJava(pod) + + case PythonMainAppResource(res) => + configureForPython(pod, res) + + case RMainAppResource(res) => + configureForR(pod, res) + } + } + + override def getAdditionalPodSystemProperties(): Map[String, String] = { + driverConf.mainAppResource match { + case JavaMainAppResource(res) => + res.map(additionalJavaProperties).getOrElse(Map.empty) + + case PythonMainAppResource(res) => + additionalPythonProperties(res) + + case RMainAppResource(res) => + additionalRProperties(res) + } + } + + private def configureForJava(pod: SparkPod): SparkPod = { + // The user application jar is merged into the spark.jars list and managed through that + // property, so use a "blank" resource for the Java driver. + val driverContainer = baseDriverContainer(pod, SparkLauncher.NO_RESOURCE).build() + SparkPod(pod.pod, driverContainer) + } + + private def configureForPython(pod: SparkPod, res: String): SparkPod = { + val maybePythonFiles = if (driverConf.pyFiles.nonEmpty) { + // Delineation by ":" is to append the PySpark Files to the PYTHONPATH + // of the respective PySpark pod + val resolved = KubernetesUtils.resolveFileUrisAndPath(driverConf.pyFiles) + Some(new EnvVarBuilder() + .withName(ENV_PYSPARK_FILES) + .withValue(resolved.mkString(":")) + .build()) + } else { + None + } + val pythonEnvs = + Seq(new EnvVarBuilder() + .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION) + .withValue(conf.sparkConf.get(PYSPARK_MAJOR_PYTHON_VERSION)) + .build()) ++ + maybePythonFiles + + val pythonContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)) + .addAllToEnv(pythonEnvs.asJava) + .build() + + SparkPod(pod.pod, pythonContainer) + } + + private def configureForR(pod: SparkPod, res: String): SparkPod = { + val rContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)).build() + SparkPod(pod.pod, rContainer) + } + + private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = { + new ContainerBuilder(pod.container) + .addToArgs("driver") + .addToArgs("--properties-file", SPARK_CONF_PATH) + .addToArgs("--class", driverConf.mainClass) + .addToArgs(resource) + .addToArgs(driverConf.appArgs: _*) + } + + private def additionalJavaProperties(resource: String): Map[String, String] = { + resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList("spark.jars", Seq(resource)) + } + + private def additionalPythonProperties(resource: String): Map[String, String] = { + resourceType(APP_RESOURCE_TYPE_PYTHON) ++ + mergeFileList("spark.files", Seq(resource) ++ driverConf.pyFiles) + } + + private def additionalRProperties(resource: String): Map[String, String] = { + resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList("spark.files", Seq(resource)) + } + + private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = { + val existing = Utils.stringToSeq(conf.sparkConf.get(key, "")) + Map(key -> (existing ++ filesToAdd).distinct.mkString(",")) + } + + private def resourceType(resType: String): Map[String, String] = { + Map(APP_RESOURCE_TYPE.key -> resType) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala index 4c1be3b..58cdaa3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala @@ -61,11 +61,11 @@ private[spark] trait KubernetesFeatureConfigStep { /** * Return any system properties that should be set on the JVM in accordance to this feature. */ - def getAdditionalPodSystemProperties(): Map[String, String] + def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty /** * Return any additional Kubernetes resources that should be added to support this feature. Only * applicable when creating the driver in cluster mode. */ - def getAdditionalKubernetesResources(): Seq[HasMetadata] + def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala index 96a8013..28e2d17 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala @@ -54,11 +54,11 @@ private[spark] class PodTemplateConfigMapStep( SparkPod(podWithVolume, containerWithVolume) } - def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( + override def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key -> (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)) - def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8) http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala deleted file mode 100644 index 6f063b2..0000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep -import org.apache.spark.launcher.SparkLauncher - -private[spark] class JavaDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val withDriverArgs = new ContainerBuilder(pod.container) - .addToArgs("driver") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", kubernetesConf.roleSpecificConf.mainClass) - // The user application jar is merged into the spark.jars list and managed through that - // property, so there is no need to reference it explicitly here. - .addToArgs(SparkLauncher.NO_RESOURCE) - .addToArgs(kubernetesConf.roleSpecificConf.appArgs: _*) - .build() - SparkPod(pod.pod, withDriverArgs) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "java") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala deleted file mode 100644 index cf0c03b..0000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep - -private[spark] class PythonDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val roleConf = kubernetesConf.roleSpecificConf - require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined") - // Delineation is done by " " because that is input into PythonRunner - val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map( - pyArgs => - new EnvVarBuilder() - .withName(ENV_PYSPARK_ARGS) - .withValue(pyArgs.mkString(" ")) - .build()) - val maybePythonFiles = kubernetesConf.pyFiles().map( - // Dilineation by ":" is to append the PySpark Files to the PYTHONPATH - // of the respective PySpark pod - pyFiles => - new EnvVarBuilder() - .withName(ENV_PYSPARK_FILES) - .withValue(KubernetesUtils.resolveFileUrisAndPath(pyFiles.split(",")) - .mkString(":")) - .build()) - val envSeq = - Seq(new EnvVarBuilder() - .withName(ENV_PYSPARK_PRIMARY) - .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.pySparkMainResource().get)) - .build(), - new EnvVarBuilder() - .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION) - .withValue(kubernetesConf.pySparkPythonVersion()) - .build()) - val pythonEnvs = envSeq ++ - maybePythonArgs.toSeq ++ - maybePythonFiles.toSeq - - val withPythonPrimaryContainer = new ContainerBuilder(pod.container) - .addAllToEnv(pythonEnvs.asJava) - .addToArgs("driver-py") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", roleConf.mainClass) - .build() - - SparkPod(pod.pod, withPythonPrimaryContainer) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "python") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala deleted file mode 100644 index 1a7ef52..0000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep - -private[spark] class RDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val roleConf = kubernetesConf.roleSpecificConf - require(roleConf.mainAppResource.isDefined, "R Main Resource must be defined") - // Delineation is done by " " because that is input into RRunner - val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map( - rArgs => - new EnvVarBuilder() - .withName(ENV_R_ARGS) - .withValue(rArgs.mkString(" ")) - .build()) - val envSeq = - Seq(new EnvVarBuilder() - .withName(ENV_R_PRIMARY) - .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.sparkRMainResource().get)) - .build()) - val rEnvs = envSeq ++ - maybeRArgs.toSeq - - val withRPrimaryContainer = new ContainerBuilder(pod.container) - .addAllToEnv(rEnvs.asJava) - .addToArgs("driver-r") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", roleConf.mainClass) - .build() - - SparkPod(pod.pod, withRPrimaryContainer) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "r") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 4b58f8b..543d6b1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -44,7 +44,7 @@ import org.apache.spark.util.Utils * @param maybePyFiles additional Python files via --py-files */ private[spark] case class ClientArguments( - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, driverArgs: Array[String], maybePyFiles: Option[String], @@ -53,18 +53,18 @@ private[spark] case class ClientArguments( private[spark] object ClientArguments { def fromCommandLineArgs(args: Array[String]): ClientArguments = { - var mainAppResource: Option[MainAppResource] = None + var mainAppResource: MainAppResource = JavaMainAppResource(None) var mainClass: Option[String] = None val driverArgs = mutable.ArrayBuffer.empty[String] var maybePyFiles : Option[String] = None args.sliding(2, 2).toList.foreach { case Array("--primary-java-resource", primaryJavaResource: String) => - mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + mainAppResource = JavaMainAppResource(Some(primaryJavaResource)) case Array("--primary-py-file", primaryPythonResource: String) => - mainAppResource = Some(PythonMainAppResource(primaryPythonResource)) + mainAppResource = PythonMainAppResource(primaryPythonResource) case Array("--primary-r-file", primaryRFile: String) => - mainAppResource = Some(RMainAppResource(primaryRFile)) + mainAppResource = RMainAppResource(primaryRFile) case Array("--other-py-files", pyFiles: String) => maybePyFiles = Some(pyFiles) case Array("--main-class", clazz: String) => http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 5565cd7..be4daec 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -23,7 +23,6 @@ import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} private[spark] class KubernetesDriverBuilder( provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep = @@ -45,18 +44,10 @@ private[spark] class KubernetesDriverBuilder( provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] => MountVolumesFeatureStep) = new MountVolumesFeatureStep(_), - providePythonStep: ( + provideDriverCommandStep: ( KubernetesConf[KubernetesDriverSpecificConf] - => PythonDriverFeatureStep) = - new PythonDriverFeatureStep(_), - provideRStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => RDriverFeatureStep) = - new RDriverFeatureStep(_), - provideJavaStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => JavaDriverFeatureStep) = - new JavaDriverFeatureStep(_), + => DriverCommandFeatureStep) = + new DriverCommandFeatureStep(_), provideHadoopGlobalStep: ( KubernetesConf[KubernetesDriverSpecificConf] => KerberosConfDriverFeatureStep) = @@ -88,21 +79,14 @@ private[spark] class KubernetesDriverBuilder( Seq(providePodTemplateConfigMapStep(kubernetesConf)) } else Nil - val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map { - case JavaMainAppResource(_) => - provideJavaStep(kubernetesConf) - case PythonMainAppResource(_) => - providePythonStep(kubernetesConf) - case RMainAppResource(_) => - provideRStep(kubernetesConf)} - .getOrElse(provideJavaStep(kubernetesConf)) + val driverCommandStep = provideDriverCommandStep(kubernetesConf) val maybeHadoopConfigStep = kubernetesConf.hadoopConfSpec.map { _ => provideHadoopGlobalStep(kubernetesConf)} val allFeatures: Seq[KubernetesFeatureConfigStep] = - (baseFeatures :+ bindingsStep) ++ + baseFeatures ++ Seq(driverCommandStep) ++ secretFeature ++ envSecretFeature ++ volumesFeature ++ maybeHadoopConfigStep.toSeq ++ podTemplateFeature http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala index dd5a454..a2e01fa 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala @@ -20,7 +20,8 @@ private[spark] sealed trait MainAppResource private[spark] sealed trait NonJVMResource -private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource +private[spark] case class JavaMainAppResource(primaryResource: Option[String]) + extends MainAppResource private[spark] case class PythonMainAppResource(primaryResource: String) extends MainAppResource with NonJVMResource http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index bb2b94f..41ca8d1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -56,7 +56,7 @@ class KubernetesConfSuite extends SparkFunSuite { APP_NAME, RESOURCE_NAME_PREFIX, APP_ID, - mainAppResource = None, + mainAppResource = JavaMainAppResource(None), MAIN_CLASS, APP_ARGS, maybePyFiles = None, @@ -65,109 +65,10 @@ class KubernetesConfSuite extends SparkFunSuite { assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap) assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX) assert(conf.roleSpecificConf.appName === APP_NAME) - assert(conf.roleSpecificConf.mainAppResource.isEmpty) assert(conf.roleSpecificConf.mainClass === MAIN_CLASS) assert(conf.roleSpecificConf.appArgs === APP_ARGS) } - test("Creating driver conf with and without the main app jar influences spark.jars") { - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - val mainAppJar = Some(JavaMainAppResource("local:///opt/spark/main.jar")) - val kubernetesConfWithMainJar = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppJar, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars") - .split(",") - === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar")) - val kubernetesConfWithoutMainJar = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource = None, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1) - } - - test("Creating driver conf with a python primary file") { - val mainResourceFile = "local:///opt/spark/main.py" - val inputPyFiles = Array("local:///opt/spark/example2.py", "local:///example3.py") - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - .set("spark.files", "local:///opt/spark/example4.py") - val mainAppResource = Some(PythonMainAppResource(mainResourceFile)) - val kubernetesConfWithMainResource = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - Some(inputPyFiles.mkString(",")), - hadoopConfDir = None) - assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4) - assert(kubernetesConfWithMainResource.sparkFiles - === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles) - } - - test("Creating driver conf with a r primary file") { - val mainResourceFile = "local:///opt/spark/main.R" - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - .set("spark.files", "local:///opt/spark/example2.R") - val mainAppResource = Some(RMainAppResource(mainResourceFile)) - val kubernetesConfWithMainResource = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4) - assert(kubernetesConfWithMainResource.sparkFiles - === Array("local:///opt/spark/example2.R", mainResourceFile)) - } - - test("Testing explicit setting of memory overhead on non-JVM tasks") { - val sparkConf = new SparkConf(false) - .set(MEMORY_OVERHEAD_FACTOR, 0.3) - - val mainResourceFile = "local:///opt/spark/main.py" - val mainAppResource = Some(PythonMainAppResource(mainResourceFile)) - val conf = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3) - } - test("Resolve driver labels, annotations, secret mount paths, envs, and memory overhead") { val sparkConf = new SparkConf(false) .set(MEMORY_OVERHEAD_FACTOR, 0.3) @@ -192,7 +93,7 @@ class KubernetesConfSuite extends SparkFunSuite { APP_NAME, RESOURCE_NAME_PREFIX, APP_ID, - mainAppResource = None, + mainAppResource = JavaMainAppResource(None), MAIN_CLASS, APP_ARGS, maybePyFiles = None, http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 5c6bcc7..1e7dfbe 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.JavaMainAppResource -import org.apache.spark.deploy.k8s.submit.PythonMainAppResource +import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI class BasicDriverFeatureStepSuite extends SparkFunSuite { @@ -52,7 +52,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { new LocalObjectReferenceBuilder().withName(secret).build() } private val emptyDriverSpecificConf = KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), APP_NAME, MAIN_CLASS, APP_ARGS) @@ -62,8 +62,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") .set("spark.driver.cores", "2") .set(KUBERNETES_DRIVER_LIMIT_CORES, "4") - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M") - .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L) + .set(DRIVER_MEMORY.key, "256M") + .set(DRIVER_MEMORY_OVERHEAD, 200L) .set(CONTAINER_IMAGE, "spark-driver:latest") .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(",")) val kubernetesConf = KubernetesConf( @@ -77,7 +77,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val featureStep = new BasicDriverFeatureStep(kubernetesConf) @@ -130,21 +129,22 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", "spark.app.id" -> APP_ID, KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, - "spark.kubernetes.submitInDriver" -> "true") + "spark.kubernetes.submitInDriver" -> "true", + MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } test("Check appropriate entrypoint rerouting for various bindings") { val javaSparkConf = new SparkConf() - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g") + .set(DRIVER_MEMORY.key, "4g") .set(CONTAINER_IMAGE, "spark-driver:latest") val pythonSparkConf = new SparkConf() - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g") + .set(DRIVER_MEMORY.key, "4g") .set(CONTAINER_IMAGE, "spark-driver-py:latest") val javaKubernetesConf = KubernetesConf( javaSparkConf, KubernetesDriverSpecificConf( - Some(JavaMainAppResource("")), + JavaMainAppResource(None), APP_NAME, PY_MAIN_CLASS, APP_ARGS), @@ -156,13 +156,12 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val pythonKubernetesConf = KubernetesConf( pythonSparkConf, KubernetesDriverSpecificConf( - Some(PythonMainAppResource("")), + PythonMainAppResource(""), APP_NAME, PY_MAIN_CLASS, APP_ARGS), @@ -174,7 +173,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf) val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf) @@ -204,7 +202,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - allFiles, hadoopConfSpec = None) val step = new BasicDriverFeatureStep(kubernetesConf) @@ -215,10 +212,52 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, "spark.kubernetes.submitInDriver" -> "true", "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar", - "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt") + "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt", + MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(additionalProperties === expectedSparkConf) } + // Memory overhead tests. Tuples are: + // test name, main resource, overhead factor, expected factor + Seq( + ("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get), + ("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR), + ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d), + ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR) + ).foreach { case (name, resource, factor, expectedFactor) => + test(s"memory overhead factor: $name") { + // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val driverMem = MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") + factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) } + val driverConf = emptyDriverSpecificConf.copy(mainAppResource = resource) + val conf = KubernetesConf( + sparkConf, + driverConf, + RESOURCE_NAME_PREFIX, + APP_ID, + DRIVER_LABELS, + DRIVER_ANNOTATIONS, + Map.empty, + Map.empty, + DRIVER_ENVS, + Nil, + hadoopConfSpec = None) + val step = new BasicDriverFeatureStep(conf) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = pod.container.getResources.getRequests.get("memory").getAmount() + val expected = (driverMem + driverMem * expectedFactor).toInt + assert(mem === s"${expected}Mi") + + val systemProperties = step.getAdditionalPodSystemProperties() + assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) + } + } + def containerPort(name: String, portNumber: Int): ContainerPort = new ContainerPortBuilder() .withName(name) http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 41f34bd..e9a16aa 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -91,7 +91,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) @@ -132,7 +131,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63) } @@ -154,7 +152,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map("qux" -> "quux"), Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) @@ -182,7 +179,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) // This is checking that basic executor + executorMemory = 1408 + 42 = 1450 http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala new file mode 100644 index 0000000..3067295 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.util.Utils + +class DriverCommandFeatureStepSuite extends SparkFunSuite { + + private val MAIN_CLASS = "mainClass" + + test("java resource") { + val mainResource = "local:///main.jar" + val spec = applyFeatureStep( + JavaMainAppResource(Some(mainResource)), + appArgs = Array("5", "7")) + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "spark-internal", "5", "7")) + + val jars = Utils.stringToSeq(spec.systemProperties("spark.jars")) + assert(jars.toSet === Set(mainResource)) + } + + test("python resource with no extra files") { + val mainResource = "local:///main.py" + val sparkConf = new SparkConf(false) + .set(PYSPARK_MAJOR_PYTHON_VERSION, "3") + + val spec = applyFeatureStep( + PythonMainAppResource(mainResource), + conf = sparkConf) + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "/main.py")) + val envs = spec.pod.container.getEnv.asScala + .map { env => (env.getName, env.getValue) } + .toMap + assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3") + + val files = Utils.stringToSeq(spec.systemProperties("spark.files")) + assert(files.toSet === Set(mainResource)) + } + + test("python resource with extra files") { + val expectedMainResource = "/main.py" + val expectedPySparkFiles = "/example2.py:/example3.py" + val filesInConf = Set("local:///example.py") + + val mainResource = s"local://$expectedMainResource" + val pyFiles = Seq("local:///example2.py", "local:///example3.py") + + val sparkConf = new SparkConf(false) + .set("spark.files", filesInConf.mkString(",")) + .set(PYSPARK_MAJOR_PYTHON_VERSION, "2") + val spec = applyFeatureStep( + PythonMainAppResource(mainResource), + conf = sparkConf, + appArgs = Array("5", "7", "9"), + pyFiles = pyFiles) + + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "/main.py", "5", "7", "9")) + + val envs = spec.pod.container.getEnv.asScala + .map { env => (env.getName, env.getValue) } + .toMap + val expected = Map( + ENV_PYSPARK_FILES -> expectedPySparkFiles, + ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2") + assert(envs === expected) + + val files = Utils.stringToSeq(spec.systemProperties("spark.files")) + assert(files.toSet === pyFiles.toSet ++ filesInConf ++ Set(mainResource)) + } + + test("R resource") { + val expectedMainResource = "/main.R" + val mainResource = s"local://$expectedMainResource" + + val spec = applyFeatureStep( + RMainAppResource(mainResource), + appArgs = Array("5", "7", "9")) + + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "/main.R", "5", "7", "9")) + } + + private def applyFeatureStep( + resource: MainAppResource, + conf: SparkConf = new SparkConf(false), + appArgs: Array[String] = Array(), + pyFiles: Seq[String] = Nil): KubernetesDriverSpec = { + val driverConf = new KubernetesDriverSpecificConf( + resource, MAIN_CLASS, "appName", appArgs, pyFiles = pyFiles) + val kubernetesConf = KubernetesConf( + conf, + driverConf, + "resource-prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + hadoopConfSpec = None) + val step = new DriverCommandFeatureStep(kubernetesConf) + val pod = step.configurePod(SparkPod.initialPod()) + val props = step.getAdditionalPodSystemProperties() + KubernetesDriverSpec(pod, Nil, props) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala index 8675ceb..36c6616 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala @@ -62,7 +62,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) @@ -95,7 +94,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) @@ -135,7 +133,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties() http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index 5c3e801..3c46667 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource import org.apache.spark.util.Clock class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { @@ -59,7 +60,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), SHORT_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -68,7 +69,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod()) assert(configurationStep.getAdditionalKubernetesResources().size === 1) @@ -92,7 +92,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080) .set(KUBERNETES_NAMESPACE, "my-namespace"), KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), SHORT_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -101,7 +101,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX @@ -115,7 +114,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), SHORT_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -124,7 +123,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val resolvedService = configurationStep .getAdditionalKubernetesResources() @@ -147,7 +145,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"), KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), LONG_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -156,7 +154,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None), clock) val driverService = configurationStep @@ -176,7 +173,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"), KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), LONG_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -185,7 +182,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None), clock) fail("The driver bind address should not be allowed.") @@ -203,7 +199,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), LONG_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -212,7 +208,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None), clock) fail("The driver host address should not be allowed.") http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala index 43796b7..3d25307 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala @@ -46,7 +46,6 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{ envVarsToKeys, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val step = new EnvSecretsFeatureStep(kubernetesConf) http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 3a4e605..894d824 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private val defaultLocalDir = "/var/data/default-local-dir" @@ -36,7 +37,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { kubernetesConf = KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "app-name", "main", Seq.empty), @@ -48,7 +49,6 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) } http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala index 18e3d77..1555f6a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala @@ -44,7 +44,6 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val step = new MountSecretsFeatureStep(kubernetesConf) http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 0d0a5fb..2a95746 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -18,13 +18,14 @@ package org.apache.spark.deploy.k8s.features import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class MountVolumesFeatureStepSuite extends SparkFunSuite { private val sparkConf = new SparkConf(false) private val emptyKubernetesConf = KubernetesConf( sparkConf = sparkConf, roleSpecificConf = KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "app-name", "main", Seq.empty), @@ -36,7 +37,6 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Nil, - sparkFiles = Nil, hadoopConfSpec = None) test("Mounts hostPath volumes") { http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala index d7bbbd1..370948c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { private var sparkConf: SparkConf = _ @@ -36,7 +37,7 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { kubernetesConf = KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "app-name", "main", Seq.empty), @@ -48,7 +49,6 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], Option.empty) templateFile = Files.createTempFile("pod-template", "yml").toFile templateFile.deleteOnExit() http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala deleted file mode 100644 index 9172e0c..0000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.PythonMainAppResource - -class JavaDriverFeatureStepSuite extends SparkFunSuite { - - test("Java Step modifies container correctly") { - val baseDriverPod = SparkPod.initialPod() - val sparkConf = new SparkConf(false) - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - Some(PythonMainAppResource("local:///main.jar")), - "test-class", - "java-runner", - Seq("5 7")), - appResourceNamePrefix = "", - appId = "", - roleLabels = Map.empty, - roleAnnotations = Map.empty, - roleSecretNamesToMountPaths = Map.empty, - roleSecretEnvNamesToKeyRefs = Map.empty, - roleEnvs = Map.empty, - roleVolumes = Nil, - sparkFiles = Seq.empty[String], - hadoopConfSpec = None) - - val step = new JavaDriverFeatureStep(kubernetesConf) - val driverPod = step.configurePod(baseDriverPod).pod - val driverContainerwithJavaStep = step.configurePod(baseDriverPod).container - assert(driverContainerwithJavaStep.getArgs.size === 7) - val args = driverContainerwithJavaStep - .getArgs.asScala - assert(args === List( - "driver", - "--properties-file", SPARK_CONF_PATH, - "--class", "test-class", - "spark-internal", "5 7")) - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org