Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156809149
  
    --- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -772,4 +813,118 @@ private[spark] class AppStatusListener(
         }
       }
     
    +  private def cleanupExecutors(count: Long): Unit = {
    +    // Because the limit is on the number of *dead* executors, we need to 
calculate whether
    +    // there are actually enough dead executors to be deleted.
    +    val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
    +    val dead = count - activeExecutorCount
    +
    +    if (dead > threshold) {
    +      val countToDelete = calculateNumberToRemove(dead, threshold)
    +      val toDelete = 
kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
    +        .max(countToDelete).first(false).last(false).asScala.toSeq
    +      toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
    +    }
    +  }
    +
    +  private def cleanupJobs(count: Long): Unit = {
    +    val countToDelete = calculateNumberToRemove(count, 
conf.get(MAX_RETAINED_JOBS))
    +    if (countToDelete <= 0L) {
    +      return
    +    }
    +
    +    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
    +        countToDelete.toInt) { j =>
    +      j.info.status != JobExecutionStatus.RUNNING && j.info.status != 
JobExecutionStatus.UNKNOWN
    +    }
    +    toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
    +  }
    +
    +  private def cleanupStages(count: Long): Unit = {
    +    val countToDelete = calculateNumberToRemove(count, 
conf.get(MAX_RETAINED_STAGES))
    +    if (countToDelete <= 0L) {
    +      return
    +    }
    +
    +    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
    +        countToDelete.toInt) { s =>
    +      s.info.status != v1.StageStatus.ACTIVE && s.info.status != 
v1.StageStatus.PENDING
    +    }
    +
    +    stages.foreach { s =>
    +      val key = s.id
    +      kvstore.delete(s.getClass(), key)
    +
    +      val execSummaries = 
kvstore.view(classOf[ExecutorStageSummaryWrapper])
    +        .index("stage")
    +        .first(key)
    +        .last(key)
    +        .asScala
    +        .toSeq
    +      execSummaries.foreach { e =>
    +        kvstore.delete(e.getClass(), e.id)
    +      }
    +
    +      val tasks = kvstore.view(classOf[TaskDataWrapper])
    +        .index("stage")
    +        .first(key)
    +        .last(key)
    +        .asScala
    +
    +      tasks.foreach { t =>
    +        kvstore.delete(t.getClass(), t.info.taskId)
    +      }
    +
    +      // Check whether there are remaining attempts for the same stage. If 
there aren't, then
    +      // also delete the RDD graph data.
    +      val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
    +        .index("stageId")
    +        .first(s.stageId)
    +        .last(s.stageId)
    +        .closeableIterator()
    +
    +      val hasMoreAttempts = try {
    +        remainingAttempts.asScala.exists { other =>
    +          other.info.attemptId != s.info.attemptId
    +        }
    +      } finally {
    +        remainingAttempts.close()
    +      }
    +
    +      if (!hasMoreAttempts) {
    +        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
    +      }
    +    }
    +  }
    +
    +  private def cleanupTasks(stage: LiveStage): Unit = {
    +    val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), 
maxTasksPerStage)
    +    if (countToDelete > 0L) {
    +      val stageKey = Array(stage.info.stageId, stage.info.attemptId)
    +      val view = 
kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
    +        .last(stageKey)
    +
    +      // On live applications, try to delete finished tasks only; when in 
the SHS, treat all
    +      // tasks as the same.
    +      val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
    +        !live || t.info.status != TaskState.RUNNING.toString()
    --- End diff --
    
    The old code deletes tasks in the order they arrive; replicating that 
ordering here would be expensive, since it would require sorting the task list 
(cheap for the disk store, expensive for the in-memory store).
    
    I can keep the same filter behavior for both.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to