JoshRosen commented on PR #37206: URL: https://github.com/apache/spark/pull/37206#issuecomment-1189930626
@mridulm, thanks for the ping. `CopyOnWriteArrayList` supports single-threaded writes with concurrent reads. It looks like `externalAccums` is only written from `registerAccumulator`, which is only called from `TaskContext.registerAccumulator`, which in turn is only called during `AccumulatorV2` deserialization, which should only occur when the task binary is deserialized at the beginning of `Task` execution.

In the [SPARK-39696](https://issues.apache.org/jira/browse/SPARK-39696) JIRA, it looks like the read occurs in the executor's `reportHeartBeat` function at https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L1042

As far as I know, `taskRunner.task` will initially be `null` and will be populated once the task is deserialized at https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508

Assuming my description above is correct, I don't understand how concurrent reads and writes can occur: in normal operation I would expect writes to happen only during task binary deserialization, and reads from the heartbeat thread to happen only after deserialization has completed.

Perhaps accumulators are being deserialized and registered _while the task is running_ (not just during task setup)? For example, maybe a case class is defined on the driver and its closure accidentally closes over something that contains an `AccumulatorV2`, causing us to deserialize `AccumulatorV2` instances when deserializing data values. In the simple case, we'd have one writer thread (the task's main runner thread) racing with the heartbeat thread. In more complex cases, a single task could consist of multiple threads (e.g. for PySpark), and deserialization might happen in more than one of them in case of spill (e.g. during RDD aggregation). If that is happening, it would explain the race condition.
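As an aside, the reason a `CopyOnWriteArrayList` tolerates this kind of race at all is its snapshot-iterator semantics: an iterator sees the array as it was when the iterator was created, so a concurrent write never invalidates an in-flight read. A minimal, Spark-independent Java sketch (the `throwsCME` helper is purely illustrative, not Spark code) contrasting this with a plain `ArrayList`:

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowSnapshotDemo {
    // Simulates a reader (iterator) overlapping a writer (add) on the same list.
    // Returns true if the iteration fails with ConcurrentModificationException.
    static boolean throwsCME(List<Integer> list) {
        list.add(1);
        Iterator<Integer> it = list.iterator(); // "heartbeat thread" starts reading
        list.add(2);                            // "task thread" writes concurrently
        try {
            while (it.hasNext()) {
                it.next();
            }
            return false; // iteration completed over the snapshot
        } catch (ConcurrentModificationException e) {
            return true;  // fail-fast iterator detected the modification
        }
    }

    public static void main(String[] args) {
        // ArrayList's fail-fast iterator breaks; CopyOnWriteArrayList's does not.
        System.out.println("ArrayList throws CME: " + throwsCME(new ArrayList<>()));
        System.out.println("CopyOnWriteArrayList throws CME: "
            + throwsCME(new CopyOnWriteArrayList<>()));
    }
}
```

The copy-on-every-write cost is acceptable here precisely because, per the analysis above, writes should happen only a handful of times per task (during deserialization), while reads happen on every heartbeat.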
If that's true, though, then I'm wondering whether we're registering different instances of the same accumulator multiple times in the same task (which seems like it would be both a performance and a correctness bug).
