HeartSaVioR commented on a change in pull request #26218: [SPARK-29562][sql] Speed up and slim down metric aggregation in SQL listener.
URL: https://github.com/apache/spark/pull/26218#discussion_r337908742
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ##########
 @@ -425,12 +432,72 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
 }
 
 private class LiveStageMetrics(
-    val stageId: Int,
-    var attemptId: Int,
-    val accumulatorIds: Set[Long],
-    val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])
-
-private class LiveTaskMetrics(
-    val ids: Array[Long],
-    val values: Array[Long],
-    val succeeded: Boolean)
+    val attemptId: Int,
+    val numTasks: Int,
+    val accumulatorIds: Set[Long]) {
+
+  /**
+   * Mapping of task IDs to the partition index they're computing. Note this 
may contain more
+   * elements than the stage's number of tasks, if speculative execution is on.
+   */
+  private val taskIndices = new OpenHashMap[Long, Int]()
+
+  /** Bit set tracking which partition indices have been successfully 
computed. */
+  private val completedParts = new mutable.BitSet()
+
+  /**
+   * Task metrics values for the stage. Maps the metric ID to the metric 
values for each
+   * partition. For each metric ID, there will be the same number of values as 
the number
+   * of partitions. This relies on `SQLMetrics.stringValue` treating 0 as a 
neutral value,
+   * independent of the actual metric type.
+   */
+  private val taskMetrics = new ConcurrentHashMap[Long, Array[Long]]()
+
+  def registerTask(taskId: Long, partIdx: Int): Unit = {
+    taskIndices.update(taskId, partIdx)
+  }
+
+  def updateTaskMetrics(
+      taskId: Long,
+      eventPartIdx: Int,
+      finished: Boolean,
+      accumUpdates: Seq[AccumulableInfo]): Unit = {
+    val partIdx = if (eventPartIdx == -1) {
 
 Review comment:
   nit: it might be better to define a named constant for `-1` to denote an unknown partition index.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to