Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15131#discussion_r79302302
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -1426,41 +1426,53 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
        * supported for Hadoop-supported filesystems.
        */
       def addFile(path: String, recursive: Boolean): Unit = {
    -    val uri = new URI(path)
    -    val schemeCorrectedPath = uri.getScheme match {
    -      case null | "local" => new File(path).getCanonicalFile.toURI.toString
    -      case _ => path
    -    }
    -
    -    val hadoopPath = new Path(schemeCorrectedPath)
    -    val scheme = new URI(schemeCorrectedPath).getScheme
    -    if (!Array("http", "https", "ftp").contains(scheme)) {
    -      val fs = hadoopPath.getFileSystem(hadoopConfiguration)
    -      val isDir = fs.getFileStatus(hadoopPath).isDirectory
    -      if (!isLocal && scheme == "file" && isDir) {
    -        throw new SparkException(s"addFile does not support local directories when not running " +
    -          "local mode.")
    +    if (path == null) {
    +      logWarning("null specified as parameter to addFile")
    +    } else {
    +      var key = ""
    +      if (path.contains("\\")) {
    +        // For local paths with backslashes on Windows, URI throws an exception
    +        key = env.rpcEnv.fileServer.addFile(new File(path))
    +      } else {
    +        val uri = new URI(path)
    +        val schemeCorrectedPath = uri.getScheme match {
    +          case null | "local" => new File(path).getCanonicalFile.toURI.toString
    +          case _ => path
    +        }
    +
    +        val hadoopPath = new Path(schemeCorrectedPath)
    +        val scheme = new URI(schemeCorrectedPath).getScheme
    +        if (!Array("http", "https", "ftp").contains(scheme)) {
    +          val fs = hadoopPath.getFileSystem(hadoopConfiguration)
    +          val isDir = fs.getFileStatus(hadoopPath).isDirectory
    +          if (!isLocal && scheme == "file" && isDir) {
    +            throw new SparkException(s"addFile does not support local directories when not " +
    +              "running local mode.")
    +          }
    +          if (!recursive && isDir) {
    +            throw new SparkException(s"Added file $hadoopPath is a directory and recursive " +
    +              "is not turned on.")
    +          }
    +        }
    +
    +        key = if (!isLocal && scheme == "file") {
    +          env.rpcEnv.fileServer.addFile(new File(uri.getPath))
    +        } else {
    +          schemeCorrectedPath
    +        }
       }
    -      if (!recursive && isDir) {
    -        throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
    -          "turned on.")
    +      if (key != null) {
    +        val timestamp = System.currentTimeMillis
    +        if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
    +          logInfo(s"Added file $path at $key with timestamp $timestamp")
    +          // Fetch the file locally so that closures which are run on the driver can still use the
    +          // SparkFiles API to access files.
    +          Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
    --- End diff --
    
    Ah, sorry, I should've checked the code further. It seems `Utils.fetchFile` expects a `url` as its first argument, not a path. How about changing `val uri = new URI(path)` to `val uri = new Path(path).toUri`, rather than mirroring `addJar`?
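
    A minimal sketch of what I mean, assuming the surrounding `schemeCorrectedPath` logic in the diff stays as-is (only the parsing line changes):

    ```scala
    // Sketch of the suggestion: let Hadoop's Path normalize Windows
    // backslashes, instead of special-casing path.contains("\\") before
    // handing the raw string to java.net.URI.
    val uri = new Path(path).toUri
    ```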
    
    I remember we discussed this problem in https://github.com/apache/spark/pull/14960. If comma-separated multiple paths were allowed in `path`, this would be problematic, but it seems `path` only accepts a single path.
    
    ```scala
    import java.net.URI
    import org.apache.hadoop.fs.Path
    
    scala> new Path("C:\\a\\b\\c").toUri
    res1: java.net.URI = /C:/a/b/c
    
    scala> new Path("C:/a/b/c").toUri
    res2: java.net.URI = /C:/a/b/c
    
    scala> new Path("/a/b/c").toUri
    res3: java.net.URI = /a/b/c
    
    scala> new Path("file:///a/b/c").toUri
    res3: java.net.URI = file:///a/b/c
    
    scala> new Path("http://localhost/a/b/c";).toUri
    res3: java.net.URI = http://localhost/a/b/c
    ```
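
    For contrast, and as the comment in the diff says, `java.net.URI` cannot parse the backslashed form directly. A quick check in the same session (the exact message may vary by JDK):

    ```scala
    scala> new URI("C:\\a\\b\\c")
    java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\a\b\c
    ```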

