HeartSaVioR commented on a change in pull request #26590: [SPARK-29953][SS] 
Don't clean up source files for FileStreamSource if the files belong to the 
output of FileStreamSink
URL: https://github.com/apache/spark/pull/26590#discussion_r350493401
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##########
 @@ -267,10 +269,22 @@ class FileStreamSource(
     val logOffset = FileStreamSourceOffset(end).logOffset
 
     sourceCleaner.foreach { cleaner =>
-      val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
-      val validFileEntities = files.filter(_.batchId == logOffset)
-      logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
-      validFileEntities.foreach(cleaner.clean)
+      sourceHasMetadata match {
+        case Some(true) if !warnedIgnoringCleanSourceOption =>
+          logWarning("Ignoring 'cleanSource' option since source path refers to the output" +
+            " directory of FileStreamSink.")
+          warnedIgnoringCleanSourceOption = true
+
+        case Some(false) =>
+          val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
+          val validFileEntities = files.filter(_.batchId == logOffset)
+          logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
+          validFileEntities.foreach(cleaner.clean)
+
+        case _ =>
+          logWarning("Ignoring 'cleanSource' option since Spark hasn't figured out whether " +
 
 Review comment:
   The only "odd" case I can imagine that reaches here is:
   
   1) The query ran, wrote the commit log for the last batch, and stopped before writing the offset for the next batch.
   2) The query is restarted, and `constructNextBatch` is called.
   3) Somehow all of the source files are deleted between 1) and 2), so FileStreamSource doesn't see any files and cannot decide whether the source has sink metadata when `fetchAllFiles` is called.
   4) `constructNextBatch` calls `commit` for the previous batch, which the query executed before the restart.
   
   This is obviously a very odd case, since the contents of the source directory were modified (maybe manually), which we don't support (so throwing an exception would be OK), but I'm not fully sure there are no other edge cases.
   
   Btw, where do you recommend adding the exception: L287 or L205? If you're suggesting adding it at L205, I'm not sure I follow. If I understand correctly, a run that reaches `case _` won't reach L205.
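   
To make the scenario above concrete, here is a minimal, self-contained sketch of the three-way decision. It is a simplified model, not the code in this PR: only the name `sourceHasMetadata` comes from the diff, while `CleanSourceSketch`, `Action`, `resolveFlag` and `decide` are hypothetical names for illustration, and the warn-once guard is left out. It assumes, following step 3), that the flag cannot be resolved when the file listing comes back empty.

```scala
// Simplified, hypothetical model of the decision discussed above.
// Only `sourceHasMetadata` is a name taken from the diff; everything else is illustrative.
object CleanSourceSketch {
  sealed trait Action
  case object SkipSinkOutput extends Action // source path is the output of FileStreamSink
  case object CleanFiles extends Action     // ordinary source, completed files can be cleaned
  case object Undecided extends Action      // corresponds to the `case _` branch

  // Assumption: with no files listed, the source cannot tell which kind of
  // directory it is reading, so the flag stays unresolved (None).
  def resolveFlag(listedFiles: Seq[String], hasMetadataDir: Boolean): Option[Boolean] =
    if (listedFiles.isEmpty) None else Some(hasMetadataDir)

  // Three-way match on the flag (the warn-once guard from the diff is omitted).
  def decide(sourceHasMetadata: Option[Boolean]): Action = sourceHasMetadata match {
    case Some(true)  => SkipSinkOutput
    case Some(false) => CleanFiles
    case None        => Undecided
  }

  def main(args: Array[String]): Unit = {
    // Steps 1) to 4): files vanished before the restart, then commit runs.
    val afterRestart = resolveFlag(listedFiles = Seq.empty, hasMetadataDir = false)
    println(decide(afterRestart))

    // Normal run: at least one file was listed, so the flag is resolved.
    val normalRun = resolveFlag(Seq("part-0000.json"), hasMetadataDir = false)
    println(decide(normalRun))
  }
}
```

In this model the restart scenario lands in `Undecided`, which is the branch where the choice between logging a warning and throwing an exception has to be made.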

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to