Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19681#discussion_r149778794
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---
@@ -89,398 +83,3 @@ private class LongLongTupleConverter extends Converter[(Object, Object), (Long,
     typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
   }
 }
-
-class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
-
-  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
-    List(new SQLHistoryListener(conf, sparkUI))
-  }
-}
-
-class SQLListener(conf: SparkConf) extends SparkListener with Logging {
-
-  private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
-
-  private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
-
-  // Old data in the following fields must be removed in "trimExecutionsIfNecessary".
-  // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up old data
-  private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]()
-
-  /**
-   * Maintain the relation between job id and execution id so that we can get the execution id in
-   * the "onJobEnd" method.
-   */
-  private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
-
-  private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]()
-
-  private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
-    _executionIdToData.toMap
-  }
-
-  def jobIdToExecutionId: Map[Long, Long] = synchronized {
-    _jobIdToExecutionId.toMap
-  }
-
-  def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
-    _stageIdToStageMetrics.toMap
-  }
-
-  private def trimExecutionsIfNecessary(
-      executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = {
-    if (executions.size > retainedExecutions) {
-      val toRemove = math.max(retainedExecutions / 10, 1)
-      executions.take(toRemove).foreach { execution =>
-        for (executionUIData <- _executionIdToData.remove(execution.executionId)) {
-          for (jobId <- executionUIData.jobs.keys) {
-            _jobIdToExecutionId.remove(jobId)
-          }
-          for (stageId <- executionUIData.stages) {
-            _stageIdToStageMetrics.remove(stageId)
-          }
-        }
-      }
-      executions.trimStart(toRemove)
-    }
-  }
-
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-    val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
-    if (executionIdString == null) {
-      // This is not a job created by SQL
-      return
-    }
-    val executionId = executionIdString.toLong
-    val jobId = jobStart.jobId
-    val stageIds = jobStart.stageIds
-
-    synchronized {
-      activeExecutions.get(executionId).foreach { executionUIData =>
-        executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING
-        executionUIData.stages ++= stageIds
-        stageIds.foreach(stageId =>
-          _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId = 0))
-        _jobIdToExecutionId(jobId) = executionId
-      }
-    }
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
-    val jobId = jobEnd.jobId
-    for (executionId <- _jobIdToExecutionId.get(jobId);
-         executionUIData <- _executionIdToData.get(executionId)) {
-      jobEnd.jobResult match {
-        case JobSucceeded => executionUIData.jobs(jobId) = JobExecutionStatus.SUCCEEDED
-        case JobFailed(_) => executionUIData.jobs(jobId) = JobExecutionStatus.FAILED
-      }
-      if (executionUIData.completionTime.nonEmpty && !executionUIData.hasRunningJobs) {
-        // We are the last job of this execution, so mark the execution as finished. Note that
-        // `onExecutionEnd` also does this, but currently that can be called before `onJobEnd`
-        // since these are called on different threads.
-        markExecutionFinished(executionId)
-      }
-    }
-  }
-
-  override def onExecutorMetricsUpdate(
-      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
-    for ((taskId, stageId, stageAttemptID, accumUpdates) <- executorMetricsUpdate.accumUpdates) {
-      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, accumUpdates, finishTask = false)
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
-    val stageId = stageSubmitted.stageInfo.stageId
-    val stageAttemptId = stageSubmitted.stageInfo.attemptId
-    // Always override metrics for old stage attempt
-    if (_stageIdToStageMetrics.contains(stageId)) {
-      _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
-    } else {
-      // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart".
-      // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution.
-      // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126).
-    }
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    if (taskEnd.taskMetrics != null) {
-      updateTaskAccumulatorValues(
-        taskEnd.taskInfo.taskId,
-        taskEnd.stageId,
-        taskEnd.stageAttemptId,
-        taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)),
-        finishTask = true)
-    }
-  }
-
-  /**
-   * Update the accumulator values of a task with the latest metrics for this task. This is called
-   * every time we receive an executor heartbeat or when a task finishes.
-   */
-  protected def updateTaskAccumulatorValues(
-      taskId: Long,
-      stageId: Int,
-      stageAttemptID: Int,
-      _accumulatorUpdates: Seq[AccumulableInfo],
-      finishTask: Boolean): Unit = {
-    val accumulatorUpdates =
-      _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get))
-
-    _stageIdToStageMetrics.get(stageId) match {
-      case Some(stageMetrics) =>
-        if (stageAttemptID < stageMetrics.stageAttemptId) {
-          // A task of an old stage attempt. Because a new stage is submitted, we can ignore it.
-        } else if (stageAttemptID > stageMetrics.stageAttemptId) {
-          logWarning(s"A task should not have a higher stageAttemptID ($stageAttemptID) then " +
-            s"what we have seen (${stageMetrics.stageAttemptId})")
-        } else {
-          // TODO We don't know the attemptId. Currently, what we can do is overriding the
--- End diff ---
This comment was hard to make sense of (you should never have two tasks with the same taskId, even with speculation), but I think there is something here that may still be worth mentioning: metrics are aggregated across all attempts of a given task (aka its "index"), even speculative ones -- that's true both before and after your change -- and I'd call that out in a comment.
(The index is available in onTaskStart / onTaskEnd if we ever wanted to de-duplicate.)
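
For concreteness, here's a rough, untested sketch of what de-duplicating by task index could look like. All the names here (`TaskMetricsUpdate`, `StageMetricsDeduper`, `offer`) are made up for illustration, not Spark APIs; the real `taskInfo.index` and `taskInfo.attemptNumber` fields from the listener events would feed it:

```scala
import scala.collection.mutable

// Hypothetical carrier for one metrics update from one task attempt.
// taskIndex is shared across re-runs / speculative attempts of the same task.
case class TaskMetricsUpdate(
    taskId: Long,
    taskIndex: Int,
    attemptNumber: Int,
    accumValues: Map[Long, Long])

class StageMetricsDeduper {
  // Latest update accepted for each task index.
  private val latestByIndex = mutable.HashMap[Int, TaskMetricsUpdate]()

  /** Keep only the newest attempt per index; returns true if accepted. */
  def offer(update: TaskMetricsUpdate): Boolean = synchronized {
    latestByIndex.get(update.taskIndex) match {
      case Some(prev) if prev.attemptNumber > update.attemptNumber =>
        false // a newer attempt already won; drop this stale update
      case _ =>
        latestByIndex(update.taskIndex) = update
        true
    }
  }

  /** Sum accumulator values across de-duplicated tasks. */
  def aggregated: Map[Long, Long] = synchronized {
    latestByIndex.values
      .flatMap(_.accumValues)
      .groupBy(_._1)
      .map { case (accumId, kvs) => accumId -> kvs.map(_._2).sum }
  }
}
```

With something like this, each task index contributes exactly once to the aggregate (the winning attempt's latest values), rather than every attempt's updates being summed together.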