Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/21257#discussion_r187954805
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -235,4 +247,23 @@ class HadoopMapReduceCommitProtocol(
       tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
     }
   }
+
+  /**
+   * For now, just record the files to be deleted rather than deleting them.
+   */
+  override def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean,
+      canDeleteNow: Boolean = true): Boolean = {
+    if (canDeleteNow) {
+      super.deleteWithJob(fs, path, recursive)
+    } else {
+      pathsToDelete.add(path -> recursive)
+    }
+  }
+
+  private def deletePath(fs: FileSystem, path: Path, recursive: Boolean): Unit = {
+    if (fs.exists(path) && !fs.delete(path, recursive)) {
+      throw new IOException(s"Unable to clear output directory $path")
+    }
--- End diff ---
I'd personally ignore a failure on delete(), as the contract of the API
call is "if this doesn't raise an exception then the dest is gone". You can
also skip the exists() check, as it will be superfluous.
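
A minimal sketch of that simplification, untested and just to illustrate
(it assumes the same org.apache.hadoop.fs.{FileSystem, Path} imports the
file already has):

    private def deletePath(fs: FileSystem, path: Path, recursive: Boolean): Unit = {
      // FileSystem.delete() throws IOException on failure; a false return
      // just means the path did not exist. Either way, once the call
      // returns without throwing, the destination is gone, so there is
      // nothing left to check.
      fs.delete(path, recursive)
    }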
---