gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS]
FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as
unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r408826170
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
##########
@@ -1935,6 +1928,120 @@ class FileStreamSourceSuite extends
FileStreamSourceTest {
assert(expectedDir.exists())
assert(expectedDir.list().exists(_.startsWith(filePrefix)))
}
+
+ private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit):
Unit = {
+ val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
+ val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+ try {
+ spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
+ body
+ } finally {
+ originClassForLocalFileSystem match {
+ case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+ case _ => spark.conf.unset(optionKey)
+ }
+ }
+ }
+
+ test("Caches and leverages unread files") {
+ withCountListingLocalFileSystemAsLocalFileSystem {
+ withThreeTempDirs { case (src, meta, tmp) =>
+ val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" ->
"5")
+ val scheme = CountListingLocalFileSystem.scheme
+ val source = new FileStreamSource(spark,
s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+ StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+ val _metadataLog =
PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+ val metadataLog = source invokePrivate _metadataLog()
+
+ def verifyBatch(
+ offset: FileStreamSourceOffset,
+ expectedBatchId: Long,
+ inputFiles: Seq[File],
+ expectedListingCount: Int): Unit = {
+ val batchId = offset.logOffset
+ assert(batchId === expectedBatchId)
+
+ val files =
metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+ assert(files.forall(_.batchId == batchId))
+
+ val actualInputFiles = files.map { p => new
Path(p.path).toUri.getPath }
+ val expectedInputFiles = inputFiles.slice(batchId.toInt * 5,
batchId.toInt * 5 + 5)
+ .map(_.getCanonicalPath)
+ assert(actualInputFiles === expectedInputFiles)
+
+ assert(expectedListingCount ===
CountListingLocalFileSystem.pathToNumListStatusCalled
+ .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+ }
+
+ // provide 20 files in src, with sequential "last modified" to
guarantee ordering
+ var lastModified = 0
+ val inputFiles = (0 to 19).map { idx =>
+ val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+ f.setLastModified(lastModified)
Review comment:
Maybe pass `idx * 10000` to `setLastModified` here instead of tracking a separate mutable `lastModified` variable?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org