This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dc792d91c9 [SPARK-35084][CORE] Fixing --packages in k8s client mode with the driver running inside a POD
9dc792d91c9 is described below

commit 9dc792d91c95f7ce290fff96e979e5540180f8a2
Author: Keunhyun Oh <ocwo...@gmail.com>
AuthorDate: Sat Jan 14 17:29:55 2023 -0800

    [SPARK-35084][CORE] Fixing --packages in k8s client mode with the driver running inside a POD
    
    ### What changes were proposed in this pull request?
    Supporting '--packages' in the k8s cluster mode
    
    ### Why are the changes needed?
    In Spark 3, '--packages' in k8s cluster mode is not supported. I expected to be able to manage dependencies with packages as in Spark 2.
    
    Spark 2.4.5
    
    
    https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
    
    ```scala
        if (!isMesosCluster && !isStandAloneCluster) {
          // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
          // too for packages that include Python code
          val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
            args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
            args.ivySettingsPath)

          if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
            args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
            if (args.isPython || isInternal(args.primaryResource)) {
              args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
            }
          }

          // install any R packages that may have been passed through --jars or --packages.
          // Spark Packages may contain R source code inside the jar.
          if (args.isR && !StringUtils.isBlank(args.jars)) {
            RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
          }
        }
    ```
    
    Spark 3.0.2
    
    
    https://github.com/apache/spark/blob/v3.0.2/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
    
    ```scala
          if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
            // In K8s client mode, when in the driver, add resolved jars early as we might need
            // them at the submit time for artifact downloading.
            // For example we might use the dependencies for downloading
            // files from a Hadoop Compatible fs eg. S3. In this case the user might pass:
            // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
            if (isKubernetesClusterModeDriver) {
              val loader = getSubmitClassLoader(sparkConf)
              for (jar <- resolvedMavenCoordinates.split(",")) {
                addJarToClasspath(jar, loader)
              }
            } else if (isKubernetesCluster) {
              // We need this in K8s cluster mode so that we can upload local deps
              // via the k8s application, like in cluster mode driver
              childClasspath ++= resolvedMavenCoordinates.split(",")
            } else {
              args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
              if (args.isPython || isInternal(args.primaryResource)) {
                args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
              }
            }
          }
    ```
    
    Unlike Spark 2, in Spark 3 the resolved jars are not added anywhere in this code path (in the K8s client-mode driver they only end up on the classpath).
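
    The `mergeFileLists` helper seen in both snippets joins comma-separated file lists. As a rough standalone sketch of that behaviour (illustrative only, not Spark's actual implementation):

    ```scala
    // Illustrative sketch only (not Spark's code): non-blank, comma-separated lists are
    // concatenated into a single comma-separated string, which is how resolved package
    // jars get appended to args.jars / args.pyFiles above.
    object MergeFileListsSketch {
      def mergeFileLists(lists: String*): String = {
        val merged = lists
          .filter(s => s != null && s.trim.nonEmpty)               // drop null/blank inputs
          .flatMap(_.split(",").map(_.trim).filter(_.nonEmpty))    // split each list on commas
        if (merged.nonEmpty) merged.mkString(",") else null
      }

      def main(args: Array[String]): Unit = {
        // e.g. merging existing --jars with a resolved package jar:
        println(mergeFileLists("a.jar,b.jar", null, "mylib-0.1.jar"))  // a.jar,b.jar,mylib-0.1.jar
      }
    }
    ```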
    
    ### Does this PR introduce _any_ user-facing change?
    Unlike Spark 2, the resolved jars are added not at cluster-mode spark-submit time but in the driver.

    This is because Spark 3 added a feature that uploads jars with the "file://" prefix to S3. If the resolved jars were added at spark-submit time, every jar from the packages would be uploaded to S3, which was a very bad experience when I tested it.
    
    ### How was this patch tested?
    1) I tested the code in my k8s environment.
    2) Added a unit test.
    
    Closes #38828 from ocworld/SPARK-35084-CORE-k8s-pacakges.
    
    Authored-by: Keunhyun Oh <ocwo...@gmail.com>
    Signed-off-by: attilapiros <piros.attila.zs...@gmail.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 24 ++++++++++--------
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 29 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a701b0ea607..fa19c7918af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -319,21 +319,23 @@ private[spark] class SparkSubmit extends Logging {
         args.repositories, args.ivyRepoPath, args.ivySettingsPath)
 
       if (resolvedMavenCoordinates.nonEmpty) {
-        // In K8s client mode, when in the driver, add resolved jars early as we might need
-        // them at the submit time for artifact downloading.
-        // For example we might use the dependencies for downloading
-        // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
-        // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
-        if (isKubernetesClusterModeDriver) {
-          val loader = getSubmitClassLoader(sparkConf)
-          for (jar <- resolvedMavenCoordinates) {
-            addJarToClasspath(jar, loader)
-          }
-        } else if (isKubernetesCluster) {
+        if (isKubernetesCluster) {
           // We need this in K8s cluster mode so that we can upload local deps
           // via the k8s application, like in cluster mode driver
           childClasspath ++= resolvedMavenCoordinates
         } else {
+          // In K8s client mode, when in the driver, add resolved jars early as we might need
+          // them at the submit time for artifact downloading.
+          // For example we might use the dependencies for downloading
+          // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
+          // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
+          if (isKubernetesClusterModeDriver) {
+            val loader = getSubmitClassLoader(sparkConf)
+            for (jar <- resolvedMavenCoordinates) {
+              addJarToClasspath(jar, loader)
+            }
+          }
+
           args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
           if (args.isPython || isInternal(args.primaryResource)) {
             args.pyFiles = mergeFileLists(args.pyFiles,
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 76311d0ab18..1afcb3bcd99 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -486,6 +486,35 @@ class SparkSubmitSuite
     conf.get("spark.kubernetes.driver.container.image") should be ("bar")
   }
 
+  test("SPARK-35084: include jars of the --packages" +
+    " in k8s client mode & driver runs inside a POD") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
+    val dep = MavenCoordinate("my.great.dep", "mylib", "0.1")
+    IvyTestUtils.withRepository(main, Some(dep.toString), None) { repo =>
+      val clArgs = Seq(
+        "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
+        "--name", "testApp",
+        "--deploy-mode", "client",
+        "--proxy-user", "test.user",
+        "--class", "org.SomeClass",
+        "--master", "k8s://host:port",
+        "--packages", Seq(main, dep).mkString(","),
+        "--repositories", repo,
+        "--conf", "spark.ui.enabled=false",
+        "--conf", "spark.master.rest.enabled=false",
+        "--conf", "spark.kubernetes.namespace=spark",
+        "--conf", "spark.kubernetes.submitInDriver=true",
+        "--conf", s"spark.jars.ivySettings=${emptyIvySettings.getAbsolutePath()}",
+        unusedJar.toString,
+        "my.great.lib.MyLib", "my.great.dep.MyLib")
+
+      val appArgs = new SparkSubmitArguments(clArgs)
+      val (_, _, sparkConf, _) = submit.prepareSubmitEnvironment(appArgs)
+      sparkConf.get("spark.jars").contains("mylib") shouldBe true
+    }
+  }
+
   test("SPARK-33782: handles k8s files download to current directory") {
     val clArgs = Seq(
       "--deploy-mode", "client",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
