Repository: spark
Updated Branches:
  refs/heads/master 79c668942 -> e2c7e09f7


[SPARK-24646][CORE] Minor change to spark.yarn.dist.forceDownloadSchemes to support wildcard '*'

## What changes were proposed in this pull request?

When tokens are obtained through a customized `ServiceCredentialProvider`, the 
provider must be available on the classpath of the local spark-submit process. 
In that case, all configured remote resources should be forced to download to 
the local disk.
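
For context, a custom provider looks roughly like the sketch below. It assumes the Spark 2.x `ServiceCredentialProvider` trait shape and discovery via Java's `ServiceLoader`; the class name, service name, and token logic are hypothetical:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

// Hypothetical provider: the class (and its META-INF/services registration)
// must be on the spark-submit process classpath to be discovered.
class MyServiceCredentialProvider extends ServiceCredentialProvider {
  override def serviceName: String = "my-service"

  override def credentialsRequired(hadoopConf: Configuration): Boolean = true

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Fetch service-specific tokens and add them to `creds` here; return the
    // next renewal time, or None if the tokens never expire.
    None
  }
}
```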

To make this configuration easier to use, this PR adds wildcard '*' support to 
`spark.yarn.dist.forceDownloadSchemes` and clarifies the usage of the 
configuration.
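
For example, setting the wildcard forces every remote resource to be downloaded by the local spark-submit process before it is added to YARN's distributed cache (a minimal sketch; the remote jar path is illustrative):

```scala
import org.apache.spark.SparkConf

// Equivalent to passing --conf spark.yarn.dist.forceDownloadSchemes=*
// on the spark-submit command line.
val conf = new SparkConf()
  .set("spark.yarn.dist.forceDownloadSchemes", "*")         // download for all schemes
  .set("spark.yarn.dist.jars", "s3a://bucket/libs/dep.jar") // illustrative remote jar
```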

## How was this patch tested?

New unit tests added.

Author: jerryshao <ss...@hortonworks.com>

Closes #21633 from jerryshao/SPARK-21917-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2c7e09f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2c7e09f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2c7e09f

Branch: refs/heads/master
Commit: e2c7e09f742a7e522efd74fe8e14c2620afdb522
Parents: 79c6689
Author: jerryshao <ss...@hortonworks.com>
Authored: Mon Jul 9 10:21:40 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Mon Jul 9 10:21:40 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   |  5 ++--
 .../apache/spark/internal/config/package.scala  |  5 ++--
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 29 +++++++++++++-------
 docs/running-on-yarn.md                         |  5 ++--
 4 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2c7e09f/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 2da778a..e7310ee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -385,7 +385,7 @@ private[spark] class SparkSubmit extends Logging {
       val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
 
       def shouldDownload(scheme: String): Boolean = {
-        forceDownloadSchemes.contains(scheme) ||
+        forceDownloadSchemes.contains("*") || 
forceDownloadSchemes.contains(scheme) ||
           Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
       }
 
@@ -578,7 +578,8 @@ private[spark] class SparkSubmit extends Logging {
     }
     // Add the main application jar and any added jars to classpath in case YARN client
     // requires these jars.
-    // This assumes both primaryResource and user jars are local jars, otherwise it will not be
+    // This assumes both primaryResource and user jars are local jars, or already downloaded
+    // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
     // added to the classpath of YARN client.
     if (isYarnCluster) {
       if (isUserJar(args.primaryResource)) {
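
For reference, the download decision after this change can be read as the standalone predicate below (a simplified sketch of the hunk above, with the parsed config value and Hadoop configuration passed in explicitly):

```scala
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Download when the wildcard is configured, when the scheme is explicitly
// listed, or when Hadoop has no FileSystem implementation for the scheme.
def shouldDownload(
    scheme: String,
    forceDownloadSchemes: Seq[String],
    hadoopConf: Configuration): Boolean = {
  forceDownloadSchemes.contains("*") ||
    forceDownloadSchemes.contains(scheme) ||
    Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
}
```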

http://git-wip-us.apache.org/repos/asf/spark/blob/e2c7e09f/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index bda9795..ba892bf 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -486,10 +486,11 @@ package object config {
 
   private[spark] val FORCE_DOWNLOAD_SCHEMES =
     ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
-      .doc("Comma-separated list of schemes for which files will be downloaded 
to the " +
+      .doc("Comma-separated list of schemes for which resources will be 
downloaded to the " +
         "local disk prior to being added to YARN's distributed cache. For use 
in cases " +
         "where the YARN service does not support schemes that are supported by 
Spark, like http, " +
-        "https and ftp.")
+        "https and ftp, or jars required to be in the local YARN client's 
classpath. Wildcard " +
+        "'*' is denoted to download resources for all the schemes.")
       .stringConf
       .toSequence
       .createWithDefault(Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/e2c7e09f/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 545c8d0..f829fec 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -995,20 +995,24 @@ class SparkSubmitSuite
   }
 
   test("download remote resource if it is not supported by yarn service") {
-    testRemoteResources(enableHttpFs = false, blacklistHttpFs = false)
+    testRemoteResources(enableHttpFs = false)
   }
 
   test("avoid downloading remote resource if it is supported by yarn service") 
{
-    testRemoteResources(enableHttpFs = true, blacklistHttpFs = false)
+    testRemoteResources(enableHttpFs = true)
   }
 
   test("force download from blacklisted schemes") {
-    testRemoteResources(enableHttpFs = true, blacklistHttpFs = true)
+    testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("http"))
+  }
+
+  test("force download for all the schemes") {
+    testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*"))
   }
 
   private def testRemoteResources(
       enableHttpFs: Boolean,
-      blacklistHttpFs: Boolean): Unit = {
+      blacklistSchemes: Seq[String] = Nil): Unit = {
     val hadoopConf = new Configuration()
     updateConfWithFakeS3Fs(hadoopConf)
     if (enableHttpFs) {
@@ -1025,8 +1029,8 @@ class SparkSubmitSuite
     val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
     val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}"
 
-    val forceDownloadArgs = if (blacklistHttpFs) {
-      Seq("--conf", "spark.yarn.dist.forceDownloadSchemes=http")
+    val forceDownloadArgs = if (blacklistSchemes.nonEmpty) {
+      Seq("--conf", 
s"spark.yarn.dist.forceDownloadSchemes=${blacklistSchemes.mkString(",")}")
     } else {
       Nil
     }
@@ -1044,14 +1048,19 @@ class SparkSubmitSuite
 
     val jars = conf.get("spark.yarn.dist.jars").split(",").toSet
 
-    // The URI of remote S3 resource should still be remote.
-    assert(jars.contains(tmpS3JarPath))
+    def isSchemeBlacklisted(scheme: String) = {
+      blacklistSchemes.contains("*") || blacklistSchemes.contains(scheme)
+    }
+
+    if (!isSchemeBlacklisted("s3")) {
+      assert(jars.contains(tmpS3JarPath))
+    }
 
-    if (enableHttpFs && !blacklistHttpFs) {
+    if (enableHttpFs && blacklistSchemes.isEmpty) {
       // If Http FS is supported by yarn service, the URI of remote http resource should
       // still be remote.
       assert(jars.contains(tmpHttpJarPath))
-    } else {
+    } else if (!enableHttpFs || isSchemeBlacklisted("http")) {
       // If Http FS is not supported by yarn service, or http scheme is configured to be force
       // downloading, the URI of remote http resource should be changed to a local one.
       val jarName = new File(tmpHttpJar.toURI).getName

http://git-wip-us.apache.org/repos/asf/spark/blob/e2c7e09f/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 575da72..0b265b0 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -218,9 +218,10 @@ To use a custom metrics.properties for the application master and executors, upd
   <td><code>spark.yarn.dist.forceDownloadSchemes</code></td>
   <td><code>(none)</code></td>
   <td>
-    Comma-separated list of schemes for which files will be downloaded to the local disk prior to
+    Comma-separated list of schemes for which resources will be downloaded to the local disk prior to
     being added to YARN's distributed cache. For use in cases where the YARN service does not
-    support schemes that are supported by Spark, like http, https and ftp.
+    support schemes that are supported by Spark, like http, https and ftp, or jars required to be in the
+    local YARN client's classpath. Wildcard '*' is denoted to download resources for all the schemes.
   </td>
 </tr>
 <tr>

