HeartSaVioR commented on a change in pull request #24186: [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted
URL: https://github.com/apache/spark/pull/24186#discussion_r300214568
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
 ##########
 @@ -70,7 +80,24 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def abortJob(jobContext: JobContext): Unit = {
    require(fileLog != null, "setupManifestOptions must be called before this function")
-    // Do nothing
+    // Best-effort cleanup of completed files from the failed job.
+    // Each file has a UUID in its filename, so it is safe to try deleting them;
+    // keeping them would not reduce the cost of writing files on a retry.
+    if (pendingCommitFiles.nonEmpty) {
+      pendingCommitFiles.foreach { file =>
+        val path = new Path(file)
+        val fs = path.getFileSystem(jobContext.getConfiguration)
+        // the existence check makes sure the file is visible from the driver as well
+        if (fs.exists(path)) {
+          fs.delete(path, false)
 
 Review comment:
   Nice suggestion. I guess we also might not want abortJob() to fail due to an IOException, since we are just doing "best-effort" cleanup. Will update.
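   The pattern being discussed can be sketched as follows. This is a minimal, self-contained illustration only, not the actual Spark code: it uses `java.nio.file` in place of Hadoop's `FileSystem` API, and the helper name `bestEffortDelete` is hypothetical. The point is that each delete is wrapped in its own try/catch, so an `IOException` on one file neither aborts the loop nor propagates out of the abort path.

```scala
import java.io.IOException
import java.nio.file.{Files, Path}

// Hypothetical helper illustrating "best-effort" cleanup semantics:
// try to delete every pending file, but swallow IOExceptions so the
// abort path itself never fails just because cleanup failed.
object BestEffortCleanup {
  def bestEffortDelete(paths: Seq[Path]): Unit = {
    paths.foreach { path =>
      try {
        // deleteIfExists tolerates files that are already gone
        Files.deleteIfExists(path)
      } catch {
        case e: IOException =>
          // best effort: report and continue with the remaining files
          Console.err.println(s"Failed to delete $path during abort: ${e.getMessage}")
      }
    }
  }
}
```

   Catching per file rather than around the whole loop means one undeletable file does not prevent cleanup of the rest.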

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
