Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19130#discussion_r138693449
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,53 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+    // When running on the YARN cluster manager, download remote resources to local disk if needed.
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      // Check the scheme list provided by "spark.yarn.dist.forceDownloadSchemes" to see if the
+      // current resource's scheme is included in this list, or if the Hadoop FileSystem doesn't
+      // support the current scheme; if so, Spark will download the resource to local disk and
+      // upload it to the Hadoop FS.
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }
+          .map(_ => true).getOrElse(false)
+        forceDownloadSchemes.contains(scheme) || !isFsAvailable
+      }
+
+      def downloadResource(resource: String): String = {
+        val uri = Utils.resolveURI(resource)
+        uri.getScheme match {
+          case "local" | "file" => resource
+          case e if shouldDownload(e) =>
+            if (deployMode == CLIENT) {
+              // In client mode, the resources have already been downloaded, so figuring out
+              // the local path should be enough.
+              val fileName = new Path(uri).getName
+              new File(targetDir, fileName).toURI.toString
+            } else {
+              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+            }
+          case _ => uri.toString
+        }
+      }
+
+      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
+      args.files = Option(args.files).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.pyFiles = Option(args.pyFiles).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.jars = Option(args.jars).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.archives = Option(args.archives).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
--- End diff ---
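Side note on the `Try`-based availability check above: `FileSystem.getFileSystemClass` throws an `IOException` when no `FileSystem` implementation is registered for a scheme, so wrapping it in `Try` collapses "supported or not" into a boolean (the `.map(_ => true).getOrElse(false)` chain is equivalent to `.isSuccess`). A minimal standalone sketch of that behavior; the object name and example schemes are mine, not from the PR:

```scala
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object SchemeCheck extends App {
  val hadoopConf = new Configuration()

  // getFileSystemClass throws IOException ("No FileSystem for scheme: ...")
  // when nothing on the classpath handles the scheme, so isSuccess doubles
  // as an availability flag.
  def isFsAvailable(scheme: String): Boolean =
    Try(FileSystem.getFileSystemClass(scheme, hadoopConf)).isSuccess

  println(isFsAvailable("hdfs"))  // typically true with hadoop-hdfs on the classpath
  println(isFsAvailable("http"))  // typically false unless an http FileSystem is registered
}
```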
I was going to say this is missing `spark.yarn.dist.files` and `.jars`, but
later those properties seem to be set based on `args.files` and `args.jars`.
Which kinda raises the question of what happens when the user sets both.
From the documentation it sounds like that should work (both sets of files get
added), but from the code it seems `--files` and `--jars` would overwrite the
`spark.yarn.*` configs...
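Roughly the pattern I'm worried about, as a hypothetical sketch (names and values are illustrative, this is not the actual SparkSubmit code):

```scala
import org.apache.spark.SparkConf

// The user sets the config directly...
val sparkConf = new SparkConf(loadDefaults = false)
  .set("spark.yarn.dist.files", "hdfs:/from-conf.txt")
// ...and also passes --files on the command line.
val cliFiles = Option("hdfs:/from-cli.txt")

// If the option-to-config mapping does an unconditional set, the
// user-provided value is silently replaced rather than merged:
cliFiles.foreach(f => sparkConf.set("spark.yarn.dist.files", f))

println(sparkConf.get("spark.yarn.dist.files")) // prints hdfs:/from-cli.txt only
```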
In any case, that's not the fault of your change.
---