zsxwing commented on a change in pull request #26590: [SPARK-29953][SS] Don't clean up source files for FileStreamSource if the files belong to the output of FileStreamSink
URL: https://github.com/apache/spark/pull/26590#discussion_r352759721
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##########
 @@ -267,10 +269,22 @@ class FileStreamSource(
     val logOffset = FileStreamSourceOffset(end).logOffset
 
     sourceCleaner.foreach { cleaner =>
-      val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
-      val validFileEntities = files.filter(_.batchId == logOffset)
-      logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
-      validFileEntities.foreach(cleaner.clean)
+      sourceHasMetadata match {
+        case Some(true) if !warnedIgnoringCleanSourceOption =>
+          logWarning("Ignoring 'cleanSource' option since source path refers to the output" +
+            " directory of FileStreamSink.")
+          warnedIgnoringCleanSourceOption = true
+
+        case Some(false) =>
+          val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
+          val validFileEntities = files.filter(_.batchId == logOffset)
+          logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
+          validFileEntities.foreach(cleaner.clean)
+
+        case _ =>
+          logWarning("Ignoring 'cleanSource' option since Spark hasn't figured out whether " +
 
 Review comment:
   > 3\. somehow the source files are all deleted between 1) and 2)
   
   This should be a user error.
   
   My general point is that we should keep the data files and the metadata in 
`_spark_metadata` consistent, and we should not clean up data files that are 
still tracked. Logging a warning and skipping the deletion is one solution, but 
most users won't notice the warning in their logs, so we should detect this 
earlier. There is already a variable `sourceHasMetadata` that tracks whether the 
source is reading from a file stream sink. We can check the options and throw 
an exception at the point where we flip it. What do you think?
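   
   A minimal sketch of the fail-fast idea, not the actual change: it models 
`sourceHasMetadata` as a standalone class outside of Spark. The class 
`SourceState`, the flag `cleanSourceEnabled`, the method names, and the 
exception message are all hypothetical; only `sourceHasMetadata` and the 
'cleanSource' option come from the code under review.

```scala
// Hypothetical, simplified stand-in for the relevant state in FileStreamSource.
// `cleanSourceEnabled` is an assumed flag meaning "the user set 'cleanSource'".
final class SourceState(cleanSourceEnabled: Boolean) {
  // None: not determined yet.
  // Some(true): the source path is the output directory of a FileStreamSink.
  // Some(false): the source path is a plain directory of files.
  private var sourceHasMetadata: Option[Boolean] = None

  // Single place where the flag is flipped, so the option check lives here.
  def setSourceHasMetadata(value: Boolean): Unit = {
    if (value && cleanSourceEnabled) {
      // Fail fast instead of logging a warning the user may never see.
      throw new UnsupportedOperationException(
        "'cleanSource' is not supported when the source path refers to " +
          "the output directory of FileStreamSink.")
    }
    sourceHasMetadata = Some(value)
  }

  def current: Option[Boolean] = sourceHasMetadata
}
```

   With this shape the query would fail as soon as the source is identified 
as sink output, rather than silently skipping the cleanup on every batch.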

