gaborgsomogyi commented on a change in pull request #27649: [SPARK-30900][SS] 
FileStreamSource: Avoid reading compact metadata log twice if the query 
restarts from compact batch
URL: https://github.com/apache/spark/pull/27649#discussion_r399122439
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1371,6 +1371,60 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("restore from file stream source log") {
+    def createEntries(batchId: Long, count: Int): Array[FileEntry] = {
+      (1 to count).map { idx =>
+        FileEntry(s"path_${batchId}_$idx", 10000 * batchId + count, batchId)
+      }.toArray
+    }
+
+    withSQLConf(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5") {
+      withTempDir { chk =>
+        val _fileEntryCache = PrivateMethod[java.util.LinkedHashMap[Long, Array[FileEntry]]](
+          Symbol("fileEntryCache"))
+
+        val metadata = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
+          chk.getCanonicalPath)
+        val fileEntryCache = metadata invokePrivate _fileEntryCache()
+
+        (0 to 4).foreach { batchId =>
+          metadata.add(batchId, createEntries(batchId, 100))
+        }
+        val allFiles = metadata.allFiles()
+
+        // batch 4 is a compact batch whose logs would be cached in fileEntryCache
+        assert(fileEntryCache.containsKey(4L))
+
+        val metadata2 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
+          chk.getCanonicalPath)
+        val fileEntryCache2 = metadata2 invokePrivate _fileEntryCache()
+
+        // allFiles() doesn't restore the logs for the latest compact batch into file entry cache
+        assert(metadata2.allFiles() === allFiles)
+        assert(!fileEntryCache2.containsKey(4L))
+
+        // restore() will restore the logs for the latest compact batch into file entry cache
+        assert(metadata2.restore() === allFiles)
+        assert(fileEntryCache2.containsKey(4L))
+
+        (5 to 5 + FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE).foreach { batchId =>
+          metadata2.add(batchId, createEntries(batchId, 100))
+        }
+        val allFiles2 = metadata2.allFiles()
 
 Review comment:
   As far as I can see, this val is only used in one place.
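
   For illustration, a minimal sketch of the inlining this comment suggests.
   The single use site of `allFiles2` falls below the quoted hunk, so
   `metadata3` and the assertion here are hypothetical stand-ins rather than
   the PR's actual code:

       // Hypothetical single use site (the rest of the test is truncated
       // above): the val is bound once and read exactly once.
       val allFiles2 = metadata2.allFiles()
       assert(metadata3.restore() === allFiles2)

       // Suggested shape once the single-use binding is inlined:
       assert(metadata3.restore() === metadata2.allFiles())

   Inlining keeps the assertion self-describing and avoids a name that
   outlives its single read.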
