Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/21257#discussion_r188262174
--- Diff:
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
---
@@ -163,6 +170,15 @@ class HadoopMapReduceCommitProtocol(
}
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+ // First, delete the special files that were marked for deletion.
+ for (fs <- pathsToDelete.keys) {
+ for (path <- pathsToDelete(fs)) {
+ if (fs.exists(path)) {
+ fs.delete(path, true)
--- End diff --
1. You don't need the `exists()` check; it's just overhead. `delete()` will
simply return false if there was nothing to delete.
2. But what if that delete throws an exception? Should the commit fail (as it
does now), or should the failure be downgraded to a warning? As an example,
the Hadoop `FileOutputCommitter` uses the option
`mapreduce.fileoutputcommitter.cleanup-failures.ignored` to choose what to do
there.
3. And what about cleanup when a job is aborted?
I think you'd be best off isolating this cleanup into its own method and
calling it from both job commit and job abort: in job commit, discuss with
others what the failure policy should be; in job abort, just log and
continue. A sketch of that shape follows below.
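To make that concrete, here is a minimal sketch. The method name
`deleteMarkedPaths` and the `ignoreFailures` flag are mine, not part of this
PR; it assumes the `pathsToDelete: Map[FileSystem, Set[Path]]` field from the
diff above, and `logWarning` from Spark's `Logging` trait, which this class
already mixes in:

```scala
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch: shared cleanup for commitJob and abortJob, with a
// flag controlling whether delete failures are fatal or just logged.
private def deleteMarkedPaths(ignoreFailures: Boolean): Unit = {
  for ((fs, paths) <- pathsToDelete; path <- paths) {
    try {
      // No exists() check needed: delete() returns false if the path is absent.
      fs.delete(path, true)
    } catch {
      // If failures are not ignored, the IOException propagates and fails
      // the commit, matching the current behaviour.
      case e: IOException if ignoreFailures =>
        logWarning(s"Failed to delete $path during job cleanup", e)
    }
  }
}
```

`commitJob()` could then call `deleteMarkedPaths(ignoreFailures = ...)` with
whatever configurable policy is agreed on, while `abortJob()` always passes
`ignoreFailures = true` so a failed cleanup never masks the original abort
reason.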
---