attilapiros opened a new pull request, #38083:
URL: https://github.com/apache/spark/pull/38083
### What changes were proposed in this pull request?
Fix a race condition in `ExecutorMetricsPoller` between the
`getExecutorUpdates()` and `onTaskStart()` methods by avoiding removing entries
when another stage is not started yet.
### Why are the changes needed?
Spurious failures are reported because of the following assertion:
```
22/09/29 09:46:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 3063.0 in stage 1997.0 (TID 677249),5,main]
java.lang.AssertionError: assertion failed: task count shouldn't below 0
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130)
    at org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822)
    at org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:737)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
22/09/29 09:46:24 INFO MemoryStore: MemoryStore cleared
22/09/29 09:46:24 INFO BlockManager: BlockManager stopped
22/09/29 09:46:24 INFO ShutdownHookManager: Shutdown hook called
22/09/29 09:46:24 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1664443624160_0001/spark-93efc2d4-84de-494b-a3b7-2cb1c3a45426
```
I have checked the code, and the basic assumption that there are at least as
many `onTaskStart()` calls as `onTaskCompletion()` calls for the same `stageId` &
`stageAttemptId` pair is correct. But there is a race condition between
`getExecutorUpdates()` and `onTaskStart()`.
First of all, we have two different threads:
- task runner: executes the task and informs `ExecutorMetricsPoller` about
task start and completion
- heartbeater: uses the `ExecutorMetricsPoller` to get the metrics
To show the race condition, assume a task has just finished that was running on
its own (no other task was running). This decreases the `count` from 1
to 0.
On the task runner thread, let's say a new task starts, so execution is in
the `onTaskStart()` method. Assume `countAndPeaks` is already computed
and the counter is 0, but execution has not yet reached the increment of the
counter. So we are in between the following two lines:
```scala
val countAndPeaks = stageTCMP.computeIfAbsent((stageId, stageAttemptId),
  _ => TCMP(new AtomicLong(0), new AtomicLongArray(ExecutorMetricType.numMetrics)))
val stageCount = countAndPeaks.count.incrementAndGet()
```
Let's look at the other thread (heartbeater), where
`getExecutorUpdates()` is running and has reached the `removeIfInactive()` method:
```scala
def removeIfInactive(k: StageKey, v: TCMP): TCMP = {
  if (v.count.get == 0) {
    logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
    null
  } else {
    v
  }
}
```
Here the entry is removed from `stageTCMP` because the count is 0.
Back on the task runner thread, we increase the counter to 1,
but that value is lost because `stageTCMP` no longer has an entry for this
stage and attempt.
So if another task starts, `stageTCMP` will hold 1 instead of 2, and
when those two tasks finish, the second one will decrease the counter from 0
to -1. This is when the assertion fails.
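The interleaving described above can be replayed deterministically on a single thread. The following is only a minimal sketch, not the Spark code: `LostIncrementReplay` and `replay()` are hypothetical names, a bare `AtomicLong` stands in for `TCMP`, and it assumes the removal is applied through `ConcurrentHashMap.computeIfPresent`, where returning `null` drops the mapping.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the poller's bookkeeping; not the Spark classes.
object LostIncrementReplay {
  type StageKey = (Int, Int)

  // Replays the problematic interleaving step by step and returns the
  // counter value observed after the last task completion.
  def replay(): Long = {
    val stageTCMP = new ConcurrentHashMap[StageKey, AtomicLong]()
    val key: StageKey = (0, 0)

    // A lone task ran and finished, so the entry exists with count 0.
    stageTCMP.put(key, new AtomicLong(0))

    // Task runner: first half of onTaskStart() for task A (lookup only).
    val seenByTaskA = stageTCMP.computeIfAbsent(key, _ => new AtomicLong(0))

    // Heartbeater: the inactive check sees count == 0 and drops the entry.
    stageTCMP.computeIfPresent(key,
      (_: StageKey, v: AtomicLong) => if (v.get == 0) null else v)

    // Task runner: second half of onTaskStart(); the increment lands on an
    // object that the map no longer references, so it is lost.
    seenByTaskA.incrementAndGet()

    // Task B starts and gets a fresh entry: the map holds 1 instead of 2.
    stageTCMP.computeIfAbsent(key, _ => new AtomicLong(0)).incrementAndGet()

    // Both tasks complete against the entry currently in the map.
    stageTCMP.get(key).decrementAndGet() // 1 -> 0
    stageTCMP.get(key).decrementAndGet() // 0 -> -1, where the assert would fire
  }
}
```

Calling `LostIncrementReplay.replay()` returns the negative count that the assertion in `onTaskCompletion()` guards against.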
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test.
I managed to reproduce the issue with a temporary test:
```scala
// Needs java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} among the suite's imports.
test("reproduce assert failure") {
  val testMemoryManager = new TestMemoryManager(new SparkConf())
  val taskId = new AtomicLong(0)
  val runFlag = new AtomicBoolean(true)
  val poller = new ExecutorMetricsPoller(testMemoryManager, 1000, None)
  // Plays the heartbeater role: polls in a tight loop so removeIfInactive() runs often.
  val callUpdates = new Thread("getExecutorUpdates") {
    override def run(): Unit = {
      while (runFlag.get()) { poller.getExecutorUpdates().size }
    }
  }
  // Runners 1 and 2 each start and complete one task at a time.
  val taskStartRunner1 = new Thread("taskRunner1") {
    override def run(): Unit = {
      while (runFlag.get) {
        val l = taskId.incrementAndGet()
        poller.onTaskStart(l, 0, 0)
        poller.onTaskCompletion(l, 0, 0)
      }
    }
  }
  val taskStartRunner2 = new Thread("taskRunner2") {
    override def run(): Unit = {
      while (runFlag.get) {
        val l = taskId.incrementAndGet()
        poller.onTaskStart(l, 0, 0)
        poller.onTaskCompletion(l, 0, 0)
      }
    }
  }
  // Runner 3 keeps two tasks of the same stage in flight at once.
  val taskStartRunner3 = new Thread("taskRunner3") {
    override def run(): Unit = {
      while (runFlag.get) {
        val l = taskId.incrementAndGet()
        val m = taskId.incrementAndGet()
        poller.onTaskStart(l, 0, 0)
        poller.onTaskStart(m, 0, 0)
        poller.onTaskCompletion(l, 0, 0)
        poller.onTaskCompletion(m, 0, 0)
      }
    }
  }
  callUpdates.start()
  taskStartRunner1.start()
  taskStartRunner2.start()
  taskStartRunner3.start()
  // Let the threads race for 20 seconds, then stop them and wait for them to exit.
  Thread.sleep(1000 * 20)
  runFlag.set(false)
  callUpdates.join()
  taskStartRunner1.join()
  taskStartRunner2.join()
  taskStartRunner3.join()
}
```
The assertion that fails is:
```
Exception in thread "taskRunner3" java.lang.AssertionError: assertion failed: task count shouldn't below 0
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130)
    at org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1828)
    at org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135)
    at org.apache.spark.executor.ExecutorMetricsPollerSuite$$anon$4.run(ExecutorMetricsPollerSuite.scala:64)
```
But when I switch off `removeIfInactive()` by using the following code:
```scala
if (false && v.count.get == 0) {
  logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
  null
} else {
  v
}
```
Then the assertion no longer fails.
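For illustration only, one general way to close a window like this is to perform the create-or-increment step inside `ConcurrentHashMap.compute()`, which runs atomically per key and therefore cannot interleave with a removal done through `computeIfPresent()` on the same key. This is a sketch of that technique under stated assumptions, not necessarily what this patch does: `StageCounts`, `taskStarted`, `taskCompleted`, `currentCount`, and `AtomicStartSketch` are hypothetical names, and a bare `AtomicLong` stands in for `TCMP`.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical bookkeeping class; not the Spark ExecutorMetricsPoller.
class StageCounts {
  type StageKey = (Int, Int)

  private val counts = new ConcurrentHashMap[StageKey, AtomicLong]()

  // Create-or-increment happens inside compute(), so a concurrent removal
  // either runs before it (a fresh entry is created) or after it (count >= 1).
  def taskStarted(key: StageKey): Unit = {
    counts.compute(key, (_: StageKey, v: AtomicLong) => {
      val counter = if (v == null) new AtomicLong(0) else v
      counter.incrementAndGet()
      counter
    })
  }

  // Decrement under the same per-key atomicity.
  def taskCompleted(key: StageKey): Unit = {
    counts.computeIfPresent(key, (_: StageKey, v: AtomicLong) => {
      v.decrementAndGet()
      v
    })
  }

  // Heartbeater side: returning null removes an entry whose count is 0.
  def removeIfInactive(key: StageKey): Unit = {
    counts.computeIfPresent(key,
      (_: StageKey, v: AtomicLong) => if (v.get == 0) null else v)
  }

  // Current count for the key, or 0 when no entry exists.
  def currentCount(key: StageKey): Long =
    Option(counts.get(key)).map(_.get).getOrElse(0L)
}

object AtomicStartSketch {
  // Same sequence of events as in the description, with atomic steps.
  def demo(): Long = {
    val stages = new StageCounts
    val key = (0, 0)
    stages.taskStarted(key)
    stages.taskCompleted(key)    // lone task finished, count is 0
    stages.removeIfInactive(key) // heartbeater drops the idle entry
    stages.taskStarted(key)      // entry recreated with count 1
    stages.taskStarted(key)      // count 2
    stages.taskCompleted(key)
    stages.taskCompleted(key)
    stages.currentCount(key)     // back to 0, never negative
  }
}
```

The trade-off is that the increment now runs while the map holds the lock for that key's bin, so the remapping function has to stay short and must not touch other keys of the same map.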
Closes #38056 from attilapiros/SPARK-40617.
Authored-by: attilapiros <[email protected]>
Signed-off-by: attilapiros <[email protected]>
(cherry picked from commit 564a51b64e71f7402c2674de073b3b18001df56f)
Signed-off-by: attilapiros <[email protected]>
(cherry picked from commit 90a27757ec17c2511049114a437f365326e51225)