HeartSaVioR commented on a change in pull request #26590: [SPARK-29953][SS] 
Don't clean up source files for FileStreamSource if the files belong to the 
output of FileStreamSink
URL: https://github.com/apache/spark/pull/26590#discussion_r348840565
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##########
 @@ -342,11 +343,49 @@ object FileStreamSource {
     def size: Int = map.size()
   }
 
-  private[sql] trait FileStreamSourceCleaner {
-    def clean(entry: FileEntry): Unit
+  private[sql] abstract class FileStreamSourceCleaner(
+      val fileSystem: FileSystem,
+      val sourcePath: Path) extends Logging {
+
+    private val srcPathToContainFileStreamSinkMetadata = new 
mutable.HashMap[Path, Boolean]
+
+    def clean(entries: Seq[FileEntry]): Unit = {
+      doClean(excludeFilesFromFileStreamSink(entries))
+    }
+
+    private def excludeFilesFromFileStreamSink(entries: Seq[FileEntry]): 
Seq[FileEntry] = {
+      val srcPathToEntries = entries.groupBy { entry =>
+        // find the ancestor which has same depth as source path
+        var curPath = new Path(new URI(entry.path))
+        while (curPath.depth() > sourcePath.depth()) {
+          curPath = curPath.getParent
+        }
+        curPath
+      }
+
+      srcPathToEntries.filterKeys { srcPath =>
+        srcPathToContainFileStreamSinkMetadata.get(srcPath) match {
+          case Some(v) => !v
 
 Review comment:
   Yeah, that's the reason I asked for more voices - the cached value can 
become invalid at any time, but if we don't cache it we have to check every 
time, which is resource-inefficient. Maybe I'd even be OK with not using the 
cache given we'll do it in the background, but I want to check whether it's 
only me.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to