Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/21257#discussion_r197173180
--- Diff:
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
---
@@ -235,4 +244,41 @@ class HadoopMapReduceCommitProtocol(
tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
}
}
+
+ /**
+ * now just record the file to be delete
+ */
+ override def deleteWithJob(fs: FileSystem, path: Path,
+ canDeleteNow: Boolean = true): Boolean = {
+ if (canDeleteNow) {
+ super.deleteWithJob(fs, path)
+ } else {
+ val set = if (pathsToDelete.contains(fs)) {
+ pathsToDelete(fs)
+ } else {
+ new mutable.HashSet[Path]()
+ }
+
+ set.add(path)
+ pathsToDelete.put(fs, set)
+ true
+ }
+ }
+
+ private def cleanPathToDelete(): Unit = {
+ // first delete the should delete special file
+ for (fs <- pathsToDelete.keys) {
+ for (path <- pathsToDelete(fs)) {
+ try {
+ if (!fs.delete(path, true)) {
+ logWarning(s"Delete path ${path} fail at job commit time")
+ }
+ } catch {
+ case ex: IOException =>
+ throw new IOException(s"Unable to clear output " +
+ s"file ${path} at job commit time", ex)
--- End diff --
recommend including ex.toString() in the new exception raised, as child
exception text can often get lost
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]