HeartSaVioR commented on a change in pull request #27649: [SPARK-30900][SS] FileStreamSource: Avoid reading compact metadata log twice if the query restarts from compact batch URL: https://github.com/apache/spark/pull/27649#discussion_r398358987
########## File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ########## @@ -1371,6 +1371,60 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("restore from file stream source log") { + def createEntries(batchId: Long, count: Int): Array[FileEntry] = { + (1 to count).map { idx => + FileEntry(s"path_${batchId}_$idx", 10000 * batchId + count, batchId) + }.toArray + } + + withSQLConf(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5") { + withTempDir { chk => + val _fileEntryCache = PrivateMethod[java.util.LinkedHashMap[Long, Array[FileEntry]]]( + Symbol("fileEntryCache")) + + val metadata = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, + chk.getCanonicalPath) + val fileEntryCache = metadata invokePrivate _fileEntryCache() + + (0 to 4).foreach { batchId => + metadata.add(batchId, createEntries(batchId, 100)) + } + val allFiles = metadata.allFiles() + + // batch 4 is a compact batch which logs would be cached in fileEntryCache + fileEntryCache.containsKey(4) Review comment: lol I have no idea why I missed assert here. Nice finding. Will add. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org