Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21257#discussion_r197172859
  
    --- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
    @@ -235,4 +244,41 @@ class HadoopMapReduceCommitProtocol(
           tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
         }
       }
    +
    +  /**
    +   * now just record the file to be delete
    +   */
    +  override def deleteWithJob(fs: FileSystem, path: Path,
    +      canDeleteNow: Boolean = true): Boolean = {
    +    if (canDeleteNow) {
    +      super.deleteWithJob(fs, path)
    +    } else {
    +      val set = if (pathsToDelete.contains(fs)) {
    +        pathsToDelete(fs)
    +      } else {
    +        new mutable.HashSet[Path]()
    +      }
    +
    +      set.add(path)
    +      pathsToDelete.put(fs, set)
    +      true
    +    }
    +  }
    +
    +  private def cleanPathToDelete(): Unit = {
    +    // first delete the should delete special file
    +    for (fs <- pathsToDelete.keys) {
    +      for (path <- pathsToDelete(fs)) {
    +        try {
    +          if (!fs.delete(path, true)) {
    +            logWarning(s"Delete path ${path} fail at job commit time")
    --- End diff --
    
    delete -> false just means there was nothing there, I wouldn't warn at that 
point. Unless `delete()` throws an exception you assume that when the call 
returns, `fs.exists(path)` does not hold -regardless of the return value. 
(Special exception, the dest is "/")


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to