Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/21257#discussion_r197172859
--- Diff:
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
---
@@ -235,4 +244,41 @@ class HadoopMapReduceCommitProtocol(
tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
}
}
+
+ /**
+ * Now just record the files to be deleted.
+ */
+ override def deleteWithJob(fs: FileSystem, path: Path,
+ canDeleteNow: Boolean = true): Boolean = {
+ if (canDeleteNow) {
+ super.deleteWithJob(fs, path)
+ } else {
+ val set = if (pathsToDelete.contains(fs)) {
+ pathsToDelete(fs)
+ } else {
+ new mutable.HashSet[Path]()
+ }
+
+ set.add(path)
+ pathsToDelete.put(fs, set)
+ true
+ }
+ }
+
+ private def cleanPathToDelete(): Unit = {
+ // first delete the should delete special file
+ for (fs <- pathsToDelete.keys) {
+ for (path <- pathsToDelete(fs)) {
+ try {
+ if (!fs.delete(path, true)) {
+ logWarning(s"Delete path ${path} fail at job commit time")
--- End diff --
`delete()` returning false just means there was nothing there, so I wouldn't warn at that
point. Unless `delete()` throws an exception, you assume that when the call
returns, `fs.exists(path)` does not hold — regardless of the return value.
(Special exception: the dest is "/".)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]