Github user yanboliang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15131#discussion_r79303180
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -1426,41 +1426,53 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
        * supported for Hadoop-supported filesystems.
        */
       def addFile(path: String, recursive: Boolean): Unit = {
    -    val uri = new URI(path)
    -    val schemeCorrectedPath = uri.getScheme match {
    -      case null | "local" => new File(path).getCanonicalFile.toURI.toString
    -      case _ => path
    -    }
    -
    -    val hadoopPath = new Path(schemeCorrectedPath)
    -    val scheme = new URI(schemeCorrectedPath).getScheme
    -    if (!Array("http", "https", "ftp").contains(scheme)) {
    -      val fs = hadoopPath.getFileSystem(hadoopConfiguration)
    -      val isDir = fs.getFileStatus(hadoopPath).isDirectory
    -      if (!isLocal && scheme == "file" && isDir) {
    -        throw new SparkException(s"addFile does not support local 
directories when not running " +
    -          "local mode.")
    +    if (path == null) {
    +      logWarning("null specified as parameter to addFile")
    +    } else {
    +      var key = ""
    +      if (path.contains("\\")) {
    +        // For local paths with backslashes on Windows, URI throws an exception
    +        key = env.rpcEnv.fileServer.addFile(new File(path))
    +      } else {
    +        val uri = new URI(path)
    +        val schemeCorrectedPath = uri.getScheme match {
    +          case null | "local" => new 
File(path).getCanonicalFile.toURI.toString
    +          case _ => path
    +        }
    +
    +        val hadoopPath = new Path(schemeCorrectedPath)
    +        val scheme = new URI(schemeCorrectedPath).getScheme
    +        if (!Array("http", "https", "ftp").contains(scheme)) {
    +          val fs = hadoopPath.getFileSystem(hadoopConfiguration)
    +          val isDir = fs.getFileStatus(hadoopPath).isDirectory
    +          if (!isLocal && scheme == "file" && isDir) {
    +            throw new SparkException(s"addFile does not support local 
directories when not " +
    +              "running local mode.")
    +          }
    +          if (!recursive && isDir) {
    +            throw new SparkException(s"Added file $hadoopPath is a 
directory and recursive " +
    +              "is not turned on.")
    +          }
    +        }
    +
    +        key = if (!isLocal && scheme == "file") {
    +          env.rpcEnv.fileServer.addFile(new File(uri.getPath))
    +        } else {
    +          schemeCorrectedPath
    +        }
           }
    -      if (!recursive && isDir) {
    -        throw new SparkException(s"Added file $hadoopPath is a directory 
and recursive is not " +
    -          "turned on.")
    +      if (key != null) {
    +        val timestamp = System.currentTimeMillis
    +        if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
    +          logInfo(s"Added file $path at $key with timestamp $timestamp")
    +          // Fetch the file locally so that closures which are run on the driver can still use the
    +          // SparkFiles API to access files.
    +          Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
    --- End diff --
    
    @HyukjinKwon Thanks for your suggestion, will update it soon.
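    
    As a side note on the backslash branch above: java.net.URI rejects raw Windows-style paths because '\' is not a legal URI character, which is why such paths skip the URI parsing and go straight to the RPC file server. A minimal, illustrative sketch (not part of the patch; the path and object name are made up):
    
        import java.io.File
        import java.net.{URI, URISyntaxException}
    
        object BackslashPathDemo {
          def main(args: Array[String]): Unit = {
            // Hypothetical Windows-style local path, for illustration only.
            val windowsPath = "C:\\Users\\me\\data.txt"
            try {
              new URI(windowsPath)
            } catch {
              case e: URISyntaxException =>
                // '\' is not a legal URI character, so the single-argument
                // constructor fails here with an "Illegal character" message.
                println(s"URI parsing failed: ${e.getMessage}")
            }
            // new File(path) accepts the raw string, which is what the
            // path.contains("\\") branch relies on before handing the file
            // to the RPC file server.
            println(new File(windowsPath).getName)
          }
        }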

