Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149778794
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---
    @@ -89,398 +83,3 @@ private class LongLongTupleConverter extends Converter[(Object, Object), (Long,
         typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
       }
     }
    -
    -class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
    -
    -  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
    -    List(new SQLHistoryListener(conf, sparkUI))
    -  }
    -}
    -
    -class SQLListener(conf: SparkConf) extends SparkListener with Logging {
    -
    -  private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
    -
    -  private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
    -
    -  // Old data in the following fields must be removed in "trimExecutionsIfNecessary".
    -  // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up old data
    -  private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]()
    -
    -  /**
    -   * Maintain the relation between job id and execution id so that we can get the execution id in
    -   * the "onJobEnd" method.
    -   */
    -  private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
    -
    -  private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]()
    -
    -  private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
    -
    -  private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
    -
    -  def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
    -    _executionIdToData.toMap
    -  }
    -
    -  def jobIdToExecutionId: Map[Long, Long] = synchronized {
    -    _jobIdToExecutionId.toMap
    -  }
    -
    -  def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
    -    _stageIdToStageMetrics.toMap
    -  }
    -
    -  private def trimExecutionsIfNecessary(
    -      executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = {
    -    if (executions.size > retainedExecutions) {
    -      val toRemove = math.max(retainedExecutions / 10, 1)
    -      executions.take(toRemove).foreach { execution =>
    -        for (executionUIData <- _executionIdToData.remove(execution.executionId)) {
    -          for (jobId <- executionUIData.jobs.keys) {
    -            _jobIdToExecutionId.remove(jobId)
    -          }
    -          for (stageId <- executionUIData.stages) {
    -            _stageIdToStageMetrics.remove(stageId)
    -          }
    -        }
    -      }
    -      executions.trimStart(toRemove)
    -    }
    -  }
    -
    -  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    -    val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    -    if (executionIdString == null) {
    -      // This is not a job created by SQL
    -      return
    -    }
    -    val executionId = executionIdString.toLong
    -    val jobId = jobStart.jobId
    -    val stageIds = jobStart.stageIds
    -
    -    synchronized {
    -      activeExecutions.get(executionId).foreach { executionUIData =>
    -        executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING
    -        executionUIData.stages ++= stageIds
    -        stageIds.foreach(stageId =>
    -          _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId = 0))
    -        _jobIdToExecutionId(jobId) = executionId
    -      }
    -    }
    -  }
    -
    -  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
    -    val jobId = jobEnd.jobId
    -    for (executionId <- _jobIdToExecutionId.get(jobId);
    -         executionUIData <- _executionIdToData.get(executionId)) {
    -      jobEnd.jobResult match {
    -        case JobSucceeded => executionUIData.jobs(jobId) = JobExecutionStatus.SUCCEEDED
    -        case JobFailed(_) => executionUIData.jobs(jobId) = JobExecutionStatus.FAILED
    -      }
    -      if (executionUIData.completionTime.nonEmpty && !executionUIData.hasRunningJobs) {
    -        // We are the last job of this execution, so mark the execution as finished. Note that
    -        // `onExecutionEnd` also does this, but currently that can be called before `onJobEnd`
    -        // since these are called on different threads.
    -        markExecutionFinished(executionId)
    -      }
    -    }
    -  }
    -
    -  override def onExecutorMetricsUpdate(
    -      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
    -    for ((taskId, stageId, stageAttemptID, accumUpdates) <- executorMetricsUpdate.accumUpdates) {
    -      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, accumUpdates, finishTask = false)
    -    }
    -  }
    -
    -  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
    -    val stageId = stageSubmitted.stageInfo.stageId
    -    val stageAttemptId = stageSubmitted.stageInfo.attemptId
    -    // Always override metrics for old stage attempt
    -    if (_stageIdToStageMetrics.contains(stageId)) {
    -      _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
    -    } else {
    -      // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart".
    -      // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution.
    -      // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126).
    -    }
    -  }
    -
    -  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    -    if (taskEnd.taskMetrics != null) {
    -      updateTaskAccumulatorValues(
    -        taskEnd.taskInfo.taskId,
    -        taskEnd.stageId,
    -        taskEnd.stageAttemptId,
    -        taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)),
    -        finishTask = true)
    -    }
    -  }
    -
    -  /**
    -   * Update the accumulator values of a task with the latest metrics for this task. This is called
    -   * every time we receive an executor heartbeat or when a task finishes.
    -   */
    -  protected def updateTaskAccumulatorValues(
    -      taskId: Long,
    -      stageId: Int,
    -      stageAttemptID: Int,
    -      _accumulatorUpdates: Seq[AccumulableInfo],
    -      finishTask: Boolean): Unit = {
    -    val accumulatorUpdates =
    -      _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get))
    -
    -    _stageIdToStageMetrics.get(stageId) match {
    -      case Some(stageMetrics) =>
    -        if (stageAttemptID < stageMetrics.stageAttemptId) {
    -          // A task of an old stage attempt. Because a new stage is submitted, we can ignore it.
    -        } else if (stageAttemptID > stageMetrics.stageAttemptId) {
    -          logWarning(s"A task should not have a higher stageAttemptID ($stageAttemptID) then " +
    -            s"what we have seen (${stageMetrics.stageAttemptId})")
    -        } else {
    -          // TODO We don't know the attemptId. Currently, what we can do is overriding the
    --- End diff --
    
    This comment was hard to make sense of (you shouldn't ever have two tasks with the same taskId, even with speculation), but I think there is something here that may still be worth mentioning: you aggregate metrics across all attempts of a given task (aka its "index"), even speculative ones, both before and after your change -- I'd mention that in a comment.
    
    (The index is available in onTaskStart / onTaskEnd if we wanted to de-duplicate.)
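    
    In case it helps, here is a rough sketch of what de-duplicating on the task index could look like. It is purely illustrative and not part of this PR: the listener name and the `latestAccumsByIndex` map are made up, and it assumes the class sits somewhere under `org.apache.spark` so that the `private[spark]` `externalAccums` API is visible.
    
    ```scala
    import scala.collection.mutable
    
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
    
    // Hypothetical sketch: keep only the latest accumulator values reported per
    // (stageId, task index), so metrics from speculative attempts of the same
    // task are not double-counted.
    class DedupByIndexListener extends SparkListener {
    
      // (stageId, taskIndex) -> accumulator id -> latest reported value
      private val latestAccumsByIndex = mutable.HashMap[(Int, Int), Map[Long, Any]]()
    
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
        if (taskEnd.taskMetrics != null) {
          val key = (taskEnd.stageId, taskEnd.taskInfo.index)
          val updates = taskEnd.taskMetrics.externalAccums.map(a => a.id -> a.value).toMap
          // Overwrite instead of appending: whichever attempt reports last wins,
          // so a speculative duplicate doesn't inflate the aggregated metric.
          latestAccumsByIndex(key) = updates
        }
      }
    }
    ```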


---
