HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r340308673
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##########
 @@ -330,4 +362,77 @@ object FileStreamSource {
 
     def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+      fileSystem: FileSystem,
+      sourcePath: Path,
+      baseArchiveFileSystem: Option[FileSystem],
+      baseArchivePath: Option[Path]) extends Logging {
+    assertParameters()
+
+    private def assertParameters(): Unit = {
+      require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+      baseArchiveFileSystem.foreach { fs =>
+        require(fileSystem.getUri == fs.getUri, "Base archive path is located on a different " +
+          s"file system than the source files. source path: $sourcePath" +
+          s" / base archive path: ${baseArchivePath.get}")
+      }
+
+      baseArchivePath.foreach { path =>
+
+        /**
+         * FileStreamSource reads the files which one of below conditions is met:
+         * 1) file itself is matched with source path
+         * 2) parent directory is matched with source path
+         *
+         * Checking with glob pattern is costly, so set this requirement to eliminate the cases
+         * where the archive path can be matched with source path. For example, when file is moved
+         * to archive directory, destination path will retain input file's path as suffix, so
+         * destination path can't be matched with source path if archive directory's depth is longer
+         * than 2, as neither file nor parent directory of destination path can be matched with
+         * source path.
+         */
+        require(path.depth() > 2, "Base archive path must have a depth of at least 2 " +
+          "subdirectories. e.g. '/data/archive'")
+      }
+    }
+
+    def archive(entry: FileEntry): Unit = {
+      require(baseArchivePath.isDefined)
+
+      val curPath = new Path(new URI(entry.path))
+      val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/"))
 
 Review comment:
   I'm revisiting the two issues, and I'm not sure there's a viable workaround. It looks like the issue pointed out is that ":" isn't a valid character in HDFS paths but may be valid on other file systems, so the Path API doesn't restrict it, and that leads to the problem. Even HDFS-14762 was closed as "Won't Fix".
   
   Would this only occur with `Path(parent, child)`, while `Path(pathstr)` is safe? Would it work if we manually concatenate the two paths as a string and pass the result to Path's constructor?
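
   To make the question concrete, here is a minimal sketch of why a relative child string and a concatenated absolute string can parse differently. It only uses `java.net.URI` and a hand-written check; it does not call Hadoop's `Path`. The helpers `hasSchemeLikePrefix` and `concat` are hypothetical names for illustration, and the "colon before the first slash means scheme" rule is my assumption about how the child string ends up being interpreted, not something confirmed against the Hadoop source here.

```scala
import java.net.URI

object ColonInPathSketch {
  // Hypothetical helper: true when a ':' appears before the first '/',
  // i.e. when the text before the colon could be read as a URI scheme.
  def hasSchemeLikePrefix(s: String): Boolean = {
    val colon = s.indexOf(':')
    val slash = s.indexOf('/')
    colon != -1 && (slash == -1 || colon < slash)
  }

  // Hypothetical helper: join base and child as plain strings,
  // keeping exactly one '/' between them.
  def concat(base: String, child: String): String =
    base.stripSuffix("/") + "/" + child.stripPrefix("/")

  def main(args: Array[String]): Unit = {
    // Relative child whose first segment contains ':'.
    val child = "dir:name/file.txt"
    // The same child after manual concatenation with an assumed archive base.
    val joined = concat("/data/archive", "/dir:name/file.txt")

    println(hasSchemeLikePrefix(child))                            // true
    println(hasSchemeLikePrefix(joined))                           // false
    // A ':' that comes after the first '/' is not scheme-like either.
    println(hasSchemeLikePrefix("data/2019-10-29T10:00/file.txt")) // false

    // java.net.URI agrees: the relative form gets a scheme, the absolute one keeps its path.
    println(new URI(child).getScheme) // dir
    println(new URI(joined).getPath)  // /data/archive/dir:name/file.txt
  }
}
```

   If that assumption holds, the ambiguity is limited to a child whose first segment contains ":", and passing one concatenated string that starts with "/" would avoid it. Whether Hadoop's `Path(pathstr)` behaves the same way still needs to be verified against the real API.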
   
    

