srowen commented on a change in pull request #25072: [SPARK-28294][CORE] Support `spark.history.fs.cleaner.maxNum` configuration
URL: https://github.com/apache/spark/pull/25072#discussion_r301227170
##########
File path:
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -851,10 +836,62 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.delete(classOf[LogInfo], log.logPath)
}
}
+
+ // If the number of files is bigger than MAX_LOG_NUM,
+ // clean up all completed attempts per application one by one.
+ val num =
listing.view(classOf[LogInfo]).index("lastProcessed").asScala.count(_ => true)
+ var count = num - maxNum
+ if (count > 0) {
+ logInfo(s"Try to delete $count old event logs to keep $maxNum logs in
total.")
+ val oldAttempts = listing.view(classOf[ApplicationInfoWrapper])
+ .index("oldestAttempt")
+ .asScala
+ .toList
+ oldAttempts.foreach { app =>
+ if (count > 0) {
+ // Applications may have multiple attempts, some of which may not be
completed yet.
+ val (toDelete, remaining) = app.attempts.partition { attempt =>
Review comment:
Could this be simplified to `app.attempts.partition(_.info.completed)`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]