jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011238096
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef :
ClassTag](sparkSession: SparkSession, path:
for (batchId <- batchIds if batchId > thresholdBatchId) {
val path = batchIdToPath(batchId)
fileManager.delete(path)
+ if (metadataCacheEnabled) batchCache.remove(batchId)
logTrace(s"Removed metadata log file: $path")
}
}
+
+ /**
+ * List the available batches on the file system. As a workaround for S3's
inconsistent listing, it also
+ * tries to take `batchCache` into consideration to infer a better answer.
+ */
+ protected def listBatches: Array[Long] = {
+ val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+ .map(f => pathToBatchId(f.getPath)) ++
+ // Iterating over keySet is not thread-safe. We call `toArray` to make a
copy inside the lock to
+ // eliminate the race condition.
+ batchCache.synchronized {
+ batchCache.keySet.asScala.toArray
+ }
+ logInfo("BatchIds found from listing: " + batchIds.sorted.mkString(", "))
+
+ if (batchIds.isEmpty) {
+ return Array.empty
Review Comment:
will fix
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]