HeartSaVioR commented on a change in pull request #31495:
URL: https://github.com/apache/spark/pull/31495#discussion_r571521148



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
##########
@@ -239,18 +239,35 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       .reverse
   }
 
+  private var lastPurgedBatchId: Long = -1L
+
   /**
    * Removes all the log entries earlier than thresholdBatchId (exclusive).
    */
   override def purge(thresholdBatchId: Long): Unit = {
-    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
-      .map(f => pathToBatchId(f.getPath))
-
-    for (batchId <- batchIds if batchId < thresholdBatchId) {
-      val path = batchIdToPath(batchId)
-      fileManager.delete(path)
-      logTrace(s"Removed metadata log file: $path")
+    val possibleTargetBatchIds = (lastPurgedBatchId + 1 until thresholdBatchId)
+    if (possibleTargetBatchIds.length <= 3) {

Review comment:
       Yes, usually the value will be 1 once purge has been executed once via listing. I just don't set the threshold to 1 because I don't want to tie the logic to MicroBatchExecution/ContinuousExecution. (Someone outside of Spark may leverage this class as well.)

       That's a magic number based on a heuristic, so I don't mind too much about the exact value (it just needs to be greater than 1); but considering the cost of a directory listing versus a per-file existence check, the ideal threshold wouldn't be just 1.
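
       For reference, a minimal sketch of how the whole method might look with this branch in place. It assumes the surrounding HDFSMetadataLog members visible in the diff (fileManager, metadataPath, batchFilesFilter, batchIdToPath, pathToBatchId, logTrace), assumes fileManager.delete tolerates already-missing files, and reconstructs the else branch from the removed listing code; it is an illustration, not the exact PR code:

```scala
override def purge(thresholdBatchId: Long): Unit = {
  val possibleTargetBatchIds = (lastPurgedBatchId + 1) until thresholdBatchId
  if (possibleTargetBatchIds.length <= 3) {
    // Steady state: only a few candidate batch IDs, so delete each path
    // directly and skip the directory listing entirely. Assumes delete is
    // a no-op when the file does not exist.
    possibleTargetBatchIds.foreach { batchId =>
      val path = batchIdToPath(batchId)
      fileManager.delete(path)
      logTrace(s"Removed metadata log file: $path")
    }
  } else {
    // Large or unknown range (e.g. the first purge after a restart, when
    // lastPurgedBatchId is still -1): one listing is cheaper than many
    // per-file delete/exists round trips against the file system.
    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
      .map(f => pathToBatchId(f.getPath))
    for (batchId <- batchIds if batchId < thresholdBatchId) {
      val path = batchIdToPath(batchId)
      fileManager.delete(path)
      logTrace(s"Removed metadata log file: $path")
    }
  }
  lastPurgedBatchId = thresholdBatchId - 1
}
```

       With that shape, every purge after the first touches at most a handful of paths (possibleTargetBatchIds usually has length 1), while the first purge after a restart still pays only a single listing.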



