Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149778794

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---
@@ -89,398 +83,3 @@ private class LongLongTupleConverter extends Converter[(Object, Object), (Long,
     typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
   }
 }
-
-class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
-
-  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
-    List(new SQLHistoryListener(conf, sparkUI))
-  }
-}
-
-class SQLListener(conf: SparkConf) extends SparkListener with Logging {
-
-  private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
-
-  private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
-
-  // Old data in the following fields must be removed in "trimExecutionsIfNecessary".
-  // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up old data
-  private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]()
-
-  /**
-   * Maintain the relation between job id and execution id so that we can get the execution id in
-   * the "onJobEnd" method.
-   */
-  private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
-
-  private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]()
-
-  private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
-    _executionIdToData.toMap
-  }
-
-  def jobIdToExecutionId: Map[Long, Long] = synchronized {
-    _jobIdToExecutionId.toMap
-  }
-
-  def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
-    _stageIdToStageMetrics.toMap
-  }
-
-  private def trimExecutionsIfNecessary(
-      executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = {
-    if (executions.size > retainedExecutions) {
-      val toRemove = math.max(retainedExecutions / 10, 1)
-      executions.take(toRemove).foreach { execution =>
-        for (executionUIData <- _executionIdToData.remove(execution.executionId)) {
-          for (jobId <- executionUIData.jobs.keys) {
-            _jobIdToExecutionId.remove(jobId)
-          }
-          for (stageId <- executionUIData.stages) {
-            _stageIdToStageMetrics.remove(stageId)
-          }
-        }
-      }
-      executions.trimStart(toRemove)
-    }
-  }
-
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-    val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
-    if (executionIdString == null) {
-      // This is not a job created by SQL
-      return
-    }
-    val executionId = executionIdString.toLong
-    val jobId = jobStart.jobId
-    val stageIds = jobStart.stageIds
-
-    synchronized {
-      activeExecutions.get(executionId).foreach { executionUIData =>
-        executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING
-        executionUIData.stages ++= stageIds
-        stageIds.foreach(stageId =>
-          _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId = 0))
-        _jobIdToExecutionId(jobId) = executionId
-      }
-    }
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
-    val jobId = jobEnd.jobId
-    for (executionId <- _jobIdToExecutionId.get(jobId);
-        executionUIData <- _executionIdToData.get(executionId)) {
-      jobEnd.jobResult match {
-        case JobSucceeded => executionUIData.jobs(jobId) = JobExecutionStatus.SUCCEEDED
-        case JobFailed(_) => executionUIData.jobs(jobId) = JobExecutionStatus.FAILED
-      }
-      if (executionUIData.completionTime.nonEmpty && !executionUIData.hasRunningJobs) {
-        // We are the last job of this execution, so mark the execution as finished. Note that
-        // `onExecutionEnd` also does this, but currently that can be called before `onJobEnd`
-        // since these are called on different threads.
-        markExecutionFinished(executionId)
-      }
-    }
-  }
-
-  override def onExecutorMetricsUpdate(
-      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
-    for ((taskId, stageId, stageAttemptID, accumUpdates) <- executorMetricsUpdate.accumUpdates) {
-      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, accumUpdates, finishTask = false)
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
-    val stageId = stageSubmitted.stageInfo.stageId
-    val stageAttemptId = stageSubmitted.stageInfo.attemptId
-    // Always override metrics for old stage attempt
-    if (_stageIdToStageMetrics.contains(stageId)) {
-      _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
-    } else {
-      // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart".
-      // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution.
-      // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126).
-    }
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    if (taskEnd.taskMetrics != null) {
-      updateTaskAccumulatorValues(
-        taskEnd.taskInfo.taskId,
-        taskEnd.stageId,
-        taskEnd.stageAttemptId,
-        taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)),
-        finishTask = true)
-    }
-  }
-
-  /**
-   * Update the accumulator values of a task with the latest metrics for this task. This is called
-   * every time we receive an executor heartbeat or when a task finishes.
-   */
-  protected def updateTaskAccumulatorValues(
-      taskId: Long,
-      stageId: Int,
-      stageAttemptID: Int,
-      _accumulatorUpdates: Seq[AccumulableInfo],
-      finishTask: Boolean): Unit = {
-    val accumulatorUpdates =
-      _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get))
-
-    _stageIdToStageMetrics.get(stageId) match {
-      case Some(stageMetrics) =>
-        if (stageAttemptID < stageMetrics.stageAttemptId) {
-          // A task of an old stage attempt. Because a new stage is submitted, we can ignore it.
-        } else if (stageAttemptID > stageMetrics.stageAttemptId) {
-          logWarning(s"A task should not have a higher stageAttemptID ($stageAttemptID) then " +
-            s"what we have seen (${stageMetrics.stageAttemptId})")
-        } else {
-          // TODO We don't know the attemptId. Currently, what we can do is overriding the
--- End diff --

This comment was hard to make sense of (you shouldn't ever have two tasks with the same taskId, even with speculation), but I think there is something here that is still worth mentioning: you aggregate metrics across all attempts of a given task (aka its "index"), even speculative ones, both before and after your change -- I'd call that out in a comment. (The task's index is available in onTaskStart / onTaskEnd if we ever wanted to de-duplicate.)
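
To make that suggestion concrete, here is a minimal, self-contained sketch of index-based de-duplication. It is hypothetical -- the object and method names (TaskMetricsDedup, update, stageTotals) are invented and this is not code from this PR or from Spark -- but it models the bookkeeping: accumulator updates are keyed by (stageId, task index), so an original attempt and a speculative attempt of the same task are counted once rather than summed.

    import scala.collection.mutable

    // Hypothetical sketch: de-duplicate per-task accumulator updates by task index.
    object TaskMetricsDedup {
      // (stageId, taskIndex) -> latest accumulator updates (accumulator id -> value).
      // Keying by index collapses original and speculative attempts into one entry.
      private val latestByIndex = mutable.HashMap[(Int, Int), Map[Long, Long]]()

      // A real listener would learn the index from taskInfo.index in
      // onTaskStart / onTaskEnd; executor heartbeats only carry the taskId, so a
      // taskId -> index map would be needed to route those updates here.
      def update(stageId: Int, taskIndex: Int, accumUpdates: Map[Long, Long]): Unit =
        latestByIndex((stageId, taskIndex)) = accumUpdates

      // Per-accumulator totals for a stage, counting each task index exactly once.
      def stageTotals(stageId: Int): Map[Long, Long] =
        latestByIndex
          .filter { case ((sid, _), _) => sid == stageId }
          .values
          .flatten
          .groupBy { case (accumId, _) => accumId }
          .map { case (accumId, pairs) => accumId -> pairs.map(_._2).sum }
    }

    object TaskMetricsDedupExample extends App {
      // Two attempts of task index 0 report updates for accumulator 10; only the
      // latest survives, so the stage total is 120 + 50, not 100 + 120 + 50.
      TaskMetricsDedup.update(stageId = 1, taskIndex = 0, accumUpdates = Map(10L -> 100L))
      TaskMetricsDedup.update(stageId = 1, taskIndex = 0, accumUpdates = Map(10L -> 120L))
      TaskMetricsDedup.update(stageId = 1, taskIndex = 1, accumUpdates = Map(10L -> 50L))
      println(TaskMetricsDedup.stageTotals(1)) // Map(10 -> 170)
    }

Whether that extra bookkeeping is worth it is debatable; simply documenting the existing aggregate-across-attempts behaviour, as suggested above, may be enough.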