Github user jerryshao commented on a diff in the pull request:
https://github.com/apache/spark/pull/19130#discussion_r138801550
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,53 @@ object SparkSubmit extends CommandLineUtils with Logging {
}.orNull
}
+    // When running on the YARN cluster manager, some remote resources may have to be
+    // downloaded to local disk first.
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      // Check the scheme list provided by "spark.yarn.dist.forceDownloadSchemes" to see
+      // whether the current resource's scheme is included in the list, or whether Hadoop
+      // FileSystem doesn't support the current scheme. If so, Spark will download the
+      // resource to local disk and upload it to Hadoop FS.
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }
+          .map(_ => true).getOrElse(false)
+        forceDownloadSchemes.contains(scheme) || !isFsAvailable
+      }
+
+      def downloadResource(resource: String): String = {
+        val uri = Utils.resolveURI(resource)
+        uri.getScheme match {
+          case "local" | "file" => resource
+          case e if shouldDownload(e) =>
+            if (deployMode == CLIENT) {
+              // In client mode, the resources have already been downloaded, so resolving
+              // the local path should be enough.
+              val fileName = new Path(uri).getName
+              new File(targetDir, fileName).toURI.toString
+            } else {
+              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+            }
+          case _ => uri.toString
+        }
+      }
+
+      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
+      args.files = Option(args.files).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.pyFiles = Option(args.pyFiles).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.jars = Option(args.jars).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.archives = Option(args.archives).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
--- End diff --
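For context, the `shouldDownload` check in the diff above can be exercised on its own. The sketch below is illustrative only, not code from the PR: the object name and the hard-coded scheme list are stand-ins for `spark.yarn.dist.forceDownloadSchemes`, and `Try(...).isFailure` is equivalent to the diff's `.map(_ => true).getOrElse(false)` followed by negation.

    import scala.util.Try

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    object SchemeCheckSketch {
      def main(args: Array[String]): Unit = {
        val hadoopConf = new Configuration()
        // Stand-in for spark.yarn.dist.forceDownloadSchemes.
        val forceDownloadSchemes = Seq("http", "https")

        // Download when the scheme is explicitly listed, or when Hadoop has no
        // FileSystem implementation registered for it.
        def shouldDownload(scheme: String): Boolean =
          forceDownloadSchemes.contains(scheme) ||
            Try(FileSystem.getFileSystemClass(scheme, hadoopConf)).isFailure

        // "http" is force-listed and an unknown scheme fails the lookup, so both
        // are downloaded; "hdfs" resolves to a FileSystem (assuming the HDFS
        // client is on the classpath) and is left alone.
        Seq("hdfs", "http", "nosuchscheme").foreach { s =>
          println(s"$s -> shouldDownload = ${shouldDownload(s)}")
        }
      }
    }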
From the code, `--files` and `--jars` have overwritten `spark.yarn.*` for a long
time, AFAIK. I think we should make `spark.yarn.*` internal configurations to
reduce the discrepancy.
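
To illustrate the overwrite behavior described above, here is a minimal sketch. The object name and literal paths are hypothetical; `spark.yarn.dist.files` is used as the example key that `--files` maps to in YARN mode. If the description above is right, SparkSubmit ends up doing something equivalent to the `foreach` line, so the command-line value replaces whatever the user configured.

    import org.apache.spark.SparkConf

    object PrecedenceSketch {
      def main(args: Array[String]): Unit = {
        // User-set configuration value.
        val conf = new SparkConf(loadDefaults = false)
          .set("spark.yarn.dist.files", "hdfs:///from-conf.txt")
        // Stand-in for a --files argument parsed from the command line.
        val cliFiles = Option("hdfs:///from-cli.txt")
        // The command-line value overwrites the spark.yarn.* key.
        cliFiles.foreach(conf.set("spark.yarn.dist.files", _))
        println(conf.get("spark.yarn.dist.files")) // prints hdfs:///from-cli.txt
      }
    }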
---