Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19751#discussion_r156809149
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -772,4 +813,118 @@ private[spark] class AppStatusListener(
}
}
+  private def cleanupExecutors(count: Long): Unit = {
+    // Because the limit is on the number of *dead* executors, we need to calculate whether
+    // there are actually enough dead executors to be deleted.
+    val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
+    val dead = count - activeExecutorCount
+
+    if (dead > threshold) {
+      val countToDelete = calculateNumberToRemove(dead, threshold)
+      val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
+        .max(countToDelete).first(false).last(false).asScala.toSeq
+      toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
+    }
+  }
+
+  private def cleanupJobs(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
+        countToDelete.toInt) { j =>
+      j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
+    }
+    toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
+  }
+
+  private def cleanupStages(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
+        countToDelete.toInt) { s =>
+      s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
+    }
+
+    stages.foreach { s =>
+      val key = s.id
+      kvstore.delete(s.getClass(), key)
+
+      val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
+        .index("stage")
+        .first(key)
+        .last(key)
+        .asScala
+        .toSeq
+      execSummaries.foreach { e =>
+        kvstore.delete(e.getClass(), e.id)
+      }
+
+      val tasks = kvstore.view(classOf[TaskDataWrapper])
+        .index("stage")
+        .first(key)
+        .last(key)
+        .asScala
+
+      tasks.foreach { t =>
+        kvstore.delete(t.getClass(), t.info.taskId)
+      }
+
+      // Check whether there are remaining attempts for the same stage. If there aren't, then
+      // also delete the RDD graph data.
+      val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
+        .index("stageId")
+        .first(s.stageId)
+        .last(s.stageId)
+        .closeableIterator()
+
+      val hasMoreAttempts = try {
+        remainingAttempts.asScala.exists { other =>
+          other.info.attemptId != s.info.attemptId
+        }
+      } finally {
+        remainingAttempts.close()
+      }
+
+      if (!hasMoreAttempts) {
+        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
+      }
+    }
+  }
+
+  private def cleanupTasks(stage: LiveStage): Unit = {
+    val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage)
+    if (countToDelete > 0L) {
+      val stageKey = Array(stage.info.stageId, stage.info.attemptId)
+      val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
+        .last(stageKey)
+
+      // On live applications, try to delete finished tasks only; when in the SHS, treat all
+      // tasks as the same.
+      val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
+        !live || t.info.status != TaskState.RUNNING.toString()
--- End diff ---
The old code deletes tasks in the order they arrive; it would be expensive to do
that here, since it would involve sorting the task list (cheap for the disk
store, expensive for the in-memory one).

I can keep the same filter behavior for both.
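For illustration only, a minimal sketch of what "same filter behavior for both"
could look like in cleanupTasks, reusing the `view`, `countToDelete`, and
`KVUtils.viewToSeq` names from the diff above (not the final change):

    // Sketch: apply the same predicate whether the app is live or in the SHS,
    // so a task that is still RUNNING is never deleted; no sorting involved.
    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
      t.info.status != TaskState.RUNNING.toString()
    }
    toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }

That keeps the cleanup a single filtered scan of the stage's tasks, which stays
cheap for both the disk store and the in-memory store.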
---