This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 83f753e [SPARK-34472][YARN] Ship ivySettings file to driver in cluster mode 83f753e is described below commit 83f753e4e1412a896243f4016600552c0110c1b0 Author: Shardul Mahadik <smaha...@linkedin.com> AuthorDate: Tue Apr 20 13:35:57 2021 -0500 [SPARK-34472][YARN] Ship ivySettings file to driver in cluster mode ### What changes were proposed in this pull request? In YARN, ship the `spark.jars.ivySettings` file to the driver when using `cluster` deploy mode so that `addJar` is able to find it in order to resolve ivy paths. ### Why are the changes needed? SPARK-33084 introduced support for Ivy paths in `sc.addJar` or Spark SQL `ADD JAR`. If we use a custom ivySettings file using `spark.jars.ivySettings`, it is loaded at https://github.com/apache/spark/blob/b26e7b510bbaee63c4095ab47e75ff2a70e377d7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1280. However, this file is only accessible on the client machine. In YARN cluster mode, this file is not available on the driver and so `addJar` fails to find it. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit tests to verify that the `ivySettings` file is localized by the YARN client and that a YARN cluster mode application is able to find and load the `ivySettings` file. Closes #31591 from shardulm94/SPARK-34472. 
Authored-by: Shardul Mahadik <smaha...@linkedin.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../org/apache/spark/deploy/SparkSubmit.scala | 7 +- docs/configuration.md | 7 +- .../org/apache/spark/deploy/yarn/Client.scala | 57 +++++++++--- .../spark/deploy/yarn/YarnClusterSuite.scala | 103 +++++++++++++++++++++ 4 files changed, 161 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 0e31fcf..36873c7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1286,7 +1286,12 @@ private[spark] object SparkSubmitUtils extends Logging { settingsFile: String, remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = { - val file = new File(settingsFile) + val uri = new URI(settingsFile) + val file = Option(uri.getScheme).getOrElse("file") match { + case "file" => new File(uri.getPath) + case scheme => throw new IllegalArgumentException(s"Scheme $scheme not supported in " + + "spark.jars.ivySettings") + } require(file.exists(), s"Ivy settings file $file does not exist") require(file.isFile(), s"Ivy settings file $file is not a normal file") val ivySettings: IvySettings = new IvySettings diff --git a/docs/configuration.md b/docs/configuration.md index d9bbddc..c6b462a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -771,7 +771,12 @@ Apart from these, the following properties are also available, and may be useful option <code>--repositories</code> or <code>spark.jars.repositories</code> will also be included. Useful for allowing Spark to resolve artifacts from behind a firewall e.g. via an in-house artifact server like Artifactory. 
Details on the settings file format can be - found at <a href="http://ant.apache.org/ivy/history/latest-milestone/settings.html">Settings Files</a> + found at <a href="http://ant.apache.org/ivy/history/latest-milestone/settings.html">Settings Files</a>. + Only paths with <code>file://</code> scheme are supported. Paths without a scheme are assumed to have + a <code>file://</code> scheme. + <p/> + When running in YARN cluster mode, this file will also be localized to the remote driver for dependency + resolution within <code>SparkContext#addJar</code> </td> <td>2.2.0</td> </tr> diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 618faef..427202f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,6 +21,7 @@ import java.io.{FileSystem => _, _} import java.net.{InetAddress, UnknownHostException, URI} import java.nio.ByteBuffer import java.nio.charset.StandardCharsets +import java.nio.file.Files import java.util.{Locale, Properties, UUID} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -30,7 +31,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} import scala.util.control.NonFatal import com.google.common.base.Objects -import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission @@ -518,6 +518,32 @@ private[spark] class Client( require(localizedPath != null, "Keytab file already distributed.") } + // If we passed in a ivySettings file, make sure we copy the file to the distributed cache + // in cluster mode so that the driver can access it + val ivySettings = sparkConf.getOption("spark.jars.ivySettings") + val ivySettingsLocalizedPath: Option[String] = ivySettings match { + 
case Some(ivySettingsPath) if isClusterMode => + val uri = new URI(ivySettingsPath) + Option(uri.getScheme).getOrElse("file") match { + case "file" => + val ivySettingsFile = new File(uri.getPath) + require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found") + require(ivySettingsFile.isFile(), s"Ivy settings file $ivySettingsFile is not a" + + "normal file") + // Generate a file name that can be used for the ivySettings file, that does not + // conflict with any user file. + val localizedFileName = Some(ivySettingsFile.getName() + "-" + + UUID.randomUUID().toString) + val (_, localizedPath) = distribute(ivySettingsPath, destName = localizedFileName) + require(localizedPath != null, "IvySettings file already distributed.") + Some(localizedPath) + case scheme => + throw new IllegalArgumentException(s"Scheme $scheme not supported in " + + "spark.jars.ivySettings") + } + case _ => None + } + /** * Add Spark to the cache. There are two settings that control what files to add to the cache: * - if a Spark archive is defined, use the archive. The archive is expected to contain @@ -576,7 +602,7 @@ private[spark] class Client( jarsDir.listFiles().foreach { f => if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) { jarsStream.putNextEntry(new ZipEntry(f.getName)) - Files.copy(f, jarsStream) + Files.copy(f.toPath, jarsStream) jarsStream.closeEntry() } } @@ -672,7 +698,18 @@ private[spark] class Client( val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf) cachedResourcesConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString()) - val localConfArchive = new Path(createConfArchive().toURI()) + val confsToOverride = Map.empty[String, String] + // If propagating the keytab to the AM, override the keytab name with the name of the + // distributed file. 
+ amKeytabFileName.foreach { kt => confsToOverride.put(KEYTAB.key, kt) } + + // If propagating the ivySettings file to the distributed cache, override the ivySettings + // file name with the name of the distributed file. + ivySettingsLocalizedPath.foreach { path => + confsToOverride.put("spark.jars.ivySettings", path) + } + + val localConfArchive = new Path(createConfArchive(confsToOverride).toURI()) copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true, destName = Some(LOCALIZED_CONF_ARCHIVE)) @@ -701,8 +738,10 @@ private[spark] class Client( * * The archive also contains some Spark configuration. Namely, it saves the contents of * SparkConf in a file to be loaded by the AM process. + * + * @param confsToOverride configs that should overriden when creating the final spark conf file */ - private def createConfArchive(): File = { + private def createConfArchive(confsToOverride: Map[String, String]): File = { val hadoopConfFiles = new HashMap[String, File]() // SPARK_CONF_DIR shows up in the classpath before HADOOP_CONF_DIR/YARN_CONF_DIR @@ -764,7 +803,7 @@ private[spark] class Client( if url.getProtocol == "file" } { val file = new File(url.getPath()) confStream.putNextEntry(new ZipEntry(file.getName())) - Files.copy(file, confStream) + Files.copy(file.toPath, confStream) confStream.closeEntry() } @@ -775,7 +814,7 @@ private[spark] class Client( hadoopConfFiles.foreach { case (name, file) => if (file.canRead()) { confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name")) - Files.copy(file, confStream) + Files.copy(file.toPath, confStream) confStream.closeEntry() } } @@ -788,11 +827,7 @@ private[spark] class Client( // Save Spark configuration to a file in the archive. val props = confToProperties(sparkConf) - - // If propagating the keytab to the AM, override the keytab name with the name of the - // distributed file. 
- amKeytabFileName.foreach { kt => props.setProperty(KEYTAB.key, kt) } - + confsToOverride.foreach { case (k, v) => props.setProperty(k, v)} writePropertiesToArchive(props, SPARK_CONF_FILE, confStream) // Write the distributed cache config to the archive. diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 9bc934d..26ff3bf 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -29,6 +29,7 @@ import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils import org.scalatest.concurrent.Eventually._ +import org.scalatest.exceptions.TestFailedException import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers._ @@ -368,6 +369,64 @@ class YarnClusterSuite extends BaseYarnClusterSuite { ) checkResult(finalState, result, "true") } + + def createEmptyIvySettingsFile: File = { + val emptyIvySettings = File.createTempFile("ivy", ".xml") + Files.write("<ivysettings />", emptyIvySettings, StandardCharsets.UTF_8) + emptyIvySettings + } + + test("SPARK-34472: ivySettings file with no scheme or file:// scheme should be " + + "localized on driver in cluster mode") { + val emptyIvySettings = createEmptyIvySettingsFile + // For file:// URIs or URIs without scheme, make sure that ivySettings conf was changed + // to the localized file. 
So the expected ivySettings path on the driver will start with + // the file name and then some random UUID suffix + testIvySettingsDistribution(clientMode = false, emptyIvySettings.getAbsolutePath, + emptyIvySettings.getName, prefixMatch = true) + testIvySettingsDistribution(clientMode = false, s"file://${emptyIvySettings.getAbsolutePath}", + emptyIvySettings.getName, prefixMatch = true) + } + + test("SPARK-34472: ivySettings file with no scheme or file:// scheme should retain " + + "user provided path in client mode") { + val emptyIvySettings = createEmptyIvySettingsFile + // In client mode, the file is present locally on the driver and so does not need to be + // distributed. So the user provided path should be kept as is. + testIvySettingsDistribution(clientMode = true, emptyIvySettings.getAbsolutePath, + emptyIvySettings.getAbsolutePath) + testIvySettingsDistribution(clientMode = true, s"file://${emptyIvySettings.getAbsolutePath}", + s"file://${emptyIvySettings.getAbsolutePath}") + } + + test("SPARK-34472: ivySettings file with non-file:// schemes should throw an error") { + val emptyIvySettings = createEmptyIvySettingsFile + val e1 = intercept[TestFailedException] { + testIvySettingsDistribution(clientMode = false, + s"local://${emptyIvySettings.getAbsolutePath}", "") + } + assert(e1.getMessage.contains("IllegalArgumentException: " + + "Scheme local not supported in spark.jars.ivySettings")) + val e2 = intercept[TestFailedException] { + testIvySettingsDistribution(clientMode = false, + s"hdfs://${emptyIvySettings.getAbsolutePath}", "") + } + assert(e2.getMessage.contains("IllegalArgumentException: " + + "Scheme hdfs not supported in spark.jars.ivySettings")) + } + + def testIvySettingsDistribution(clientMode: Boolean, ivySettingsPath: String, + expectedIvySettingsPrefixOnDriver: String, prefixMatch: Boolean = false): Unit = { + val result = File.createTempFile("result", null, tempDir) + val outFile = File.createTempFile("out", null, tempDir) + val finalState 
= runSpark(clientMode = clientMode, + mainClassName(YarnAddJarTest.getClass), + appArgs = Seq(result.getAbsolutePath, expectedIvySettingsPrefixOnDriver, + prefixMatch.toString), + extraConf = Map("spark.jars.ivySettings" -> ivySettingsPath), + outFile = Option(outFile)) + checkResult(finalState, result, outFile = Option(outFile)) + } } private[spark] class SaveExecutorInfo extends SparkListener { @@ -583,6 +642,50 @@ private object YarnClasspathTest extends Logging { } +private object YarnAddJarTest extends Logging { + def main(args: Array[String]): Unit = { + if (args.length != 3) { + // scalastyle:off println + System.err.println( + s""" + |Invalid command line: ${args.mkString(" ")} + | + |Usage: YarnAddJarTest [result file] [expected ivy settings path] [prefix match] + """.stripMargin) + // scalastyle:on println + System.exit(1) + } + + val resultPath = args(0) + val expectedIvySettingsPath = args(1) + val prefixMatch = args(2).toBoolean + val sc = new SparkContext(new SparkConf()) + + var result = "failure" + try { + val settingsFile = sc.getConf.get("spark.jars.ivySettings") + if (prefixMatch) { + assert(settingsFile !== expectedIvySettingsPath) + assert(settingsFile.startsWith(expectedIvySettingsPath)) + } else { + assert(settingsFile === expectedIvySettingsPath) + } + + val caught = intercept[RuntimeException] { + sc.addJar("ivy://org.fake-project.test:test:1.0.0") + } + if (caught.getMessage.contains("unresolved dependency: org.fake-project.test#test")) { + // "unresolved dependency" is expected as the dependency does not exist + // but exception like "Ivy settings file <file> does not exist" should result in failure + result = "success" + } + } finally { + Files.write(result, new File(resultPath), StandardCharsets.UTF_8) + sc.stop() + } + } +} + private object YarnLauncherTestApp { def main(args: Array[String]): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For 
additional commands, e-mail: commits-h...@spark.apache.org