gaborgsomogyi commented on a change in pull request #27649: [SPARK-30900][SS]
FileStreamSource: Avoid reading compact metadata log twice if the query
restarts from compact batch
URL: https://github.com/apache/spark/pull/27649#discussion_r398576996
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
##########
@@ -1371,6 +1371,60 @@ class FileStreamSourceSuite extends
FileStreamSourceTest {
}
}
+ test("restore from file stream source log") {
+ def createEntries(batchId: Long, count: Int): Array[FileEntry] = {
+ (1 to count).map { idx =>
+ FileEntry(s"path_${batchId}_$idx", 10000 * batchId + count, batchId)
+ }.toArray
+ }
+
+ withSQLConf(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5") {
+ withTempDir { chk =>
+ val _fileEntryCache = PrivateMethod[java.util.LinkedHashMap[Long,
Array[FileEntry]]](
+ Symbol("fileEntryCache"))
+
+ val metadata = new FileStreamSourceLog(FileStreamSourceLog.VERSION,
spark,
+ chk.getCanonicalPath)
+ val fileEntryCache = metadata invokePrivate _fileEntryCache()
+
+ (0 to 4).foreach { batchId =>
+ metadata.add(batchId, createEntries(batchId, 100))
+ }
+ val allFiles = metadata.allFiles()
+
+ // batch 4 is a compact batch which logs would be cached in
fileEntryCache
+ assert(fileEntryCache.containsKey(4L))
+
+ val metadata2 = new FileStreamSourceLog(FileStreamSourceLog.VERSION,
spark,
+ chk.getCanonicalPath)
+ val fileEntryCache2 = metadata2 invokePrivate _fileEntryCache()
+
+ // allFiles() doesn't restore the logs for the latest compact batch
into file entry cache
+ assert(metadata2.allFiles() === allFiles)
+ assert(!fileEntryCache2.containsKey(4L))
+
+ // restore() will restore the logs for the latest compact batch into
file entry cache
+ assert(metadata2.restore() === allFiles)
+ assert(fileEntryCache2.containsKey(4L))
+
+ (5 to 5 +
FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE).foreach { batchId =>
+ metadata2.add(batchId, createEntries(batchId, 100))
+ }
+ val allFiles2 = metadata2.allFiles()
+
+ val metadata3 = new FileStreamSourceLog(FileStreamSourceLog.VERSION,
spark,
+ chk.getCanonicalPath)
+ val fileEntryCache3 = metadata3 invokePrivate _fileEntryCache()
+
+ // restore() will not restore the logs for the latest compact batch
into file entry cache
+ // if the latest batch is too far from latest compact batch, because
it's unlikely Spark
+ // will request the batch for the start point.
+ assert(metadata2.restore() === allFiles2)
+ assert(!fileEntryCache3.containsKey(4L))
Review comment:
Similar here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]