Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19130#discussion_r138801550

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
    @@ -367,6 +368,53 @@ object SparkSubmit extends CommandLineUtils with Logging {
           }.orNull
         }

    +    // When running in YARN cluster manager, download remote resources to local disk when
    +    // their scheme is force-downloaded or unsupported by Hadoop FileSystem.
    +    if (clusterManager == YARN) {
    +      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
    +      val secMgr = new SecurityManager(sparkConf)
    +      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
    +
    +      // Check the scheme list provided by "spark.yarn.dist.forceDownloadSchemes" to see if the
    +      // current resource's scheme is included in this list, or if Hadoop FileSystem doesn't
    +      // support the current scheme; if so, Spark will download the resource to local disk and
    +      // upload it to Hadoop FS.
    +      def shouldDownload(scheme: String): Boolean = {
    +        val isFsAvailable = Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }
    +          .map(_ => true).getOrElse(false)
    +        forceDownloadSchemes.contains(scheme) || !isFsAvailable
    +      }
    +
    +      def downloadResource(resource: String): String = {
    +        val uri = Utils.resolveURI(resource)
    +        uri.getScheme match {
    +          case "local" | "file" => resource
    +          case e if shouldDownload(e) =>
    +            if (deployMode == CLIENT) {
    +              // In client mode, we have already downloaded the resources, so locating the
    +              // local copy should be enough.
    +              val fileName = new Path(uri).getName
    +              new File(targetDir, fileName).toURI.toString
    +            } else {
    +              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
    +            }
    +          case _ => uri.toString
    +        }
    +      }
    +
    +      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
    +      args.files = Option(args.files).map { files =>
    +        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
    +      }.orNull
    +      args.pyFiles = Option(args.pyFiles).map { files =>
    +        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
    +      }.orNull
    +      args.jars = Option(args.jars).map { files =>
    +        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
    +      }.orNull
    +      args.archives = Option(args.archives).map { files =>
    +        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
    +      }.orNull
    --- End diff --

    From the code, `--files` and `--jars` have overridden `spark.yarn.*` for a long time, AFAIK. I think we should make `spark.yarn.*` internal configurations to reduce the discrepancy.
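    For readers following along, here is a minimal standalone sketch of the `shouldDownload` check from the diff. The `forceDownloadSchemes` value is a hypothetical stand-in for what a user might set via `spark.yarn.dist.forceDownloadSchemes`, and the results depend on which Hadoop `FileSystem` implementations happen to be on the classpath:

    ```scala
    import scala.util.Try

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    object SchemeCheckSketch {
      def main(args: Array[String]): Unit = {
        val hadoopConf = new Configuration()
        // Hypothetical value, standing in for
        // spark.yarn.dist.forceDownloadSchemes=http,https.
        val forceDownloadSchemes = Seq("http", "https")

        // Mirrors the check in the diff: download if the scheme is in the forced
        // list, or if no Hadoop FileSystem implementation is registered for it.
        def shouldDownload(scheme: String): Boolean = {
          val isFsAvailable = Try(FileSystem.getFileSystemClass(scheme, hadoopConf)).isSuccess
          forceDownloadSchemes.contains(scheme) || !isFsAvailable
        }

        // "http" is in the forced list above, so it is always downloaded.
        println(shouldDownload("http"))
        // "hdfs" is skipped only if a Hadoop FileSystem for it (hadoop-hdfs) is
        // resolvable on the classpath; otherwise it too would be downloaded.
        println(shouldDownload("hdfs"))
      }
    }
    ```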