dongjoon-hyun commented on a change in pull request #24186: [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted
URL: https://github.com/apache/spark/pull/24186#discussion_r300172430
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
##########
@@ -70,7 +80,24 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   override def abortJob(jobContext: JobContext): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
- // Do nothing
+    // best-effort cleanup of completed files from the failed job
+    // since each file has a UUID in its filename, it is safe to try deleting them
+    // leaving them in place would not reduce the cost of saving files when retrying
+    if (pendingCommitFiles.nonEmpty) {
+      pendingCommitFiles.foreach { file =>
+        val path = new Path(file)
+        val fs = path.getFileSystem(jobContext.getConfiguration)
+        // this is to make sure the file can be seen from the driver as well
+        if (fs.exists(path)) {
+          fs.delete(path, false)
Review comment:
How about adding a `try` here to catch `IOException`?
I hope we can iterate over the whole `pendingCommitFiles` even when we fail to delete some of them.
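
The suggestion could look something like the sketch below (an illustration only, not the final patch; `logWarning` is assumed to be available from Spark's `Logging` trait, and the other names come from the diff above):

```scala
import java.io.IOException

// Sketch: wrap each delete in try/catch so an IOException on one file
// does not abort cleanup of the remaining pendingCommitFiles.
pendingCommitFiles.foreach { file =>
  try {
    val path = new Path(file)
    val fs = path.getFileSystem(jobContext.getConfiguration)
    // make sure the file can be seen from the driver as well
    if (fs.exists(path)) {
      fs.delete(path, false)
    }
  } catch {
    case e: IOException =>
      // best-effort: log and continue with the next file
      logWarning(s"Fail to remove temporary file: $file, continuing with remaining files.", e)
  }
}
```

Catching per file (rather than around the whole loop) is what lets the iteration continue past a single failed delete.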
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]