attilapiros commented on code in PR #31871:
URL: https://github.com/apache/spark/pull/31871#discussion_r984068681


##########
core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala:
##########
@@ -124,17 +124,11 @@ private[spark] class ExecutorMetricsPoller(
    */
   def onTaskCompletion(taskId: Long, stageId: Int, stageAttemptId: Int): Unit = {
     // Decrement the task count.
-    // Remove the entry from stageTCMP if the task count reaches zero.
 
     def decrementCount(stage: StageKey, countAndPeaks: TCMP): TCMP = {
       val countValue = countAndPeaks.count.decrementAndGet()

Review Comment:
   The basic assumption is correct: for the same `stageId` & `stageAttemptId` pair there are at least as many `onTaskStart()` calls as `onTaskCompletion()` calls.
   But there can still be a race condition between `getExecutorUpdates()` and `onTaskStart()`.
   
   Let me show you one possible scenario.
   
   Assume a task that was running alone on this executor has just finished, so it decreased the `count` from 1 to 0.
   
   On the task runner thread, let's say a new task starts.
   We are in the `onTaskStart()` method: we have already retrieved the `TCMP` whose counter is 0, but we have not yet incremented the counter:
   
   ```
    val countAndPeaks = stageTCMP.computeIfAbsent((stageId, stageAttemptId),
      _ => TCMP(new AtomicLong(0), new AtomicLongArray(ExecutorMetricType.numMetrics)))
    val stageCount = countAndPeaks.count.incrementAndGet()
   ```
   
   On the heartbeater thread, `getExecutorUpdates()` is running, or to be exact, `removeIfInactive()`:
   
   ```
      def removeIfInactive(k: StageKey, v: TCMP): TCMP = {
        if (v.count.get == 0) {
          logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
          null
        } else {
          v
        }
      }
   ```
   
   And we remove the entry from `stageTCMP` as the count is 0.
   
   Back on the task runner thread, we increase the counter to 1, but that value is lost because `stageTCMP` no longer has an entry for this stage and attempt.
   
   So if another task of the same stage attempt starts, `stageTCMP` will hold a count of 1 instead of 2, and when those two tasks finish, the second one will decrease the counter from 0 to -1.
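
The interleaving above can be replayed deterministically on a single thread. This is only a minimal sketch, not the Spark code: `StageCountRaceReplay` and `Counter` are hypothetical names, `Counter` is a simplified stand-in for `TCMP` that keeps only the task count, and the `ConcurrentHashMap` calls are issued in the order described in the scenario.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

object StageCountRaceReplay {
  type StageKey = (Int, Int)

  // Hypothetical, simplified stand-in for TCMP: task count only, no metric peaks.
  final case class Counter(count: AtomicLong)

  /** Returns (count seen by task B, count seen by task C, count left in the map). */
  def replay(): (Long, Long, Long) = {
    val stageTCMP = new ConcurrentHashMap[StageKey, Counter]()
    val key: StageKey = (0, 0)

    // Task A ran alone and has finished: its entry is still present with count 0.
    stageTCMP.put(key, Counter(new AtomicLong(0)))

    // Task runner thread, task B, first half of onTaskStart(): fetch the entry.
    val fetchedByB =
      stageTCMP.computeIfAbsent(key, (_: StageKey) => Counter(new AtomicLong(0)))

    // Heartbeater thread: removeIfInactive() sees count == 0 and drops the entry.
    stageTCMP.computeIfPresent(key,
      (_: StageKey, v: Counter) => if (v.count.get == 0) null else v)

    // Task runner thread, task B, second half: the increment hits an orphaned object.
    val seenByB = fetchedByB.count.incrementAndGet()

    // Task C starts: a fresh entry is created, so the map counts 1 task, not 2.
    val seenByC = stageTCMP
      .computeIfAbsent(key, (_: StageKey) => Counter(new AtomicLong(0)))
      .count.incrementAndGet()

    // Simplified onTaskCompletion(): decrement the count of the entry in the map.
    def complete(): Unit = {
      stageTCMP.computeIfPresent(key,
        (_: StageKey, v: Counter) => { v.count.decrementAndGet(); v })
      ()
    }
    complete() // task B finishes: 1 -> 0
    complete() // task C finishes: 0 -> -1

    (seenByB, seenByC, stageTCMP.get(key).count.get)
  }

  def main(args: Array[String]): Unit = println(replay())
}
```

Both tasks observe a count of 1 after starting, and the entry left in the map ends at -1, which matches the scenario described above.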
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

