vanzin commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338327353
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1596,6 +1615,282 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
       }
     }
   }
+
+  test("remove completed files when remove option is enabled") {
+    def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
+      assert(!files.exists(_.startsWith(fileName)))
+    }
+
+    def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = 
{
+      assert(files.exists(_.startsWith(fileName)))
+    }
+
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "delete")
+
+        val fileStream = createFileStream("text", src.getCanonicalPath, 
options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        testStream(filtered)(
+          AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            // it doesn't rename any file yet
+            assertFileIsNotRemoved(src.list(), "keep1")
+            true
+          },
+          AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it renames input file for first batch, but not for second batch 
yet
+            assertFileIsRemoved(files, "keep1")
+            assertFileIsNotRemoved(files, "ke ep2 %")
+
+            true
+          },
+          AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
+          CheckAnswer("keep1", "keep2", "keep3"),
+          AssertOnQuery("input file renamed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it renames input file for second batch, but not third batch yet
+            assertFileIsRemoved(files, "ke ep2 %")
+            assertFileIsNotRemoved(files, "keep3")
+
+            true
+          }
+        )
+      }
+    }
+  }
+
+  test("move completed files to archive directory when archive option is 
enabled") {
+
+    withThreeTempDirs { case (src, tmp, archiveDir) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "archive", "sourceArchiveDir" -> 
archiveDir.getAbsolutePath)
+
+        val fileStream = createFileStream("text", 
s"${src.getCanonicalPath}/*/*",
+          options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        // src/k %1
+        // file: src/k %1/keep1
+        val dirForKeep1 = new File(src, "k %1")
+        // src/k %1/k 2
+        // file: src/k %1/k 2/keep2
+        val dirForKeep2 = new File(dirForKeep1, "k 2")
+        // src/k3
+        // file: src/k3/keep3
+        val dirForKeep3 = new File(src, "k3")
+
+        val expectedMovedDir1 = new File(archiveDir.getAbsolutePath + 
dirForKeep1.toURI.getPath)
+        val expectedMovedDir2 = new File(archiveDir.getAbsolutePath + 
dirForKeep2.toURI.getPath)
+        val expectedMovedDir3 = new File(archiveDir.getAbsolutePath + 
dirForKeep3.toURI.getPath)
+
+        testStream(filtered)(
+          AddTextFileData("keep1", dirForKeep1, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it doesn't rename any file yet
+            assertFileIsNotMoved(dirForKeep1, expectedMovedDir1, "keep1")
+            true
+          },
+          AddTextFileData("keep2", dirForKeep2, tmp, tmpFilePrefix = "keep2 
%"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it renames input file for first batch, but not for second batch 
yet
+            assertFileIsMoved(dirForKeep1, expectedMovedDir1, "keep1")
+            assertFileIsNotMoved(dirForKeep2, expectedMovedDir2, "keep2 %")
+            true
+          },
+          AddTextFileData("keep3", dirForKeep3, tmp, tmpFilePrefix = "keep3"),
+          CheckAnswer("keep1", "keep2", "keep3"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it renames input file for second batch, but not third batch yet
+            assertFileIsMoved(dirForKeep2, expectedMovedDir2, "keep2 %")
+            assertFileIsNotMoved(dirForKeep3, expectedMovedDir3, "keep3")
+
+            true
+          },
+          AddTextFileData("keep4", dirForKeep3, tmp, tmpFilePrefix = "keep4"),
+          CheckAnswer("keep1", "keep2", "keep3", "keep4"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it renames input file for third batch, but not fourth batch yet
+            assertFileIsMoved(dirForKeep3, expectedMovedDir3, "keep3")
+            assertFileIsNotMoved(dirForKeep3, expectedMovedDir3, "keep4")
+
+            true
+          }
+        )
+      }
+    }
+  }
+
+  class FakeFileSystem extends FileSystem {
+    val requestsExists = new mutable.MutableList[Path]()
+    val requestsMkdirs = new mutable.MutableList[Path]()
+    val requestsRename = new mutable.MutableList[(Path, Path)]()
+
+    override def exists(f: Path): Boolean = {
+      requestsExists += f
+      true
+    }
+
+    override def mkdirs(f: Path, permission: FsPermission): Boolean = {
+      requestsMkdirs += f
+      true
+    }
+
+    override def rename(src: Path, dst: Path): Boolean = {
+      requestsRename += ((src, dst))
+      true
+    }
+
+    def clearRecords(): Unit = {
+      requestsExists.clear()
+      requestsMkdirs.clear()
+      requestsRename.clear()
+    }
+
+    override def getUri: URI = throw new NotImplementedError
+
+    override def open(f: Path, bufferSize: Int): FSDataInputStream = throw new 
NotImplementedError
+
+    override def create(
+        f: Path,
+        permission: FsPermission,
+        overwrite: Boolean,
+        bufferSize: Int,
+        replication: Short,
+        blockSize: Long,
+        progress: Progressable): FSDataOutputStream = throw new 
NotImplementedError
+
+    override def append(f: Path, bufferSize: Int, progress: Progressable): 
FSDataOutputStream =
+      throw new NotImplementedError
+
+    override def delete(f: Path, recursive: Boolean): Boolean = throw new 
NotImplementedError
+
+    override def listStatus(f: Path): Array[FileStatus] = throw new 
NotImplementedError
+
+    override def setWorkingDirectory(new_dir: Path): Unit = throw new 
NotImplementedError
+
+    override def getWorkingDirectory: Path = throw new NotImplementedError
+
+    override def getFileStatus(f: Path): FileStatus = throw new 
NotImplementedError
+  }
+
+  test("FileStreamSourceCleaner - archive - destinations match against source 
pattern") {
+
 
 Review comment:
   Remove the empty line that follows the `test(...) {` opening line.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to