[SPARK-25875][K8S] Merge code to set up driver command into a single step.

Right now there are 3 different classes dealing with building the driver
command to run inside the pod, one for each "binding" supported by Spark.
This has two main shortcomings:

- the code in the 3 classes is very similar; changing things in one place
  would probably mean making a similar change in the others.

- it gives the false impression that the step implementation is the only
  place where binding-specific logic is needed. That is not true: there
  was binding-specific code in KubernetesConf, and there is also code in
  the executor-specific config step. So the 3 classes weren't really
  working as a language-specific abstraction.

On top of that, the current code was propagating command line parameters in
a different way depending on the binding. That doesn't seem necessary, and
in fact using environment variables for command line parameters is in general
a really bad idea, since you can't handle special characters (e.g. spaces)
that way.

This change merges the 3 different code paths for Java, Python and R into
a single step, and also merges the 3 code paths to start the Spark driver
in the k8s entry point script. This increases the amount of shared code,
and also moves more feature logic into the step itself, so it doesn't live
in KubernetesConf.
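
Schematically, every binding now launches the driver container with the same
argument list (a sketch of the shape, not literal code; see
DriverCommandFeatureStep in the diff below):

    driver --properties-file $SPARK_CONF_PATH --class <mainClass> <resource> <appArgs...>

where <resource> is SparkLauncher.NO_RESOURCE for Java apps (the jar is
carried in spark.jars instead) and the resolved primary file for Python
and R apps.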

Note that not all logic related to setting up the driver lives in that
step. For example, the memory overhead calculation still lives separately,
except it now happens in the driver config step instead of outside the
step hierarchy altogether.
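
As a condensed sketch (names taken from the diff below), the driver config
step now picks the overhead factor like this:

    // An explicit MEMORY_OVERHEAD_FACTOR setting always wins; otherwise
    // non-JVM apps get NON_JVM_MEMORY_OVERHEAD_FACTOR (0.4) instead of the
    // usual 0.1 default, and the chosen factor is propagated to executors.
    val overheadFactor =
      if (mainAppResource.isInstanceOf[NonJVMResource] &&
          !sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) {
        NON_JVM_MEMORY_OVERHEAD_FACTOR
      } else {
        conf.get(MEMORY_OVERHEAD_FACTOR)
      }
    val memoryOverheadMiB = conf.get(DRIVER_MEMORY_OVERHEAD).getOrElse(
      math.max((overheadFactor * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))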

Some of the noise in the diff is because of changes to KubernetesConf, which
will be addressed in a separate change.

Tested with new and updated unit tests + integration tests.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #22897 from vanzin/SPARK-25875.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3404a73f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3404a73f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3404a73f

Branch: refs/heads/master
Commit: 3404a73f4cf7be37e574026d08ad5cf82cfac871
Parents: 7ea594e
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Fri Nov 2 13:58:08 2018 -0700
Committer: mcheah <mch...@palantir.com>
Committed: Fri Nov 2 13:58:08 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/k8s/Config.scala    |  30 +---
 .../org/apache/spark/deploy/k8s/Constants.scala |  10 +-
 .../spark/deploy/k8s/KubernetesConf.scala       |  70 ++-------
 .../k8s/features/BasicDriverFeatureStep.scala   |  42 ++++--
 .../k8s/features/BasicExecutorFeatureStep.scala |  21 +--
 .../k8s/features/DriverCommandFeatureStep.scala | 134 +++++++++++++++++
 .../features/KubernetesFeatureConfigStep.scala  |   4 +-
 .../k8s/features/PodTemplateConfigMapStep.scala |   4 +-
 .../bindings/JavaDriverFeatureStep.scala        |  46 ------
 .../bindings/PythonDriverFeatureStep.scala      |  76 ----------
 .../features/bindings/RDriverFeatureStep.scala  |  62 --------
 .../submit/KubernetesClientApplication.scala    |  10 +-
 .../k8s/submit/KubernetesDriverBuilder.scala    |  26 +---
 .../deploy/k8s/submit/MainAppResource.scala     |   3 +-
 .../spark/deploy/k8s/KubernetesConfSuite.scala  | 103 +------------
 .../features/BasicDriverFeatureStepSuite.scala  |  69 +++++++--
 .../BasicExecutorFeatureStepSuite.scala         |   4 -
 .../DriverCommandFeatureStepSuite.scala         | 144 +++++++++++++++++++
 ...rKubernetesCredentialsFeatureStepSuite.scala |   3 -
 .../DriverServiceFeatureStepSuite.scala         |  19 +--
 .../features/EnvSecretsFeatureStepSuite.scala   |   1 -
 .../features/LocalDirsFeatureStepSuite.scala    |   4 +-
 .../features/MountSecretsFeatureStepSuite.scala |   1 -
 .../features/MountVolumesFeatureStepSuite.scala |   4 +-
 .../PodTemplateConfigMapStepSuite.scala         |   4 +-
 .../bindings/JavaDriverFeatureStepSuite.scala   |  61 --------
 .../bindings/PythonDriverFeatureStepSuite.scala | 112 ---------------
 .../bindings/RDriverFeatureStepSuite.scala      |  64 ---------
 .../spark/deploy/k8s/submit/ClientSuite.scala   |   4 +-
 .../submit/KubernetesDriverBuilderSuite.scala   | 136 +++---------------
 .../k8s/KubernetesExecutorBuilderSuite.scala    |   6 -
 .../src/main/dockerfiles/spark/entrypoint.sh    |  16 ---
 32 files changed, 438 insertions(+), 855 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 862f1d6..a32bd93 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigBuilder
 
@@ -125,34 +126,6 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
-  val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
-    ConfigBuilder("spark.kubernetes.python.mainAppResource")
-      .doc("The main app resource for pyspark jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val KUBERNETES_PYSPARK_APP_ARGS =
-    ConfigBuilder("spark.kubernetes.python.appArgs")
-      .doc("The app arguments for PySpark Jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val KUBERNETES_R_MAIN_APP_RESOURCE =
-    ConfigBuilder("spark.kubernetes.r.mainAppResource")
-      .doc("The main app resource for SparkR jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val KUBERNETES_R_APP_ARGS =
-    ConfigBuilder("spark.kubernetes.r.appArgs")
-      .doc("The app arguments for SparkR Jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor 
allocation.")
@@ -267,6 +240,7 @@ private[spark] object Config extends Logging {
       .doc("This sets the resource type internally")
       .internal()
       .stringConf
+      .checkValues(Set(APP_RESOURCE_TYPE_JAVA, APP_RESOURCE_TYPE_PYTHON, APP_RESOURCE_TYPE_R))
       .createOptional
 
   val KUBERNETES_LOCAL_DIRS_TMPFS =

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 1c6d53c..85917b8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -69,12 +69,8 @@ private[spark] object Constants {
   val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
 
   // BINDINGS
-  val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
   val ENV_PYSPARK_FILES = "PYSPARK_FILES"
-  val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
   val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
-  val ENV_R_PRIMARY = "R_PRIMARY"
-  val ENV_R_ARGS = "R_APP_ARGS"
 
   // Pod spec templates
   val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
@@ -88,6 +84,7 @@ private[spark] object Constants {
   val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
   val MEMORY_OVERHEAD_MIN_MIB = 384L
+  val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
 
   // Hadoop Configuration
   val HADOOP_FILE_VOLUME = "hadoop-properties"
@@ -113,4 +110,9 @@ private[spark] object Constants {
   // Hadoop credentials secrets for the Spark app.
   val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
   val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"
+
+  // Application resource types.
+  val APP_RESOURCE_TYPE_JAVA = "java"
+  val APP_RESOURCE_TYPE_PYTHON = "python"
+  val APP_RESOURCE_TYPE_R = "r"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 066547d..ebb8154 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManag
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
 import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.util.Utils
 
 
 private[spark] sealed trait KubernetesRoleSpecificConf
@@ -36,10 +37,15 @@ private[spark] sealed trait KubernetesRoleSpecificConf
  * Structure containing metadata for Kubernetes logic that builds a Spark driver.
  */
 private[spark] case class KubernetesDriverSpecificConf(
-    mainAppResource: Option[MainAppResource],
+    mainAppResource: MainAppResource,
     mainClass: String,
     appName: String,
-    appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+    appArgs: Seq[String],
+    pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf {
+
+  require(mainAppResource != null, "Main resource must be provided.")
+
+}
 
 /*
  * Structure containing metadata for Kubernetes logic that builds a Spark executor.
@@ -70,7 +76,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     roleSecretEnvNamesToKeyRefs: Map[String, String],
     roleEnvs: Map[String, String],
     roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]],
-    sparkFiles: Seq[String],
     hadoopConfSpec: Option[HadoopConfSpec]) {
 
   def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config"
@@ -82,23 +87,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
 
   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
 
-  def sparkJars(): Seq[String] = sparkConf
-    .getOption("spark.jars")
-    .map(str => str.split(",").toSeq)
-    .getOrElse(Seq.empty[String])
-
-  def pyFiles(): Option[String] = sparkConf
-    .get(KUBERNETES_PYSPARK_PY_FILES)
-
-  def pySparkMainResource(): Option[String] = sparkConf
-    .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)
-
-  def pySparkPythonVersion(): String = sparkConf
-      .get(PYSPARK_MAJOR_PYTHON_VERSION)
-
-  def sparkRMainResource(): Option[String] = sparkConf
-    .get(KUBERNETES_R_MAIN_APP_RESOURCE)
-
   def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
 
   def imagePullSecrets(): Seq[LocalObjectReference] = {
@@ -130,38 +118,11 @@ private[spark] object KubernetesConf {
       appName: String,
       appResourceNamePrefix: String,
       appId: String,
-      mainAppResource: Option[MainAppResource],
+      mainAppResource: MainAppResource,
       mainClass: String,
       appArgs: Array[String],
       maybePyFiles: Option[String],
       hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
-    val sparkConfWithMainAppJar = sparkConf.clone()
-    val additionalFiles = mutable.ArrayBuffer.empty[String]
-    mainAppResource.foreach {
-        case JavaMainAppResource(res) =>
-          val previousJars = sparkConf
-            .getOption("spark.jars")
-            .map(_.split(","))
-            .getOrElse(Array.empty)
-          if (!previousJars.contains(res)) {
-            sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-          }
-        // The function of this outer match is to account for multiple nonJVM
-        // bindings that will all have increased default MEMORY_OVERHEAD_FACTOR to 0.4
-        case nonJVM: NonJVMResource =>
-          nonJVM match {
-            case PythonMainAppResource(res) =>
-              additionalFiles += res
-              maybePyFiles.foreach{maybePyFiles =>
-                additionalFiles.appendAll(maybePyFiles.split(","))}
-              sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
-            case RMainAppResource(res) =>
-              additionalFiles += res
-              sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res)
-          }
-          sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
-    }
-
     val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
     require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
@@ -188,11 +149,6 @@ private[spark] object KubernetesConf {
     KubernetesVolumeUtils.parseVolumesWithPrefix(
       sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
 
-    val sparkFiles = sparkConf
-      .getOption("spark.files")
-      .map(str => str.split(",").toSeq)
-      .getOrElse(Seq.empty[String]) ++ additionalFiles
-
     val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
     KubernetesUtils.requireNandDefined(
       hadoopConfDir,
@@ -205,10 +161,12 @@ private[spark] object KubernetesConf {
       } else {
         None
       }
+    val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
+
 
     KubernetesConf(
-      sparkConfWithMainAppJar,
-      KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
+      sparkConf.clone(),
+      KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles),
       appResourceNamePrefix,
       appId,
       driverLabels,
@@ -217,7 +175,6 @@ private[spark] object KubernetesConf {
       driverSecretEnvNamesToKeyRefs,
       driverEnvs,
       driverVolumes,
-      sparkFiles,
       hadoopConfSpec)
   }
 
@@ -274,7 +231,6 @@ private[spark] object KubernetesConf {
       executorEnvSecrets,
       executorEnv,
       executorVolumes,
-      Seq.empty[String],
       None)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 96b14a0..5ddf73c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
 
 private[spark] class BasicDriverFeatureStep(
     conf: KubernetesConf[KubernetesDriverSpecificConf])
@@ -47,10 +48,23 @@ private[spark] class BasicDriverFeatureStep(
 
   // Memory settings
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
+
+  // The memory overhead factor to use. If the user has not set it, then use a different
+  // value for non-JVM apps. This value is propagated to executors.
+  private val overheadFactor =
+    if (conf.roleSpecificConf.mainAppResource.isInstanceOf[NonJVMResource]) {
+      if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) {
+        conf.get(MEMORY_OVERHEAD_FACTOR)
+      } else {
+        NON_JVM_MEMORY_OVERHEAD_FACTOR
+      }
+    } else {
+      conf.get(MEMORY_OVERHEAD_FACTOR)
+    }
+
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
-      MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
   override def configurePod(pod: SparkPod): SparkPod = {
@@ -134,20 +148,18 @@ private[spark] class BasicDriverFeatureStep(
       KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
       "spark.app.id" -> conf.appId,
       KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix,
-      KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true")
-
-    val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
-      conf.sparkJars())
-    val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
-      conf.sparkFiles)
-    if (resolvedSparkJars.nonEmpty) {
-      additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
-    }
-    if (resolvedSparkFiles.nonEmpty) {
-      additionalProps.put("spark.files", resolvedSparkFiles.mkString(","))
+      KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
+      MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
+
+    Seq("spark.jars", "spark.files").foreach { key =>
+      conf.getOption(key).foreach { value =>
+        val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value))
+        if (resolved.nonEmpty) {
+          additionalProps.put(key, resolved.mkString(","))
+        }
+      }
     }
+
     additionalProps.toMap
   }
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 1dab2a8..7f397e6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -58,16 +58,13 @@ private[spark] class BasicExecutorFeatureStep(
       (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
-  private val executorMemoryTotal = kubernetesConf.sparkConf
-    .getOption(APP_RESOURCE_TYPE.key).map{ res =>
-      val additionalPySparkMemory = res match {
-        case "python" =>
-          kubernetesConf.sparkConf
-            .get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
-        case _ => 0
-      }
-    executorMemoryWithOverhead + additionalPySparkMemory
-  }.getOrElse(executorMemoryWithOverhead)
+  private val executorMemoryTotal =
+    if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)) {
+      executorMemoryWithOverhead +
+        kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
+    } else {
+      executorMemoryWithOverhead
+    }
 
   private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
   private val executorCoresRequest =
@@ -187,8 +184,4 @@ private[spark] class BasicExecutorFeatureStep(
 
     SparkPod(executorPod, containerWithLimitCores)
   }
-
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
new file mode 100644
index 0000000..8b8f0d0
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder}
+
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.Utils
+
+/**
+ * Creates the driver command for running the user app, and propagates needed configuration so
+ * executors can also find the app code.
+ */
+private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  private val driverConf = conf.roleSpecificConf
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    driverConf.mainAppResource match {
+      case JavaMainAppResource(_) =>
+        configureForJava(pod)
+
+      case PythonMainAppResource(res) =>
+        configureForPython(pod, res)
+
+      case RMainAppResource(res) =>
+        configureForR(pod, res)
+    }
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    driverConf.mainAppResource match {
+      case JavaMainAppResource(res) =>
+        res.map(additionalJavaProperties).getOrElse(Map.empty)
+
+      case PythonMainAppResource(res) =>
+        additionalPythonProperties(res)
+
+      case RMainAppResource(res) =>
+        additionalRProperties(res)
+    }
+  }
+
+  private def configureForJava(pod: SparkPod): SparkPod = {
+    // The user application jar is merged into the spark.jars list and managed through that
+    // property, so use a "blank" resource for the Java driver.
+    val driverContainer = baseDriverContainer(pod, SparkLauncher.NO_RESOURCE).build()
+    SparkPod(pod.pod, driverContainer)
+  }
+
+  private def configureForPython(pod: SparkPod, res: String): SparkPod = {
+    val maybePythonFiles = if (driverConf.pyFiles.nonEmpty) {
+      // Delineation by ":" is to append the PySpark Files to the PYTHONPATH
+      // of the respective PySpark pod
+      val resolved = KubernetesUtils.resolveFileUrisAndPath(driverConf.pyFiles)
+      Some(new EnvVarBuilder()
+        .withName(ENV_PYSPARK_FILES)
+        .withValue(resolved.mkString(":"))
+        .build())
+    } else {
+      None
+    }
+    val pythonEnvs =
+      Seq(new EnvVarBuilder()
+          .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
+          .withValue(conf.sparkConf.get(PYSPARK_MAJOR_PYTHON_VERSION))
+        .build()) ++
+      maybePythonFiles
+
+    val pythonContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res))
+      .addAllToEnv(pythonEnvs.asJava)
+      .build()
+
+    SparkPod(pod.pod, pythonContainer)
+  }
+
+  private def configureForR(pod: SparkPod, res: String): SparkPod = {
+    val rContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)).build()
+    SparkPod(pod.pod, rContainer)
+  }
+
+  private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = {
+    new ContainerBuilder(pod.container)
+      .addToArgs("driver")
+      .addToArgs("--properties-file", SPARK_CONF_PATH)
+      .addToArgs("--class", driverConf.mainClass)
+      .addToArgs(resource)
+      .addToArgs(driverConf.appArgs: _*)
+  }
+
+  private def additionalJavaProperties(resource: String): Map[String, String] = {
+    resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList("spark.jars", Seq(resource))
+  }
+
+  private def additionalPythonProperties(resource: String): Map[String, String] = {
+    resourceType(APP_RESOURCE_TYPE_PYTHON) ++
+      mergeFileList("spark.files", Seq(resource) ++ driverConf.pyFiles)
+  }
+
+  private def additionalRProperties(resource: String): Map[String, String] = {
+    resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList("spark.files", Seq(resource))
+  }
+
+  private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = {
+    val existing = Utils.stringToSeq(conf.sparkConf.get(key, ""))
+    Map(key -> (existing ++ filesToAdd).distinct.mkString(","))
+  }
+
+  private def resourceType(resType: String): Map[String, String] = {
+    Map(APP_RESOURCE_TYPE.key -> resType)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
index 4c1be3b..58cdaa3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
@@ -61,11 +61,11 @@ private[spark] trait KubernetesFeatureConfigStep {
   /**
   * Return any system properties that should be set on the JVM in accordance to this feature.
    */
-  def getAdditionalPodSystemProperties(): Map[String, String]
+  def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
 
   /**
   * Return any additional Kubernetes resources that should be added to support this feature. Only
    * applicable when creating the driver in cluster mode.
    */
-  def getAdditionalKubernetesResources(): Seq[HasMetadata]
+  def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
index 96a8013..28e2d17 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
@@ -54,11 +54,11 @@ private[spark] class PodTemplateConfigMapStep(
     SparkPod(podWithVolume, containerWithVolume)
   }
 
-  def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
     KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
       (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
 
-  def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
     require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
     val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
     val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
deleted file mode 100644
index 6f063b2..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.bindings
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata}
-
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
-import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH
-import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
-import org.apache.spark.launcher.SparkLauncher
-
-private[spark] class JavaDriverFeatureStep(
-  kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
-  extends KubernetesFeatureConfigStep {
-  override def configurePod(pod: SparkPod): SparkPod = {
-    val withDriverArgs = new ContainerBuilder(pod.container)
-      .addToArgs("driver")
-      .addToArgs("--properties-file", SPARK_CONF_PATH)
-      .addToArgs("--class", kubernetesConf.roleSpecificConf.mainClass)
-      // The user application jar is merged into the spark.jars list and managed through that
-      // property, so there is no need to reference it explicitly here.
-      .addToArgs(SparkLauncher.NO_RESOURCE)
-      .addToArgs(kubernetesConf.roleSpecificConf.appArgs: _*)
-      .build()
-    SparkPod(pod.pod, withDriverArgs)
-  }
-  override def getAdditionalPodSystemProperties(): Map[String, String] =
-    Map(APP_RESOURCE_TYPE.key -> "java")
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
deleted file mode 100644
index cf0c03b..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.bindings
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
-
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
-import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
-
-private[spark] class PythonDriverFeatureStep(
-  kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
-  extends KubernetesFeatureConfigStep {
-  override def configurePod(pod: SparkPod): SparkPod = {
-    val roleConf = kubernetesConf.roleSpecificConf
-    require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined")
-    // Delineation is done by " " because that is input into PythonRunner
-    val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
-      pyArgs =>
-        new EnvVarBuilder()
-          .withName(ENV_PYSPARK_ARGS)
-          .withValue(pyArgs.mkString(" "))
-          .build())
-    val maybePythonFiles = kubernetesConf.pyFiles().map(
-      // Dilineation by ":" is to append the PySpark Files to the PYTHONPATH
-      // of the respective PySpark pod
-      pyFiles =>
-        new EnvVarBuilder()
-          .withName(ENV_PYSPARK_FILES)
-          .withValue(KubernetesUtils.resolveFileUrisAndPath(pyFiles.split(","))
-            .mkString(":"))
-          .build())
-    val envSeq =
-      Seq(new EnvVarBuilder()
-          .withName(ENV_PYSPARK_PRIMARY)
-          .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.pySparkMainResource().get))
-        .build(),
-          new EnvVarBuilder()
-          .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
-          .withValue(kubernetesConf.pySparkPythonVersion())
-        .build())
-    val pythonEnvs = envSeq ++
-      maybePythonArgs.toSeq ++
-      maybePythonFiles.toSeq
-
-    val withPythonPrimaryContainer = new ContainerBuilder(pod.container)
-        .addAllToEnv(pythonEnvs.asJava)
-        .addToArgs("driver-py")
-        .addToArgs("--properties-file", SPARK_CONF_PATH)
-        .addToArgs("--class", roleConf.mainClass)
-      .build()
-
-    SparkPod(pod.pod, withPythonPrimaryContainer)
-  }
-  override def getAdditionalPodSystemProperties(): Map[String, String] =
-    Map(APP_RESOURCE_TYPE.key -> "python")
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
deleted file mode 100644
index 1a7ef52..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.bindings
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
-
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
-import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
-
-private[spark] class RDriverFeatureStep(
-  kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
-  extends KubernetesFeatureConfigStep {
-  override def configurePod(pod: SparkPod): SparkPod = {
-    val roleConf = kubernetesConf.roleSpecificConf
-    require(roleConf.mainAppResource.isDefined, "R Main Resource must be defined")
-    // Delineation is done by " " because that is input into RRunner
-    val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
-      rArgs =>
-        new EnvVarBuilder()
-          .withName(ENV_R_ARGS)
-          .withValue(rArgs.mkString(" "))
-          .build())
-    val envSeq =
-      Seq(new EnvVarBuilder()
-            .withName(ENV_R_PRIMARY)
-            .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.sparkRMainResource().get))
-          .build())
-    val rEnvs = envSeq ++
-      maybeRArgs.toSeq
-
-    val withRPrimaryContainer = new ContainerBuilder(pod.container)
-        .addAllToEnv(rEnvs.asJava)
-        .addToArgs("driver-r")
-        .addToArgs("--properties-file", SPARK_CONF_PATH)
-        .addToArgs("--class", roleConf.mainClass)
-      .build()
-
-    SparkPod(pod.pod, withRPrimaryContainer)
-  }
-  override def getAdditionalPodSystemProperties(): Map[String, String] =
-    Map(APP_RESOURCE_TYPE.key -> "r")
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 4b58f8b..543d6b1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -44,7 +44,7 @@ import org.apache.spark.util.Utils
  * @param maybePyFiles additional Python files via --py-files
  */
 private[spark] case class ClientArguments(
-    mainAppResource: Option[MainAppResource],
+    mainAppResource: MainAppResource,
     mainClass: String,
     driverArgs: Array[String],
     maybePyFiles: Option[String],
@@ -53,18 +53,18 @@ private[spark] case class ClientArguments(
 private[spark] object ClientArguments {
 
   def fromCommandLineArgs(args: Array[String]): ClientArguments = {
-    var mainAppResource: Option[MainAppResource] = None
+    var mainAppResource: MainAppResource = JavaMainAppResource(None)
     var mainClass: Option[String] = None
     val driverArgs = mutable.ArrayBuffer.empty[String]
     var maybePyFiles : Option[String] = None
 
     args.sliding(2, 2).toList.foreach {
       case Array("--primary-java-resource", primaryJavaResource: String) =>
-        mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+        mainAppResource = JavaMainAppResource(Some(primaryJavaResource))
       case Array("--primary-py-file", primaryPythonResource: String) =>
-        mainAppResource = Some(PythonMainAppResource(primaryPythonResource))
+        mainAppResource = PythonMainAppResource(primaryPythonResource)
       case Array("--primary-r-file", primaryRFile: String) =>
-        mainAppResource = Some(RMainAppResource(primaryRFile))
+        mainAppResource = RMainAppResource(primaryRFile)
       case Array("--other-py-files", pyFiles: String) =>
         maybePyFiles = Some(pyFiles)
       case Array("--main-class", clazz: String) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 5565cd7..be4daec 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -23,7 +23,6 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.features._
-import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
 
 private[spark] class KubernetesDriverBuilder(
     provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -45,18 +44,10 @@ private[spark] class KubernetesDriverBuilder(
     provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => MountVolumesFeatureStep) =
       new MountVolumesFeatureStep(_),
-    providePythonStep: (
+    provideDriverCommandStep: (
       KubernetesConf[KubernetesDriverSpecificConf]
-      => PythonDriverFeatureStep) =
-      new PythonDriverFeatureStep(_),
-    provideRStep: (
-      KubernetesConf[KubernetesDriverSpecificConf]
-        => RDriverFeatureStep) =
-    new RDriverFeatureStep(_),
-    provideJavaStep: (
-      KubernetesConf[KubernetesDriverSpecificConf]
-        => JavaDriverFeatureStep) =
-    new JavaDriverFeatureStep(_),
+      => DriverCommandFeatureStep) =
+      new DriverCommandFeatureStep(_),
     provideHadoopGlobalStep: (
       KubernetesConf[KubernetesDriverSpecificConf]
         => KerberosConfDriverFeatureStep) =
@@ -88,21 +79,14 @@ private[spark] class KubernetesDriverBuilder(
       Seq(providePodTemplateConfigMapStep(kubernetesConf))
     } else Nil
 
-    val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map {
-        case JavaMainAppResource(_) =>
-          provideJavaStep(kubernetesConf)
-        case PythonMainAppResource(_) =>
-          providePythonStep(kubernetesConf)
-        case RMainAppResource(_) =>
-          provideRStep(kubernetesConf)}
-      .getOrElse(provideJavaStep(kubernetesConf))
+    val driverCommandStep = provideDriverCommandStep(kubernetesConf)
 
     val maybeHadoopConfigStep =
       kubernetesConf.hadoopConfSpec.map { _ =>
         provideHadoopGlobalStep(kubernetesConf)}
 
     val allFeatures: Seq[KubernetesFeatureConfigStep] =
-      (baseFeatures :+ bindingsStep) ++
+      baseFeatures ++ Seq(driverCommandStep) ++
         secretFeature ++ envSecretFeature ++ volumesFeature ++
         maybeHadoopConfigStep.toSeq ++ podTemplateFeature
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
index dd5a454..a2e01fa 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
@@ -20,7 +20,8 @@ private[spark] sealed trait MainAppResource
 
 private[spark] sealed trait NonJVMResource
 
-private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource
+private[spark] case class JavaMainAppResource(primaryResource: Option[String])
+  extends MainAppResource
 
 private[spark] case class PythonMainAppResource(primaryResource: String)
   extends MainAppResource with NonJVMResource

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index bb2b94f..41ca8d1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -56,7 +56,7 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_NAME,
       RESOURCE_NAME_PREFIX,
       APP_ID,
-      mainAppResource = None,
+      mainAppResource = JavaMainAppResource(None),
       MAIN_CLASS,
       APP_ARGS,
       maybePyFiles = None,
@@ -65,109 +65,10 @@ class KubernetesConfSuite extends SparkFunSuite {
     assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
     assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
     assert(conf.roleSpecificConf.appName === APP_NAME)
-    assert(conf.roleSpecificConf.mainAppResource.isEmpty)
     assert(conf.roleSpecificConf.mainClass === MAIN_CLASS)
     assert(conf.roleSpecificConf.appArgs === APP_ARGS)
   }
 
-  test("Creating driver conf with and without the main app jar influences 
spark.jars") {
-    val sparkConf = new SparkConf(false)
-      .setJars(Seq("local:///opt/spark/jar1.jar"))
-    val mainAppJar = Some(JavaMainAppResource("local:///opt/spark/main.jar"))
-    val kubernetesConfWithMainJar = KubernetesConf.createDriverConf(
-      sparkConf,
-      APP_NAME,
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      mainAppJar,
-      MAIN_CLASS,
-      APP_ARGS,
-      maybePyFiles = None,
-      hadoopConfDir = None)
-    assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars")
-      .split(",")
-      === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar"))
-    val kubernetesConfWithoutMainJar = KubernetesConf.createDriverConf(
-      sparkConf,
-      APP_NAME,
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      mainAppResource = None,
-      MAIN_CLASS,
-      APP_ARGS,
-      maybePyFiles = None,
-      hadoopConfDir = None)
-    assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
-      === Array("local:///opt/spark/jar1.jar"))
-    assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
-  }
-
-  test("Creating driver conf with a python primary file") {
-    val mainResourceFile = "local:///opt/spark/main.py"
-    val inputPyFiles = Array("local:///opt/spark/example2.py", "local:///example3.py")
-    val sparkConf = new SparkConf(false)
-      .setJars(Seq("local:///opt/spark/jar1.jar"))
-      .set("spark.files", "local:///opt/spark/example4.py")
-    val mainAppResource = Some(PythonMainAppResource(mainResourceFile))
-    val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
-      sparkConf,
-      APP_NAME,
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      mainAppResource,
-      MAIN_CLASS,
-      APP_ARGS,
-      Some(inputPyFiles.mkString(",")),
-      hadoopConfDir = None)
-    assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
-      === Array("local:///opt/spark/jar1.jar"))
-    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
-    assert(kubernetesConfWithMainResource.sparkFiles
-      === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles)
-  }
-
-  test("Creating driver conf with a r primary file") {
-    val mainResourceFile = "local:///opt/spark/main.R"
-    val sparkConf = new SparkConf(false)
-      .setJars(Seq("local:///opt/spark/jar1.jar"))
-      .set("spark.files", "local:///opt/spark/example2.R")
-    val mainAppResource = Some(RMainAppResource(mainResourceFile))
-    val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
-      sparkConf,
-      APP_NAME,
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      mainAppResource,
-      MAIN_CLASS,
-      APP_ARGS,
-      maybePyFiles = None,
-      hadoopConfDir = None)
-    assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
-      === Array("local:///opt/spark/jar1.jar"))
-    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
-    assert(kubernetesConfWithMainResource.sparkFiles
-      === Array("local:///opt/spark/example2.R", mainResourceFile))
-  }
-
-  test("Testing explicit setting of memory overhead on non-JVM tasks") {
-    val sparkConf = new SparkConf(false)
-      .set(MEMORY_OVERHEAD_FACTOR, 0.3)
-
-    val mainResourceFile = "local:///opt/spark/main.py"
-    val mainAppResource = Some(PythonMainAppResource(mainResourceFile))
-    val conf = KubernetesConf.createDriverConf(
-      sparkConf,
-      APP_NAME,
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      mainAppResource,
-      MAIN_CLASS,
-      APP_ARGS,
-      maybePyFiles = None,
-      hadoopConfDir = None)
-    assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3)
-  }
-
   test("Resolve driver labels, annotations, secret mount paths, envs, and 
memory overhead") {
     val sparkConf = new SparkConf(false)
       .set(MEMORY_OVERHEAD_FACTOR, 0.3)
@@ -192,7 +93,7 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_NAME,
       RESOURCE_NAME_PREFIX,
       APP_ID,
-      mainAppResource = None,
+      mainAppResource = JavaMainAppResource(None),
       MAIN_CLASS,
       APP_ARGS,
       maybePyFiles = None,

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 5c6bcc7..1e7dfbe 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
-import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
+import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.internal.config._
 import org.apache.spark.ui.SparkUI
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
@@ -52,7 +52,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       new LocalObjectReferenceBuilder().withName(secret).build()
     }
   private val emptyDriverSpecificConf = KubernetesDriverSpecificConf(
-    None,
+    JavaMainAppResource(None),
     APP_NAME,
     MAIN_CLASS,
     APP_ARGS)
@@ -62,8 +62,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
       .set("spark.driver.cores", "2")
       .set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
-      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M")
-      .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L)
+      .set(DRIVER_MEMORY.key, "256M")
+      .set(DRIVER_MEMORY_OVERHEAD, 200L)
       .set(CONTAINER_IMAGE, "spark-driver:latest")
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
     val kubernetesConf = KubernetesConf(
@@ -77,7 +77,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       DRIVER_ENVS,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
 
     val featureStep = new BasicDriverFeatureStep(kubernetesConf)
@@ -130,21 +129,22 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
       "spark.app.id" -> APP_ID,
       KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
-      "spark.kubernetes.submitInDriver" -> "true")
+      "spark.kubernetes.submitInDriver" -> "true",
+      MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
     assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
   }
 
   test("Check appropriate entrypoint rerouting for various bindings") {
     val javaSparkConf = new SparkConf()
-      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g")
+      .set(DRIVER_MEMORY.key, "4g")
       .set(CONTAINER_IMAGE, "spark-driver:latest")
     val pythonSparkConf = new SparkConf()
-      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g")
+      .set(DRIVER_MEMORY.key, "4g")
       .set(CONTAINER_IMAGE, "spark-driver-py:latest")
     val javaKubernetesConf = KubernetesConf(
       javaSparkConf,
       KubernetesDriverSpecificConf(
-        Some(JavaMainAppResource("")),
+        JavaMainAppResource(None),
         APP_NAME,
         PY_MAIN_CLASS,
         APP_ARGS),
@@ -156,13 +156,12 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       DRIVER_ENVS,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
 
     val pythonKubernetesConf = KubernetesConf(
       pythonSparkConf,
       KubernetesDriverSpecificConf(
-        Some(PythonMainAppResource("")),
+        PythonMainAppResource(""),
         APP_NAME,
         PY_MAIN_CLASS,
         APP_ARGS),
@@ -174,7 +173,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       DRIVER_ENVS,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
     val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf)
     val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf)
@@ -204,7 +202,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       DRIVER_ENVS,
       Nil,
-      allFiles,
       hadoopConfSpec = None)
 
     val step = new BasicDriverFeatureStep(kubernetesConf)
@@ -215,10 +212,52 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
       "spark.kubernetes.submitInDriver" -> "true",
       "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
-      "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt";)
+      "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt";,
+      MEMORY_OVERHEAD_FACTOR.key -> 
MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
     assert(additionalProperties === expectedSparkConf)
   }
 
+  // Memory overhead tests. Tuples are:
+  //   test name, main resource, overhead factor, expected factor
+  Seq(
+    ("java", JavaMainAppResource(None), None, 
MEMORY_OVERHEAD_FACTOR.defaultValue.get),
+    ("python default", PythonMainAppResource(null), None, 
NON_JVM_MEMORY_OVERHEAD_FACTOR),
+    ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d),
+    ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR)
+  ).foreach { case (name, resource, factor, expectedFactor) =>
+    test(s"memory overhead factor: $name") {
+      // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
+      val driverMem = MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+
+      // main app resource, overhead factor
+      val sparkConf = new SparkConf(false)
+        .set(CONTAINER_IMAGE, "spark-driver:latest")
+        .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
+      factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) }
+      val driverConf = emptyDriverSpecificConf.copy(mainAppResource = resource)
+      val conf = KubernetesConf(
+        sparkConf,
+        driverConf,
+        RESOURCE_NAME_PREFIX,
+        APP_ID,
+        DRIVER_LABELS,
+        DRIVER_ANNOTATIONS,
+        Map.empty,
+        Map.empty,
+        DRIVER_ENVS,
+        Nil,
+        hadoopConfSpec = None)
+      val step = new BasicDriverFeatureStep(conf)
+      val pod = step.configurePod(SparkPod.initialPod())
+      val mem = pod.container.getResources.getRequests.get("memory").getAmount()
+      val expected = (driverMem + driverMem * expectedFactor).toInt
+      assert(mem === s"${expected}Mi")
+
+      val systemProperties = step.getAdditionalPodSystemProperties()
+      assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+    }
+  }
+
   def containerPort(name: String, portNumber: Int): ContainerPort =
     new ContainerPortBuilder()
       .withName(name)
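
For reference, the expected memory request in the parameterized overhead tests
above can be reproduced by hand. A minimal, self-contained sketch of that
arithmetic (the constant values below are assumptions inlined for illustration;
the tests themselves read them from Spark's config definitions):

    object MemoryOverheadMath {
      // Assumed values, inlined for illustration only.
      val memoryOverheadMinMib = 384L   // MEMORY_OVERHEAD_MIN_MIB
      val defaultFactor = 0.1           // MEMORY_OVERHEAD_FACTOR default (JVM apps)
      val nonJvmFactor = 0.4            // NON_JVM_MEMORY_OVERHEAD_FACTOR

      // Pod memory request = driver memory + max(factor * driver memory, minimum).
      def requestMib(driverMemMib: Long, factor: Double): Long =
        driverMemMib + math.max((driverMemMib * factor).toLong, memoryOverheadMinMib)

      def main(args: Array[String]): Unit = {
        // Same choice as the test: large enough that the factor beats the minimum.
        val driverMem = (memoryOverheadMinMib / defaultFactor * 2).toLong  // 7680 MiB
        println(requestMib(driverMem, defaultFactor))  // 8448  ("java" case)
        println(requestMib(driverMem, nonJvmFactor))   // 10752 ("python"/"r" default)
      }
    }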

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 41f34bd..e9a16aa 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -91,7 +91,6 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None))
     val executor = step.configurePod(SparkPod.initialPod())
 
@@ -132,7 +131,6 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None))
     assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
   }
@@ -154,7 +152,6 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map("qux" -> "quux"),
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None))
     val executor = step.configurePod(SparkPod.initialPod())
 
@@ -182,7 +179,6 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None))
     val executor = step.configurePod(SparkPod.initialPod())
     // This is checking that basic executor + executorMemory = 1408 + 42 = 1450
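
As an aside on the arithmetic in the comment above: the 1408 MiB "basic
executor" figure is presumably the 1024 MiB default executor memory plus the
384 MiB minimum overhead, and the extra 42 MiB is what this test configures on
top. A sketch of that derivation (the 1024 and 384 values are assumptions):

    val basicExecutorMib = 1024 + 384    // default executor memory + minimum overhead = 1408
    val totalMib = basicExecutorMib + 42 // plus this test's extra 42 MiB = 1450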

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
new file mode 100644
index 0000000..3067295
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.util.Utils
+
+class DriverCommandFeatureStepSuite extends SparkFunSuite {
+
+  private val MAIN_CLASS = "mainClass"
+
+  test("java resource") {
+    val mainResource = "local:///main.jar"
+    val spec = applyFeatureStep(
+      JavaMainAppResource(Some(mainResource)),
+      appArgs = Array("5", "7"))
+    assert(spec.pod.container.getArgs.asScala === List(
+      "driver",
+      "--properties-file", SPARK_CONF_PATH,
+      "--class", MAIN_CLASS,
+      "spark-internal", "5", "7"))
+
+    val jars = Utils.stringToSeq(spec.systemProperties("spark.jars"))
+    assert(jars.toSet === Set(mainResource))
+  }
+
+  test("python resource with no extra files") {
+    val mainResource = "local:///main.py"
+    val sparkConf = new SparkConf(false)
+      .set(PYSPARK_MAJOR_PYTHON_VERSION, "3")
+
+    val spec = applyFeatureStep(
+      PythonMainAppResource(mainResource),
+      conf = sparkConf)
+    assert(spec.pod.container.getArgs.asScala === List(
+      "driver",
+      "--properties-file", SPARK_CONF_PATH,
+      "--class", MAIN_CLASS,
+      "/main.py"))
+    val envs = spec.pod.container.getEnv.asScala
+      .map { env => (env.getName, env.getValue) }
+      .toMap
+    assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3")
+
+    val files = Utils.stringToSeq(spec.systemProperties("spark.files"))
+    assert(files.toSet === Set(mainResource))
+  }
+
+  test("python resource with extra files") {
+    val expectedMainResource = "/main.py"
+    val expectedPySparkFiles = "/example2.py:/example3.py"
+    val filesInConf = Set("local:///example.py")
+
+    val mainResource = s"local://$expectedMainResource"
+    val pyFiles = Seq("local:///example2.py", "local:///example3.py")
+
+    val sparkConf = new SparkConf(false)
+      .set("spark.files", filesInConf.mkString(","))
+      .set(PYSPARK_MAJOR_PYTHON_VERSION, "2")
+    val spec = applyFeatureStep(
+      PythonMainAppResource(mainResource),
+      conf = sparkConf,
+      appArgs = Array("5", "7", "9"),
+      pyFiles = pyFiles)
+
+    assert(spec.pod.container.getArgs.asScala === List(
+      "driver",
+      "--properties-file", SPARK_CONF_PATH,
+      "--class", MAIN_CLASS,
+      "/main.py", "5", "7", "9"))
+
+    val envs = spec.pod.container.getEnv.asScala
+      .map { env => (env.getName, env.getValue) }
+      .toMap
+    val expected = Map(
+      ENV_PYSPARK_FILES -> expectedPySparkFiles,
+      ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2")
+    assert(envs === expected)
+
+    val files = Utils.stringToSeq(spec.systemProperties("spark.files"))
+    assert(files.toSet === pyFiles.toSet ++ filesInConf ++ Set(mainResource))
+  }
+
+  test("R resource") {
+    val expectedMainResource = "/main.R"
+    val mainResource = s"local://$expectedMainResource"
+
+    val spec = applyFeatureStep(
+      RMainAppResource(mainResource),
+      appArgs = Array("5", "7", "9"))
+
+    assert(spec.pod.container.getArgs.asScala === List(
+      "driver",
+      "--properties-file", SPARK_CONF_PATH,
+      "--class", MAIN_CLASS,
+      "/main.R", "5", "7", "9"))
+  }
+
+  private def applyFeatureStep(
+      resource: MainAppResource,
+      conf: SparkConf = new SparkConf(false),
+      appArgs: Array[String] = Array(),
+      pyFiles: Seq[String] = Nil): KubernetesDriverSpec = {
+    val driverConf = new KubernetesDriverSpecificConf(
+      resource, MAIN_CLASS, "appName", appArgs, pyFiles = pyFiles)
+    val kubernetesConf = KubernetesConf(
+      conf,
+      driverConf,
+      "resource-prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Nil,
+      hadoopConfSpec = None)
+    val step = new DriverCommandFeatureStep(kubernetesConf)
+    val pod = step.configurePod(SparkPod.initialPod())
+    val props = step.getAdditionalPodSystemProperties()
+    KubernetesDriverSpec(pod, Nil, props)
+  }
+
+}
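
All four bindings in this new suite assert the same command shape: the single
"driver" entrypoint argument, a properties file, the main class, the resolved
primary resource, and the app arguments appended as separate tokens. A minimal
sketch of that shape (buildDriverArgs is hypothetical; the real assembly lives
in DriverCommandFeatureStep):

    // Hypothetical helper reproducing the argument layout asserted above.
    def buildDriverArgs(
        propertiesFile: String,   // stands in for SPARK_CONF_PATH
        mainClass: String,
        resource: String,         // e.g. "spark-internal", "/main.py", "/main.R"
        appArgs: Seq[String]): Seq[String] =
      Seq("driver", "--properties-file", propertiesFile, "--class", mainClass, resource) ++
        appArgs

    // buildDriverArgs("/path/to/spark.properties", "mainClass", "spark-internal", Seq("5", "7"))
    //   == List(driver, --properties-file, /path/to/spark.properties,
    //           --class, mainClass, spark-internal, 5, 7)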

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index 8675ceb..36c6616 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -62,7 +62,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
@@ -95,7 +94,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
 
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
@@ -135,7 +133,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 5c3e801..3c46667 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 import org.apache.spark.util.Clock
 
 class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
@@ -59,7 +60,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       KubernetesConf(
         sparkConf,
         KubernetesDriverSpecificConf(
-          None, "main", "app", Seq.empty),
+          JavaMainAppResource(None), "main", "app", Seq.empty),
         SHORT_RESOURCE_NAME_PREFIX,
         "app-id",
         DRIVER_LABELS,
@@ -68,7 +69,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None))
     assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
     assert(configurationStep.getAdditionalKubernetesResources().size === 1)
@@ -92,7 +92,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
           .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
           .set(KUBERNETES_NAMESPACE, "my-namespace"),
         KubernetesDriverSpecificConf(
-          None, "main", "app", Seq.empty),
+          JavaMainAppResource(None), "main", "app", Seq.empty),
         SHORT_RESOURCE_NAME_PREFIX,
         "app-id",
         DRIVER_LABELS,
@@ -101,7 +101,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None))
     val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
       DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
@@ -115,7 +114,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       KubernetesConf(
         sparkConf,
         KubernetesDriverSpecificConf(
-          None, "main", "app", Seq.empty),
+          JavaMainAppResource(None), "main", "app", Seq.empty),
         SHORT_RESOURCE_NAME_PREFIX,
         "app-id",
         DRIVER_LABELS,
@@ -124,7 +123,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None))
     val resolvedService = configurationStep
       .getAdditionalKubernetesResources()
@@ -147,7 +145,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       KubernetesConf(
         sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
         KubernetesDriverSpecificConf(
-          None, "main", "app", Seq.empty),
+          JavaMainAppResource(None), "main", "app", Seq.empty),
         LONG_RESOURCE_NAME_PREFIX,
         "app-id",
         DRIVER_LABELS,
@@ -156,7 +154,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String],
         hadoopConfSpec = None),
       clock)
     val driverService = configurationStep
@@ -176,7 +173,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         KubernetesConf(
           sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
           KubernetesDriverSpecificConf(
-            None, "main", "app", Seq.empty),
+            JavaMainAppResource(None), "main", "app", Seq.empty),
           LONG_RESOURCE_NAME_PREFIX,
           "app-id",
           DRIVER_LABELS,
@@ -185,7 +182,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
           Map.empty,
           Map.empty,
           Nil,
-          Seq.empty[String],
           hadoopConfSpec = None),
         clock)
       fail("The driver bind address should not be allowed.")
@@ -203,7 +199,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         KubernetesConf(
           sparkConf,
           KubernetesDriverSpecificConf(
-            None, "main", "app", Seq.empty),
+            JavaMainAppResource(None), "main", "app", Seq.empty),
           LONG_RESOURCE_NAME_PREFIX,
           "app-id",
           DRIVER_LABELS,
@@ -212,7 +208,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
           Map.empty,
           Map.empty,
           Nil,
-          Seq.empty[String],
           hadoopConfSpec = None),
         clock)
       fail("The driver host address should not be allowed.")

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
index 43796b7..3d25307 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
@@ -46,7 +46,6 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{
       envVarsToKeys,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
 
     val step = new EnvSecretsFeatureStep(kubernetesConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index 3a4e605..894d824 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 
 class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   private val defaultLocalDir = "/var/data/default-local-dir"
@@ -36,7 +37,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        None,
+        JavaMainAppResource(None),
         "app-name",
         "main",
         Seq.empty),
@@ -48,7 +49,6 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
index 18e3d77..1555f6a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -44,7 +44,6 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       hadoopConfSpec = None)
 
     val step = new MountSecretsFeatureStep(kubernetesConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 0d0a5fb..2a95746 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -18,13 +18,14 @@ package org.apache.spark.deploy.k8s.features
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 
 class MountVolumesFeatureStepSuite extends SparkFunSuite {
   private val sparkConf = new SparkConf(false)
   private val emptyKubernetesConf = KubernetesConf(
     sparkConf = sparkConf,
     roleSpecificConf = KubernetesDriverSpecificConf(
-      None,
+      JavaMainAppResource(None),
       "app-name",
       "main",
       Seq.empty),
@@ -36,7 +37,6 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     roleSecretEnvNamesToKeyRefs = Map.empty,
     roleEnvs = Map.empty,
     roleVolumes = Nil,
-    sparkFiles = Nil,
     hadoopConfSpec = None)
 
   test("Mounts hostPath volumes") {

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
index d7bbbd1..370948c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 
 class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
   private var sparkConf: SparkConf = _
@@ -36,7 +37,7 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
     kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        None,
+        JavaMainAppResource(None),
         "app-name",
         "main",
         Seq.empty),
@@ -48,7 +49,6 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String],
       Option.empty)
     templateFile = Files.createTempFile("pod-template", "yml").toFile
     templateFile.deleteOnExit()

http://git-wip-us.apache.org/repos/asf/spark/blob/3404a73f/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
deleted file mode 100644
index 9172e0c..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.bindings
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
-
-class JavaDriverFeatureStepSuite extends SparkFunSuite {
-
-  test("Java Step modifies container correctly") {
-    val baseDriverPod = SparkPod.initialPod()
-    val sparkConf = new SparkConf(false)
-    val kubernetesConf = KubernetesConf(
-      sparkConf,
-      KubernetesDriverSpecificConf(
-        Some(PythonMainAppResource("local:///main.jar")),
-        "test-class",
-        "java-runner",
-        Seq("5 7")),
-      appResourceNamePrefix = "",
-      appId = "",
-      roleLabels = Map.empty,
-      roleAnnotations = Map.empty,
-      roleSecretNamesToMountPaths = Map.empty,
-      roleSecretEnvNamesToKeyRefs = Map.empty,
-      roleEnvs = Map.empty,
-      roleVolumes = Nil,
-      sparkFiles = Seq.empty[String],
-      hadoopConfSpec = None)
-
-    val step = new JavaDriverFeatureStep(kubernetesConf)
-    val driverPod = step.configurePod(baseDriverPod).pod
-    val driverContainerwithJavaStep = step.configurePod(baseDriverPod).container
-    assert(driverContainerwithJavaStep.getArgs.size === 7)
-    val args = driverContainerwithJavaStep
-      .getArgs.asScala
-    assert(args === List(
-      "driver",
-      "--properties-file", SPARK_CONF_PATH,
-      "--class", "test-class",
-      "spark-internal", "5 7"))
-  }
-}
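
One detail worth noticing in the deleted suite above: it passed its app
arguments as the single string "5 7", while the replacement
DriverCommandFeatureStepSuite passes "5" and "7" as separate arguments. A small
sketch of why a flattened argument string is lossy once it has to be re-split:

    // Splitting on spaces cannot tell two arguments apart from one argument
    // that legitimately contains a space.
    val twoArgs   = "5 7".split(" ").toSeq          // Seq("5", "7") -- as intended
    val spacedArg = "my file.txt".split(" ").toSeq  // Seq("my", "file.txt") -- mangled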

