This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 9dc792d91c9  [SPARK-35084][CORE] Fixing --packages in k8s client mode with the driver running inside a POD
9dc792d91c9 is described below

commit 9dc792d91c95f7ce290fff96e979e5540180f8a2
Author: Keunhyun Oh <ocwo...@gmail.com>
AuthorDate: Sat Jan 14 17:29:55 2023 -0800

[SPARK-35084][CORE] Fixing --packages in k8s client mode with the driver running inside a POD

### What changes were proposed in this pull request?
Support '--packages' in k8s cluster mode, where the driver runs inside a POD and performs a client-mode submit.

### Why are the changes needed?
In Spark 3, '--packages' is not supported in k8s cluster mode. I expected to be able to manage dependencies with packages, as in Spark 2.

Spark 2.4.5
https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

```scala
if (!isMesosCluster && !isStandAloneCluster) {
  // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
  // too for packages that include Python code
  val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
    args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
    args.ivySettingsPath)

  if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
    args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
    if (args.isPython || isInternal(args.primaryResource)) {
      args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
    }
  }

  // install any R packages that may have been passed through --jars or --packages.
  // Spark Packages may contain R source code inside the jar.
  if (args.isR && !StringUtils.isBlank(args.jars)) {
    RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
  }
}
```

Spark 3.0.2
https://github.com/apache/spark/blob/v3.0.2/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

```scala
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
  // In K8s client mode, when in the driver, add resolved jars early as we might need
  // them at the submit time for artifact downloading.
  // For example we might use the dependencies for downloading
  // files from a Hadoop Compatible fs eg. S3. In this case the user might pass:
  // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
  if (isKubernetesClusterModeDriver) {
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- resolvedMavenCoordinates.split(",")) {
      addJarToClasspath(jar, loader)
    }
  } else if (isKubernetesCluster) {
    // We need this in K8s cluster mode so that we can upload local deps
    // via the k8s application, like in cluster mode driver
    childClasspath ++= resolvedMavenCoordinates.split(",")
  } else {
    args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
    if (args.isPython || isInternal(args.primaryResource)) {
      args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
    }
  }
}
```

Unlike Spark 2, in Spark 3 the resolved jars are not added to `args.jars` anywhere on this code path.

### Does this PR introduce _any_ user-facing change?
Unlike Spark 2, the resolved jars are added not at cluster-mode spark-submit time but in the driver. This is because Spark 3 added a feature that uploads jars with the "file://" prefix to S3, so if the resolved jars were added at spark-submit time, every jar pulled in by the packages would be uploaded to S3. When I tested that, it was a very bad experience.

### How was this patch tested?
1) Tested the code in my k8s environment.
2) Added a unit test.

Closes #38828 from ocworld/SPARK-35084-CORE-k8s-pacakges.
Authored-by: Keunhyun Oh <ocwo...@gmail.com>
Signed-off-by: attilapiros <piros.attila.zs...@gmail.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 24 ++++++++++--------
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 29 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a701b0ea607..fa19c7918af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -319,21 +319,23 @@ private[spark] class SparkSubmit extends Logging {
       args.repositories, args.ivyRepoPath, args.ivySettingsPath)

     if (resolvedMavenCoordinates.nonEmpty) {
-      // In K8s client mode, when in the driver, add resolved jars early as we might need
-      // them at the submit time for artifact downloading.
-      // For example we might use the dependencies for downloading
-      // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
-      // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
-      if (isKubernetesClusterModeDriver) {
-        val loader = getSubmitClassLoader(sparkConf)
-        for (jar <- resolvedMavenCoordinates) {
-          addJarToClasspath(jar, loader)
-        }
-      } else if (isKubernetesCluster) {
+      if (isKubernetesCluster) {
         // We need this in K8s cluster mode so that we can upload local deps
         // via the k8s application, like in cluster mode driver
         childClasspath ++= resolvedMavenCoordinates
       } else {
+        // In K8s client mode, when in the driver, add resolved jars early as we might need
+        // them at the submit time for artifact downloading.
+        // For example we might use the dependencies for downloading
+        // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
+        // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
+        if (isKubernetesClusterModeDriver) {
+          val loader = getSubmitClassLoader(sparkConf)
+          for (jar <- resolvedMavenCoordinates) {
+            addJarToClasspath(jar, loader)
+          }
+        }
+        args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
         if (args.isPython || isInternal(args.primaryResource)) {
           args.pyFiles = mergeFileLists(args.pyFiles,
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 76311d0ab18..1afcb3bcd99 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -486,6 +486,35 @@ class SparkSubmitSuite
     conf.get("spark.kubernetes.driver.container.image") should be ("bar")
   }

+  test("SPARK-35084: include jars of the --packages" +
+    " in k8s client mode & driver runs inside a POD") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
+    val dep = MavenCoordinate("my.great.dep", "mylib", "0.1")
+    IvyTestUtils.withRepository(main, Some(dep.toString), None) { repo =>
+      val clArgs = Seq(
+        "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
+        "--name", "testApp",
+        "--deploy-mode", "client",
+        "--proxy-user", "test.user",
+        "--class", "org.SomeClass",
+        "--master", "k8s://host:port",
+        "--packages", Seq(main, dep).mkString(","),
+        "--repositories", repo,
+        "--conf", "spark.ui.enabled=false",
+        "--conf", "spark.master.rest.enabled=false",
+        "--conf", "spark.kubernetes.namespace=spark",
+        "--conf", "spark.kubernetes.submitInDriver=true",
+        "--conf", s"spark.jars.ivySettings=${emptyIvySettings.getAbsolutePath()}",
+        unusedJar.toString,
+        "my.great.lib.MyLib", "my.great.dep.MyLib")
+
+      val appArgs = new SparkSubmitArguments(clArgs)
+      val (_, _, sparkConf, _) = submit.prepareSubmitEnvironment(appArgs)
+      sparkConf.get("spark.jars").contains("mylib") shouldBe true
+    }
+  }
+
   test("SPARK-33782: handles k8s files download to current directory") {
     val clArgs = Seq(
       "--deploy-mode", "client",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org