gaborgsomogyi commented on a change in pull request #27620:
URL: https://github.com/apache/spark/pull/27620#discussion_r411948824
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
##########
@@ -1935,6 +1928,120 @@ class FileStreamSourceSuite extends
FileStreamSourceTest {
assert(expectedDir.exists())
assert(expectedDir.list().exists(_.startsWith(filePrefix)))
}
+
+ private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit):
Unit = {
+ val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
+ val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+ try {
+ spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
+ body
+ } finally {
+ originClassForLocalFileSystem match {
+ case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+ case _ => spark.conf.unset(optionKey)
+ }
+ }
+ }
+
+ test("Caches and leverages unread files") {
+ withCountListingLocalFileSystemAsLocalFileSystem {
+ withThreeTempDirs { case (src, meta, tmp) =>
+ val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" ->
"5")
+ val scheme = CountListingLocalFileSystem.scheme
+ val source = new FileStreamSource(spark,
s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+ StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+ val _metadataLog =
PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+ val metadataLog = source invokePrivate _metadataLog()
+
+ def verifyBatch(
+ offset: FileStreamSourceOffset,
+ expectedBatchId: Long,
+ inputFiles: Seq[File],
+ expectedListingCount: Int): Unit = {
+ val batchId = offset.logOffset
+ assert(batchId === expectedBatchId)
+
+ val files =
metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+ assert(files.forall(_.batchId == batchId))
+
+ val actualInputFiles = files.map { p => new
Path(p.path).toUri.getPath }
+ val expectedInputFiles = inputFiles.slice(batchId.toInt * 5,
batchId.toInt * 5 + 5)
+ .map(_.getCanonicalPath)
+ assert(actualInputFiles === expectedInputFiles)
+
+ assert(expectedListingCount ===
CountListingLocalFileSystem.pathToNumListStatusCalled
+ .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+ }
+
+ // provide 20 files in src, with sequential "last modified" to
guarantee ordering
+ var lastModified = 0
+ val inputFiles = (0 to 19).map { idx =>
+ val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+ f.setLastModified(lastModified)
+ lastModified += 10000
+ f
+ }
+
+ // 4 batches will be available for 20 input files
+ (0 to 3).foreach { batchId =>
+ val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L),
ReadLimit.maxFiles(5))
+ .asInstanceOf[FileStreamSourceOffset]
+ verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles,
expectedListingCount = 1)
+ }
+
+ val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L),
ReadLimit.maxFiles(5))
+ .asInstanceOf[FileStreamSourceOffset]
+ // latestOffset returns the offset for previous batch which means no
new batch is presented
+ assert(3 === offsetBatch.logOffset)
+ // listing should be performed after the list of unread files are
exhausted
+ assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+ .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+ }
+ }
+ }
+
+ test("Don't cache unread files when latestFirst is true") {
+ withCountListingLocalFileSystemAsLocalFileSystem {
+ withThreeTempDirs { case (src, meta, tmp) =>
+ val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5")
+ val scheme = CountListingLocalFileSystem.scheme
+ val source = new FileStreamSource(spark,
s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+ StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+
+ // provide 20 files in src, with sequential "last modified" to
guarantee ordering
+ var lastModified = 0
+ (0 to 19).map { idx =>
+ val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+ f.setLastModified(lastModified)
+ lastModified += 10000
+ f
+ }
+
+ source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+ .asInstanceOf[FileStreamSourceOffset]
+ assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
Review comment:
> This test only makes sure the calls of listing input directory (and
input files as well) are expected
Making sure that the modified code doesn't introduce further unintended
directory listings is also important, but I agree it isn't worth the price of
causing test failures whenever somebody modifies the stream source code. All in
all, I agree not to add it, since we've double-checked that no further
unintended directory listings are introduced.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]