This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e702b32656bc [SPARK-48314][SS] Don't double cache files for FileStreamSource using Trigger.AvailableNow e702b32656bc is described below commit e702b32656bcbe194be19876990954a4be457734 Author: Adam Binford <adam...@gmail.com> AuthorDate: Wed May 22 10:58:02 2024 +0900 [SPARK-48314][SS] Don't double cache files for FileStreamSource using Trigger.AvailableNow ### What changes were proposed in this pull request? Files don't need to be cached for reuse in `FileStreamSource` when using `Trigger.AvailableNow` because all files are already cached for the lifetime of the query in `allFilesForTriggerAvailableNow`. ### Why are the changes needed? As reported in https://issues.apache.org/jira/browse/SPARK-44924 (with a PR to address it: https://github.com/apache/spark/pull/45362), the hard-coded cap of 10k files being cached can cause problems when using a maxFilesPerTrigger > 10k. It causes every other batch to be 10k files, which can greatly limit the throughput of a new streaming query trying to catch up. ### Does this PR introduce _any_ user-facing change? Every other streaming batch won't be 10k files if using Trigger.AvailableNow and maxFilesPerTrigger greater than 10k. ### How was this patch tested? New UT ### Was this patch authored or co-authored using generative AI tooling? No Closes #46627 from Kimahriman/available-now-no-cache. 
Authored-by: Adam Binford <adam...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/FileStreamSource.scala | 10 +++-- .../sql/streaming/FileStreamSourceSuite.scala | 45 ++++++++++++++++++++++ 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 373a122e0001..4a9b2d11b7e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -184,9 +184,11 @@ class FileStreamSource( } } + val shouldCache = !sourceOptions.latestFirst && allFilesForTriggerAvailableNow == null + // Obey user's setting to limit the number of files in this batch trigger. val (batchFiles, unselectedFiles) = limit match { - case files: ReadMaxFiles if !sourceOptions.latestFirst => + case files: ReadMaxFiles if shouldCache => // we can cache and reuse remaining fetched list of files in further batches val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles()) if (usFiles.size < files.maxFiles() * discardCachedInputRatio) { @@ -200,10 +202,10 @@ class FileStreamSource( } case files: ReadMaxFiles => - // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch + // don't use the cache, just take files for the next batch (newFiles.take(files.maxFiles()), null) - case files: ReadMaxBytes if !sourceOptions.latestFirst => + case files: ReadMaxBytes if shouldCache => // we can cache and reuse remaining fetched list of files in further batches val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) = takeFilesUntilMax(newFiles, files.maxBytes()) @@ -218,8 +220,8 @@ class FileStreamSource( } case files: ReadMaxBytes => + // don't use the cache, just take files for the next batch val (FilesSplit(bFiles, 
_), _) = takeFilesUntilMax(newFiles, files.maxBytes()) - // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch (bFiles, null) case _: ReadAllAvailable => (newFiles, null) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index ff3cc5c247df..ca4f2a7f26ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -2448,6 +2448,51 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("SPARK-48314: Don't cache unread files when using Trigger.AvailableNow") { + withCountListingLocalFileSystemAsLocalFileSystem { + withThreeTempDirs { case (src, meta, tmp) => + val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5", + "maxCachedFiles" -> "2") + val scheme = CountListingLocalFileSystem.scheme + val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text", + StructType(Nil), Seq.empty, meta.getCanonicalPath, options) + val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog")) + val metadataLog = source invokePrivate _metadataLog() + + // provide 20 files in src, with sequential "last modified" to guarantee ordering + (0 to 19).map { idx => + val f = createFile(idx.toString, new File(src, idx.toString), tmp) + f.setLastModified(idx * 10000) + f + } + + source.prepareForTriggerAvailableNow() + CountListingLocalFileSystem.resetCount() + + var offset = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) + .asInstanceOf[FileStreamSourceOffset] + var files = metadataLog.get(offset.logOffset).getOrElse(Array.empty[FileEntry]) + + // All files are already tracked in allFilesForTriggerAvailableNow + assert(0 === CountListingLocalFileSystem.pathToNumListStatusCalled + 
.get(src.getCanonicalPath).map(_.get()).getOrElse(0)) + // Should be 5 files in the batch based on maxFiles limit + assert(files.length == 5) + + // Reading again leverages the files already tracked in allFilesForTriggerAvailableNow, + // so no more listings need to happen + offset = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) + .asInstanceOf[FileStreamSourceOffset] + files = metadataLog.get(offset.logOffset).getOrElse(Array.empty[FileEntry]) + + assert(0 === CountListingLocalFileSystem.pathToNumListStatusCalled + .get(src.getCanonicalPath).map(_.get()).getOrElse(0)) + // Should be 5 files in the batch since cached files are ignored + assert(files.length == 5) + } + } + } + test("SPARK-31962: file stream source shouldn't allow modifiedBefore/modifiedAfter") { def formatTime(time: LocalDateTime): String = { time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org