This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a6ddc9d  [SPARK-24736][K8S] Let spark-submit handle dependency resolution.
a6ddc9d is described below

commit a6ddc9d08352540e492c8c7aa3b602c3e5902538
Author: Marcelo Vanzin <van...@cloudera.com>
AuthorDate: Wed Feb 27 09:49:31 2019 -0800

    [SPARK-24736][K8S] Let spark-submit handle dependency resolution.
    
    Before this change, there was some code in the k8s backend to deal
    with how to resolve dependencies and make them available to the
    Spark application. It turns out that none of that code is necessary,
    since spark-submit already handles all that for applications started
    in client mode - like the k8s driver that is run inside a Spark-created
    pod.
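
    (For reference, the driver container's entrypoint ultimately execs a plain
    client-mode spark-submit, roughly like the sketch below; the exact flags and
    argument forwarding are abbreviated from the real entrypoint.sh.)

      exec $SPARK_HOME/bin/spark-submit \
        --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" \
        --deploy-mode client \
        "$@"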
    
    For that reason, specifically for pyspark, there's no need for the
    k8s backend to deal with PYTHONPATH; or, in general, to change the URIs
    provided by the user at all. spark-submit takes care of that.
    
    For testing, I created a pyspark script that depends on another module
    that is shipped with --py-files. Then I used the following combinations
    (a fuller example command is sketched after the list):
    
    - --py-files http://.../dep.py http://.../test.py
    - --py-files http://.../dep.zip http://.../test.py
    - --py-files local:/.../dep.py local:/.../test.py
    - --py-files local:/.../dep.zip local:/.../test.py
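
    A fuller form of those invocations (the image name, API server address and
    file locations below are hypothetical placeholders, not the values from the
    actual test runs) looks roughly like:

      bin/spark-submit \
        --master k8s://https://<api-server>:6443 \
        --deploy-mode cluster \
        --conf spark.kubernetes.container.image=<pyspark-image> \
        --py-files http://<server>/dep.py \
        http://<server>/test.py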
    
    Without this change, all of the above commands fail. With the change, the
    driver is able to see the dependencies in all the above cases; but executors
    don't see the dependencies in the last two. That's a bug in shared Spark code
    that deals with local: dependencies in pyspark (SPARK-26934).
    
    I also tested a Scala app using the main jar from an http server.
    
    Closes #23793 from vanzin/SPARK-24736.
    
    Authored-by: Marcelo Vanzin <van...@cloudera.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      |  6 +-
 .../org/apache/spark/deploy/k8s/Constants.scala    |  1 -
 .../apache/spark/deploy/k8s/KubernetesConf.scala   | 10 +---
 .../apache/spark/deploy/k8s/KubernetesUtils.scala  | 20 -------
 .../k8s/features/BasicDriverFeatureStep.scala      |  9 ---
 .../k8s/features/DriverCommandFeatureStep.scala    | 67 ++++++----------------
 .../k8s/submit/KubernetesClientApplication.scala   | 13 +----
 .../spark/deploy/k8s/KubernetesConfSuite.scala     |  3 +-
 .../spark/deploy/k8s/KubernetesTestConf.scala      |  3 +-
 .../k8s/features/BasicDriverFeatureStepSuite.scala | 23 --------
 .../features/DriverCommandFeatureStepSuite.scala   | 64 ++++-----------------
 .../dockerfiles/spark/bindings/python/Dockerfile   |  1 -
 .../src/main/dockerfiles/spark/entrypoint.sh       |  4 --
 13 files changed, 38 insertions(+), 186 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index f4d9fe0..2843bd5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -520,6 +520,7 @@ private[spark] class SparkSubmit extends Logging {
         confKey = PRINCIPAL.key),
       OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         confKey = KEYTAB.key),
+      OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),
 
       // Propagate attributes for dependency resolution at the driver side
       OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
@@ -694,9 +695,6 @@ private[spark] class SparkSubmit extends Logging {
         if (args.isPython) {
           childArgs ++= Array("--primary-py-file", args.primaryResource)
           childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
-          if (args.pyFiles != null) {
-            childArgs ++= Array("--other-py-files", args.pyFiles)
-          }
         } else if (args.isR) {
           childArgs ++= Array("--primary-r-file", args.primaryResource)
           childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
@@ -744,7 +742,7 @@ private[spark] class SparkSubmit extends Logging {
     // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
     val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES)
     val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(","))
-    val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
+    val formattedPyFiles = if (deployMode != CLUSTER) {
       PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
     } else {
     // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 76041e7..a3c74ff 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -69,7 +69,6 @@ private[spark] object Constants {
   val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
 
   // BINDINGS
-  val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
 
   // Pod spec templates
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 6febad9..4a63ea9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -73,8 +73,7 @@ private[spark] class KubernetesDriverConf(
     val appId: String,
     val mainAppResource: MainAppResource,
     val mainClass: String,
-    val appArgs: Array[String],
-    val pyFiles: Seq[String])
+    val appArgs: Array[String])
   extends KubernetesConf(sparkConf) {
 
   override val resourceNamePrefix: String = {
@@ -175,14 +174,11 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: MainAppResource,
       mainClass: String,
-      appArgs: Array[String],
-      maybePyFiles: Option[String]): KubernetesDriverConf = {
+      appArgs: Array[String]): KubernetesDriverConf = {
     // Parse executor volumes in order to verify configuration before the driver pod is created.
     KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
 
-    val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
-    new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs,
-      pyFiles)
+    new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs)
   }
 
   def createExecutorConf(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 6fafac3..b3f58b0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -66,26 +66,6 @@ private[spark] object KubernetesUtils extends Logging {
     opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
   }
 
-  /**
-   * For the given collection of file URIs, resolves them as follows:
-   * - File URIs with scheme local:// resolve to just the path of the URI.
-   * - Otherwise, the URIs are returned as-is.
-   */
-  def resolveFileUrisAndPath(fileUris: Iterable[String]): Iterable[String] = {
-    fileUris.map { uri =>
-      resolveFileUri(uri)
-    }
-  }
-
-  def resolveFileUri(uri: String): String = {
-    val fileUri = Utils.resolveURI(uri)
-    val fileScheme = Option(fileUri.getScheme).getOrElse("file")
-    fileScheme match {
-      case "local" => fileUri.getPath
-      case _ => uri
-    }
-  }
-
   def loadPodFromTemplate(
       kubernetesClient: KubernetesClient,
       templateFile: File,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index e664b64..17c00eb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -153,15 +153,6 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix,
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
       MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
-
-    Seq(JARS, FILES).foreach { key =>
-      val value = conf.get(key)
-      val resolved = KubernetesUtils.resolveFileUrisAndPath(value)
-      if (resolved.nonEmpty) {
-        additionalProps.put(key.key, resolved.mkString(","))
-      }
-    }
-
     additionalProps.toMap
   }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index bd3f8a1..9c9cd1e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -37,8 +37,8 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
 
   override def configurePod(pod: SparkPod): SparkPod = {
     conf.mainAppResource match {
-      case JavaMainAppResource(_) =>
-        configureForJava(pod)
+      case JavaMainAppResource(res) =>
+        configureForJava(pod, res.getOrElse(SparkLauncher.NO_RESOURCE))
 
       case PythonMainAppResource(res) =>
         configureForPython(pod, res)
@@ -49,45 +49,33 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   }
 
   override def getAdditionalPodSystemProperties(): Map[String, String] = {
-    conf.mainAppResource match {
-      case JavaMainAppResource(res) =>
-        res.map(additionalJavaProperties).getOrElse(Map.empty)
+    val appType = conf.mainAppResource match {
+      case JavaMainAppResource(_) =>
+        APP_RESOURCE_TYPE_JAVA
 
-      case PythonMainAppResource(res) =>
-        additionalPythonProperties(res)
+      case PythonMainAppResource(_) =>
+        APP_RESOURCE_TYPE_PYTHON
 
-      case RMainAppResource(res) =>
-        additionalRProperties(res)
+      case RMainAppResource(_) =>
+        APP_RESOURCE_TYPE_R
     }
+
+    Map(APP_RESOURCE_TYPE.key -> appType)
   }
 
-  private def configureForJava(pod: SparkPod): SparkPod = {
-    // The user application jar is merged into the spark.jars list and managed through that
-    // property, so use a "blank" resource for the Java driver.
-    val driverContainer = baseDriverContainer(pod, SparkLauncher.NO_RESOURCE).build()
+  private def configureForJava(pod: SparkPod, res: String): SparkPod = {
+    val driverContainer = baseDriverContainer(pod, res).build()
     SparkPod(pod.pod, driverContainer)
   }
 
   private def configureForPython(pod: SparkPod, res: String): SparkPod = {
-    val maybePythonFiles = if (conf.pyFiles.nonEmpty) {
-      // Delineation by ":" is to append the PySpark Files to the PYTHONPATH
-      // of the respective PySpark pod
-      val resolved = KubernetesUtils.resolveFileUrisAndPath(conf.pyFiles)
-      Some(new EnvVarBuilder()
-        .withName(ENV_PYSPARK_FILES)
-        .withValue(resolved.mkString(":"))
-        .build())
-    } else {
-      None
-    }
     val pythonEnvs =
       Seq(new EnvVarBuilder()
           .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
           .withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
-        .build()) ++
-      maybePythonFiles
+        .build())
 
-    val pythonContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res))
+    val pythonContainer = baseDriverContainer(pod, res)
       .addAllToEnv(pythonEnvs.asJava)
       .build()
 
@@ -95,7 +83,7 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   }
 
   private def configureForR(pod: SparkPod, res: String): SparkPod = {
-    val rContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)).build()
+    val rContainer = baseDriverContainer(pod, res).build()
     SparkPod(pod.pod, rContainer)
   }
 
@@ -107,27 +95,4 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
       .addToArgs(resource)
       .addToArgs(conf.appArgs: _*)
   }
-
-  private def additionalJavaProperties(resource: String): Map[String, String] = {
-    resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList(JARS, Seq(resource))
-  }
-
-  private def additionalPythonProperties(resource: String): Map[String, String] = {
-    resourceType(APP_RESOURCE_TYPE_PYTHON) ++
-      mergeFileList(FILES, Seq(resource) ++ conf.pyFiles)
-  }
-
-  private def additionalRProperties(resource: String): Map[String, String] = {
-    resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList(FILES, Seq(resource))
-  }
-
-  private def mergeFileList(key: ConfigEntry[Seq[String]], filesToAdd: Seq[String])
-    : Map[String, String] = {
-    val existing = conf.get(key)
-    Map(key.key -> (existing ++ filesToAdd).distinct.mkString(","))
-  }
-
-  private def resourceType(resType: String): Map[String, String] = {
-    Map(APP_RESOURCE_TYPE.key -> resType)
-  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 3888778..042012e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -39,13 +39,11 @@ import org.apache.spark.util.Utils
  * @param mainAppResource the main application resource if any
  * @param mainClass the main class of the application to run
  * @param driverArgs arguments to the driver
- * @param maybePyFiles additional Python files via --py-files
  */
 private[spark] case class ClientArguments(
     mainAppResource: MainAppResource,
     mainClass: String,
-    driverArgs: Array[String],
-    maybePyFiles: Option[String])
+    driverArgs: Array[String])
 
 private[spark] object ClientArguments {
 
@@ -53,7 +51,6 @@ private[spark] object ClientArguments {
     var mainAppResource: MainAppResource = JavaMainAppResource(None)
     var mainClass: Option[String] = None
     val driverArgs = mutable.ArrayBuffer.empty[String]
-    var maybePyFiles : Option[String] = None
 
     args.sliding(2, 2).toList.foreach {
       case Array("--primary-java-resource", primaryJavaResource: String) =>
@@ -62,8 +59,6 @@ private[spark] object ClientArguments {
         mainAppResource = PythonMainAppResource(primaryPythonResource)
       case Array("--primary-r-file", primaryRFile: String) =>
         mainAppResource = RMainAppResource(primaryRFile)
-      case Array("--other-py-files", pyFiles: String) =>
-        maybePyFiles = Some(pyFiles)
       case Array("--main-class", clazz: String) =>
         mainClass = Some(clazz)
       case Array("--arg", arg: String) =>
@@ -78,8 +73,7 @@ private[spark] object ClientArguments {
     ClientArguments(
       mainAppResource,
       mainClass.get,
-      driverArgs.toArray,
-      maybePyFiles)
+      driverArgs.toArray)
   }
 }
 
@@ -214,8 +208,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       kubernetesAppId,
       clientArguments.mainAppResource,
       clientArguments.mainClass,
-      clientArguments.driverArgs,
-      clientArguments.maybePyFiles)
+      clientArguments.driverArgs)
     // The master URL has been checked for validity already in SparkSubmit.
     // We just need to get rid of the "k8s://" prefix here.
     val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index f4d40b0..d51b1e6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -69,8 +69,7 @@ class KubernetesConfSuite extends SparkFunSuite {
       KubernetesTestConf.APP_ID,
       JavaMainAppResource(None),
       KubernetesTestConf.MAIN_CLASS,
-      APP_ARGS,
-      None)
+      APP_ARGS)
     assert(conf.labels === Map(
       SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID,
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
index 1d77a6d..ee830a9 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
@@ -44,7 +44,6 @@ object KubernetesTestConf {
       mainAppResource: MainAppResource = JavaMainAppResource(None),
       mainClass: String = MAIN_CLASS,
       appArgs: Array[String] = Array.empty,
-      pyFiles: Seq[String] = Nil,
       resourceNamePrefix: Option[String] = None,
       labels: Map[String, String] = Map.empty,
       environment: Map[String, String] = Map.empty,
@@ -64,7 +63,7 @@ object KubernetesTestConf {
     setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs)
     setVolumeSpecs(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX, volumes)
 
-    new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, pyFiles)
+    new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs)
   }
   // scalastyle:on argcount
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index ccf88cc..7cfc4d2 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -137,29 +137,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(configuredPythonPod.container.getImage === "spark-driver-py:latest")
   }
 
-  test("Additional system properties resolve jars and set cluster-mode 
confs.") {
-    val allJars = Seq("local:///opt/spark/jar1.jar", 
"hdfs:///opt/spark/jar2.jar")
-    val allFiles = Seq("https://localhost:9000/file1.txt";, 
"local:///opt/spark/file2.txt")
-    val sparkConf = new SparkConf()
-      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
-      .setJars(allJars)
-      .set(FILES, allFiles)
-      .set(CONTAINER_IMAGE, "spark-driver:latest")
-    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
-
-    val step = new BasicDriverFeatureStep(kubernetesConf)
-    val additionalProperties = step.getAdditionalPodSystemProperties()
-    val expectedSparkConf = Map(
-      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
-      "spark.app.id" -> KubernetesTestConf.APP_ID,
-      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix,
-      "spark.kubernetes.submitInDriver" -> "true",
-      JARS.key -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
-      FILES.key -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt",
-      MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
-    assert(additionalProperties === expectedSparkConf)
-  }
-
   // Memory overhead tests. Tuples are:
   //   test name, main resource, overhead factor, expected factor
   Seq(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
index f74ac92..de80c56 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
@@ -23,12 +23,13 @@ import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 class DriverCommandFeatureStepSuite extends SparkFunSuite {
 
   test("java resource") {
-    val mainResource = "local:///main.jar"
+    val mainResource = "local:/main.jar"
     val spec = applyFeatureStep(
       JavaMainAppResource(Some(mainResource)),
       appArgs = Array("5", "7"))
@@ -36,72 +37,33 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
       "driver",
       "--properties-file", SPARK_CONF_PATH,
       "--class", KubernetesTestConf.MAIN_CLASS,
-      "spark-internal", "5", "7"))
-
-    val jars = Utils.stringToSeq(spec.systemProperties("spark.jars"))
-    assert(jars.toSet === Set(mainResource))
+      mainResource, "5", "7"))
   }
 
-  test("python resource with no extra files") {
-    val mainResource = "local:///main.py"
+  test("python resource") {
+    val mainResource = "local:/main.py"
     val sparkConf = new SparkConf(false)
-      .set(PYSPARK_MAJOR_PYTHON_VERSION, "3")
-
-    val spec = applyFeatureStep(
-      PythonMainAppResource(mainResource),
-      conf = sparkConf)
-    assert(spec.pod.container.getArgs.asScala === List(
-      "driver",
-      "--properties-file", SPARK_CONF_PATH,
-      "--class", KubernetesTestConf.MAIN_CLASS,
-      "/main.py"))
-    val envs = spec.pod.container.getEnv.asScala
-      .map { env => (env.getName, env.getValue) }
-      .toMap
-    assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3")
-
-    val files = Utils.stringToSeq(spec.systemProperties("spark.files"))
-    assert(files.toSet === Set(mainResource))
-  }
-
-  test("python resource with extra files") {
-    val expectedMainResource = "/main.py"
-    val expectedPySparkFiles = "/example2.py:/example3.py"
-    val filesInConf = Set("local:///example.py")
-
-    val mainResource = s"local://$expectedMainResource"
-    val pyFiles = Seq("local:///example2.py", "local:///example3.py")
-
-    val sparkConf = new SparkConf(false)
-      .set("spark.files", filesInConf.mkString(","))
       .set(PYSPARK_MAJOR_PYTHON_VERSION, "2")
     val spec = applyFeatureStep(
       PythonMainAppResource(mainResource),
       conf = sparkConf,
-      appArgs = Array("5", "7", "9"),
-      pyFiles = pyFiles)
+      appArgs = Array("5", "7", "9"))
 
     assert(spec.pod.container.getArgs.asScala === List(
       "driver",
       "--properties-file", SPARK_CONF_PATH,
       "--class", KubernetesTestConf.MAIN_CLASS,
-      "/main.py", "5", "7", "9"))
+      mainResource, "5", "7", "9"))
 
     val envs = spec.pod.container.getEnv.asScala
       .map { env => (env.getName, env.getValue) }
       .toMap
-    val expected = Map(
-      ENV_PYSPARK_FILES -> expectedPySparkFiles,
-      ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2")
+    val expected = Map(ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2")
     assert(envs === expected)
-
-    val files = Utils.stringToSeq(spec.systemProperties("spark.files"))
-    assert(files.toSet === pyFiles.toSet ++ filesInConf ++ Set(mainResource))
   }
 
   test("R resource") {
-    val expectedMainResource = "/main.R"
-    val mainResource = s"local://$expectedMainResource"
+    val mainResource = "local:/main.R"
 
     val spec = applyFeatureStep(
       RMainAppResource(mainResource),
@@ -111,19 +73,17 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
       "driver",
       "--properties-file", SPARK_CONF_PATH,
       "--class", KubernetesTestConf.MAIN_CLASS,
-      "/main.R", "5", "7", "9"))
+      mainResource, "5", "7", "9"))
   }
 
   private def applyFeatureStep(
       resource: MainAppResource,
       conf: SparkConf = new SparkConf(false),
-      appArgs: Array[String] = Array(),
-      pyFiles: Seq[String] = Nil): KubernetesDriverSpec = {
+      appArgs: Array[String] = Array()): KubernetesDriverSpec = {
     val kubernetesConf = KubernetesTestConf.createDriverConf(
       sparkConf = conf,
       mainAppResource = resource,
-      appArgs = appArgs,
-      pyFiles = pyFiles)
+      appArgs = appArgs)
     val step = new DriverCommandFeatureStep(kubernetesConf)
     val pod = step.configurePod(SparkPod.initialPod())
     val props = step.getAdditionalPodSystemProperties()
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
index 5044900..8237c92 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
@@ -39,7 +39,6 @@ RUN apk add --no-cache python && \
 
 COPY python/pyspark ${SPARK_HOME}/python/pyspark
 COPY python/lib ${SPARK_HOME}/python/lib
-ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip
 
 WORKDIR /opt/spark/work-dir
 ENTRYPOINT [ "/opt/entrypoint.sh" ]
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 613febc..2097fb8 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -44,10 +44,6 @@ if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
   SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
 fi
 
-if [ -n "$PYSPARK_FILES" ]; then
-    PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
-fi
-
 if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "2" ]; then
     pyv="$(python -V 2>&1)"
     export PYTHON_VERSION="${pyv:7}"

