Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3372#discussion_r20620004
  
    --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---
    @@ -40,41 +40,108 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     
       import JobProgressListener._
     
    +  // Define a handful of type aliases so that data structures' types can serve as documentation.
    +  // These type aliases are public because they're used in the types of public fields:
    +
       type JobId = Int
       type StageId = Int
       type StageAttemptId = Int
    +  type PoolName = String
    +  type ExecutorId = String
     
    -  // How many stages to remember
    -  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
    -  // How many jobs to remember
    -  val retailedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
    +  // Define all of our state:
     
    +  // Jobs:
       val activeJobs = new HashMap[JobId, JobUIData]
       val completedJobs = ListBuffer[JobUIData]()
       val failedJobs = ListBuffer[JobUIData]()
       val jobIdToData = new HashMap[JobId, JobUIData]
     
    +  // Stages:
       val activeStages = new HashMap[StageId, StageInfo]
       val completedStages = ListBuffer[StageInfo]()
       val failedStages = ListBuffer[StageInfo]()
       val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
       val stageIdToInfo = new HashMap[StageId, StageInfo]
    -  
    -  // Number of completed and failed stages, may not actually equal to completedStages.size and
    -  // failedStages.size respectively due to completedStage and failedStages only maintain the latest
    -  // part of the stages, the earlier ones will be removed when there are too many stages for
    -  // memory sake.
    +  val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
    +  // Total of completed and failed stages that have ever been run.  These may be greater than
    +  // `completedStages.size` and `failedStages.size` if we have run more stages or jobs than
    +  // JobProgressListener's retention limits.
       var numCompletedStages = 0
       var numFailedStages = 0
     
    -  // Map from pool name to a hash map (map from stage id to StageInfo).
    -  val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
    -
    -  val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
    +  // Misc:
    +  val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
    +  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
     
       var schedulingMode: Option[SchedulingMode] = None
     
    -  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
    +  // To limit the total memory usage of JobProgressListener, we only track information for a fixed
    +  // number of non-active jobs and stages (there is no limit for active jobs and stages):
    +
    +  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
    +  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
    +
    +  // We can test for memory leaks by ensuring that collections that track non-active jobs and
    +  // stages do not grow without bound and that collections for active jobs/stages eventually become
    +  // empty once Spark is idle.  Let's partition our collections into ones that should be empty
    +  // once Spark is idle and ones that should have hard- or soft-limited sizes.
    +  // These methods are used by unit tests, but they're defined here so that people don't forget to
    +  // update the tests when adding new collections.  Some collections have multiple levels of
    +  // nesting, etc., so this lets us customize our notion of "size" for each structure:
    +
    +  // These collections should all be empty once Spark is idle (no active stages / jobs):
    +  private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, Int] = {
    +    Map(
    +      "activeStages" -> activeStages.size,
    +      "activeJobs" -> activeJobs.size,
    +      "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum
    +    )
    +  }
    +
    +  // These collections should stop growing once we have run at least `spark.ui.retainedStages`
    +  // stages and `spark.ui.retainedJobs` jobs:
    +  private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = {
    +    Map(
    +      "completedJobs" -> completedJobs.size,
    +      "failedJobs" -> failedJobs.size,
    +      "completedStages" -> completedStages.size,
    +      "failedStages" -> failedStages.size
    +    )
    +  }
    +  
    +  // These collections may grow arbitrarily, but once Spark becomes idle they should shrink back to
    +  // some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings:
    +  private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = {
    +    Map(
    +      "jobIdToData" -> jobIdToData.size,
    +      "stageIdToData" -> stageIdToData.size,
    +      "stageIdToStageInfo" -> stageIdToInfo.size
    +    )
    +  }
    +
    +  /** If stages is too large, remove and garbage collect old stages */
    +  private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
    +    if (stages.size > retainedStages) {
    +      val toRemove = math.max(retainedStages / 10, 1)
    +      stages.take(toRemove).foreach { s =>
    +        stageIdToData.remove((s.stageId, s.attemptId))
    +        stageIdToInfo.remove(s.stageId)
    +      }
    +      stages.trimStart(toRemove)
    +    }
    +  }
    +
    +  /** If jobs is too large, remove and garbage collect old jobs */
    +  private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
    +    if (jobs.size > retainedJobs) {
    +      val toRemove = math.max(retainedJobs / 10, 1)
    --- End diff --
    
    I agree that this is a little puzzling (this was copied over from the old code).  The pattern here is essentially to maintain size-limited collections with a FIFO eviction policy, plus some callbacks that run when items are evicted.  A more bulletproof approach would be to create our own size-limited collection wrapper / subclass with built-in eviction callbacks, since that would prevent mistakes where someone adds an item to a collection but forgets to call `trim*IfNecessary`.  I think we should do that as part of a separate commit, though, since I want to limit the scope of this change and get it in now to unblock a different patch.
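    
    To make that idea concrete, here is a rough sketch of what such a wrapper might look like (the `BoundedFifoBuffer` name and the exact API are purely illustrative, not something that exists in this patch):
    
    ```scala
    import scala.collection.mutable.ListBuffer
    
    // Illustrative only: a FIFO buffer that enforces a maximum size and runs a
    // callback for each evicted element, so companion maps are cleaned up
    // automatically instead of relying on callers to remember a trim step.
    private[spark] class BoundedFifoBuffer[T](maxSize: Int)(onEvict: T => Unit) {
      private val buffer = ListBuffer[T]()
    
      def size: Int = buffer.size
      def toSeq: Seq[T] = buffer.toSeq
    
      /** Append an element, evicting the oldest entries once we exceed maxSize. */
      def append(elem: T): Unit = synchronized {
        buffer += elem
        if (buffer.size > maxSize) {
          // Evict in batches of ~10% so we don't trim on every insertion,
          // mirroring the batching in trim*IfNecessary above.
          val toRemove = math.max(maxSize / 10, 1)
          buffer.take(toRemove).foreach(onEvict)
          buffer.trimStart(toRemove)
        }
      }
    }
    ```
    
    With something along those lines, `completedStages` could be constructed with an eviction callback that removes the corresponding entries from `stageIdToData` and `stageIdToInfo`, and call sites would only ever need to call `append`.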

