attilapiros opened a new pull request, #38083:
URL: https://github.com/apache/spark/pull/38083

   ### What changes were proposed in this pull request?
   
   Fix a race condition in ExecutorMetricsPoller between  
`getExecutorUpdates()` and `onTaskStart()` methods by avoiding removing entries 
when another stage is not started yet.
   
   ### Why are the changes needed?
   
   Spurious failures are reported because of the following assert:
   
   ```
   22/09/29 09:46:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[Executor task launch worker for task 3063.0 in stage 1997.0 (TID 
677249),5,main]
   java.lang.AssertionError: assertion failed: task count shouldn't below 0
        at scala.Predef$.assert(Predef.scala:223)
        at 
org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130)
        at 
org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135)
        at 
java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822)
        at 
org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:737)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   22/09/29 09:46:24 INFO MemoryStore: MemoryStore cleared
   22/09/29 09:46:24 INFO BlockManager: BlockManager stopped
   22/09/29 09:46:24 INFO ShutdownHookManager: Shutdown hook called
   22/09/29 09:46:24 INFO ShutdownHookManager: Deleting directory 
/mnt/yarn/usercache/hadoop/appcache/application_1664443624160_0001/spark-93efc2d4-84de-494b-a3b7-2cb1c3a45426
   ```
   
   I have checked the code and the basic assumption to have at least as many 
`onTaskStart()` calls as `onTaskCompletion()` for the same `stageId` & 
`stageAttemptId` pair is correct. But there is race condition between 
`getExecutorUpdates()` and `onTaskStart()`.
   
   First of all we have two different threads:
   - task runner: to execute the task and informs `ExecutorMetricsPoller` about 
task starts and completion
   - heartbeater: which uses the `ExecutorMetricsPoller` to get the metrics
   
   To show the race condition assume a task just finished which was running on 
its own (no other tasks was running). So this will decrease the `count` from 1 
to 0.
   
   On the task runner thread let say a new task starts. So the execution is in 
the `onTaskStart()` method let's assume the `countAndPeaks` is already computed 
and here the counter is 0 but the execution is still before incrementing the 
counter. So we are in between the following two lines:
   
   ```scala
    val countAndPeaks = stageTCMP.computeIfAbsent((stageId, stageAttemptId),
         _ => TCMP(new AtomicLong(0), new 
AtomicLongArray(ExecutorMetricType.numMetrics)))
    val stageCount = countAndPeaks.count.incrementAndGet()
   ```
   
   Let's look at the other thread (heartbeater) where the 
`getExecutorUpdates()` is running and it is at the `removeIfInactive()` method:
   
   ```scala
      def removeIfInactive(k: StageKey, v: TCMP): TCMP = {
         if (v.count.get == 0) {
           logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
           null
         } else {
           v
         }
       }
   ```
   
   And here this entry is removed from `stageTCMP` as the count is 0.
   
   Let's go back to the task runner thread where we increase the counter to 1 
but that value will be lost as we have no entry in the `stageTCMP` for this 
stage and attempt.
   
   So if a new task comes instead of 2 we will have 1 in the `stageTCMP` and 
when those two tasks finishes the second one will decrease the counter from 0 
to -1. This is when the assert raised.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Unit test.
   
   I managed to reproduce the issue with a temporary test:
   
   ```scala
    test("reproduce assert failure") {
       val testMemoryManager = new TestMemoryManager(new SparkConf())
       val taskId = new AtomicLong(0)
       val runFlag = new AtomicBoolean(true)
       val poller = new ExecutorMetricsPoller(testMemoryManager, 1000, None)
       val callUpdates = new Thread("getExecutorOpdates") {
         override def run() {
           while (runFlag.get())  { poller.getExecutorUpdates.size }
         }
       }
   
       val taskStartRunner1 = new Thread("taskRunner1") {
         override def run() {
           while (runFlag.get) {
             var l = taskId.incrementAndGet()
             poller.onTaskStart(l, 0, 0)
             poller.onTaskCompletion(l, 0, 0)
           }
         }
       }
       val taskStartRunner2 = new Thread("taskRunner2") {
         override def run() {
           while (runFlag.get) {
             var l = taskId.incrementAndGet()
             poller.onTaskStart(l, 0, 0)
             poller.onTaskCompletion(l, 0, 0)
           }
         }
       }
       val taskStartRunner3 = new Thread("taskRunner3") {
         override def run() {
           while (runFlag.get) {
             var l = taskId.incrementAndGet()
             var m = taskId.incrementAndGet()
             poller.onTaskStart(l, 0, 0)
             poller.onTaskStart(m, 0, 0)
             poller.onTaskCompletion(l, 0, 0)
             poller.onTaskCompletion(m, 0, 0)
           }
         }
       }
       callUpdates.start()
       taskStartRunner1.start()
       taskStartRunner2.start()
       taskStartRunner3.start()
   
       Thread.sleep(1000 * 20)
   
       runFlag.set(false)
       callUpdates.join()
       taskStartRunner1.join()
       taskStartRunner2.join()
       taskStartRunner3.join()
     }
   ```
   
   The assert which raised is:
   
   ```
   Exception in thread "taskRunner3" java.lang.AssertionError: assertion 
failed: task count shouldn't below 0
           at scala.Predef$.assert(Predef.scala:223)
           at 
org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130)
           at 
org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135)
           at 
java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1828)
           at 
org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135)
           at 
org.apache.spark.executor.ExecutorMetricsPollerSuite$$anon$4.run(ExecutorMetricsPollerSuite.scala:64)
   ```
   
   But when I switch off `removeIfInactive()` by using the following code:
   ```scala
           if (false && v.count.get == 0) {
             logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
             null
           } else {
             v
           }
   ```
   
   Then no assert is raised.
   
   Closes #38056 from attilapiros/SPARK-40617.
   
   Authored-by: attilapiros <[email protected]>
   Signed-off-by: attilapiros <[email protected]>
   (cherry picked from commit 564a51b64e71f7402c2674de073b3b18001df56f)
   Signed-off-by: attilapiros <[email protected]>
   (cherry picked from commit 90a27757ec17c2511049114a437f365326e51225)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to