HeartSaVioR commented on a change in pull request #26218: [SPARK-29562][sql] 
Speed up and slim down metric aggregation in SQL listener.
URL: https://github.com/apache/spark/pull/26218#discussion_r337894767
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ##########
 @@ -425,12 +432,72 @@ private class LiveExecutionData(val executionId: Long) 
extends LiveEntity {
 }
 
 private class LiveStageMetrics(
-    val stageId: Int,
-    var attemptId: Int,
-    val accumulatorIds: Set[Long],
-    val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])
-
-private class LiveTaskMetrics(
-    val ids: Array[Long],
-    val values: Array[Long],
-    val succeeded: Boolean)
+    val attemptId: Int,
+    val numTasks: Int,
+    val accumulatorIds: Set[Long]) {
+
+  /**
+   * Mapping of task IDs to the partition index they're computing. Note this 
may contain more
+   * elements than the stage's number of tasks, if speculative execution is on.
+   */
+  private val taskIndices = new OpenHashMap[Long, Int]()
+
+  /** Bit set tracking which partition indices have been successfully 
computed. */
+  private val completedParts = new mutable.BitSet()
+
+  /**
+   * Task metrics values for the stage. Maps the metric ID to the metric 
values for each
+   * partition. For each metric ID, there will be the same number of values as 
the number
+   * of partitions. This relies on `SQLMetrics.stringValue` treating 0 as a 
neutral value,
+   * independent of the actual metric type.
+   */
+  private val taskMetrics = new ConcurrentHashMap[Long, Array[Long]]()
+
+  def registerTask(taskId: Long, partIdx: Int): Unit = {
+    taskIndices.update(taskId, partIdx)
+  }
+
+  def updateTaskMetrics(
+      taskId: Long,
+      eventPartIdx: Int,
+      finished: Boolean,
+      accumUpdates: Seq[AccumulableInfo]): Unit = {
+    val partIdx = if (eventPartIdx == -1) {
+      if (!taskIndices.contains(taskId)) {
+        // We probably missed the start event for the task, just ignore it.
+        return
+      }
+      taskIndices(taskId)
+    } else {
+      // Here we can recover from a missing task start event. Just register 
the task again.
+      registerTask(taskId, eventPartIdx)
+      eventPartIdx
+    }
+
+    if (completedParts.contains(partIdx)) {
+      return
+    }
+
+    accumUpdates
+      .filter { acc => acc.update.isDefined && accumulatorIds.contains(acc.id) 
}
+      .foreach { acc =>
+        // In a live application, accumulators have Long values, but when 
reading from event
+        // logs, they have String values. For now, assume all accumulators are 
Long and covert
 
 Review comment:
   nit: it was copied & pasted, but it's better to fix it here: `covert` -> `convert`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to