HeartSaVioR commented on a change in pull request #31495:
URL: https://github.com/apache/spark/pull/31495#discussion_r571521148
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
##########
@@ -239,18 +239,35 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       .reverse
   }
 
+  private var lastPurgedBatchId: Long = -1L
+
   /**
    * Removes all the log entry earlier than thresholdBatchId (exclusive).
    */
   override def purge(thresholdBatchId: Long): Unit = {
-    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
-      .map(f => pathToBatchId(f.getPath))
-
-    for (batchId <- batchIds if batchId < thresholdBatchId) {
-      val path = batchIdToPath(batchId)
-      fileManager.delete(path)
-      logTrace(s"Removed metadata log file: $path")
+    val possibleTargetBatchIds = (lastPurgedBatchId + 1 until thresholdBatchId)
+    if (possibleTargetBatchIds.length <= 3) {
Review comment:
Yes, usually the value will be 1 once purge has been executed at least once via
listing. I just don't set it to 1 because I don't want to tie this logic to
MicroBatchExecution/ContinuousExecution. (Someone outside of Spark may leverage
this class as well.)
That's a magic number based on a heuristic, so I don't mind too much about the
exact value (it just should be greater than 1), but considering the cost of a
list call vs. an exists check, the ideal threshold wouldn't be just 1.
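
For readers without the full diff context, here is a minimal sketch of the
heuristic under discussion, written against the helpers visible in the hunk
above (fileManager, batchIdToPath, pathToBatchId, metadataPath,
batchFilesFilter, logTrace). It is not the exact merged code: it assumes
fileManager.delete is a safe no-op for a missing file and that
lastPurgedBatchId is advanced after each purge.

```scala
// Sketch of the proposed purge inside HDFSMetadataLog; illustrative only.
override def purge(thresholdBatchId: Long): Unit = {
  // Batches that could still have a metadata file on disk.
  val possibleTargetBatchIds = lastPurgedBatchId + 1 until thresholdBatchId
  if (possibleTargetBatchIds.length <= 3) {
    // Small range: issue blind per-file deletes; a few delete calls are
    // assumed cheaper than a single directory listing.
    possibleTargetBatchIds.foreach { batchId =>
      val path = batchIdToPath(batchId)
      fileManager.delete(path) // assumed to be a no-op if the file is absent
      logTrace(s"Removed metadata log file: $path")
    }
  } else {
    // Large or unknown range (e.g. the first purge after restart, when
    // lastPurgedBatchId is still -1): list the directory once instead.
    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
      .map(f => pathToBatchId(f.getPath))
    for (batchId <- batchIds if batchId < thresholdBatchId) {
      val path = batchIdToPath(batchId)
      fileManager.delete(path)
      logTrace(s"Removed metadata log file: $path")
    }
  }
  // Assumed bookkeeping so the next purge only probes the newly eligible ids.
  lastPurgedBatchId = thresholdBatchId - 1
}
```

In steady state purge advances by roughly one batch per trigger, so the
candidate range has length 1 and the cheap branch applies; only the first
purge after a restart pays for a listing. Since a couple of direct deletes
are still cheaper than one listing, the threshold sits slightly above 1
rather than at exactly 1.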