JoshRosen commented on PR #37206:
URL: https://github.com/apache/spark/pull/37206#issuecomment-1189930626

   @mridulm, thanks for the ping.
   
   `CopyOnWriteArrayList` supports lock-free concurrent reads; writes are internally synchronized and copy the backing array on every mutation, so concurrent writers are safe but comparatively expensive, making it best suited to read-mostly workloads.
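   For illustration, a minimal standalone sketch of those semantics (plain Java, not Spark code): iterators see an immutable snapshot of the list as of their creation, so a write never disturbs an in-progress read.

```java
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowSnapshotDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> accums = new CopyOnWriteArrayList<>();
        accums.add("acc1");

        // The iterator captures the backing array as it exists right now.
        Iterator<String> it = accums.iterator();

        // A later write copies the array; the iterator's snapshot is untouched.
        accums.add("acc2");

        int seen = 0;
        while (it.hasNext()) {
            it.next();
            seen++;
        }
        System.out.println(seen);          // elements visible to the snapshot
        System.out.println(accums.size()); // current size after the write
    }
}
```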
   
   It looks like `externalAccums` is only written from `registerAccumulator`, 
which is only called from `TaskContext.registerAccumulator`, which is only 
called during `AccumulatorV2` deserialization, which should only occur when 
deserializing the task binary at the beginning of `Task` execution.
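   A minimal model of that chain (class and method names assumed for illustration; this is not Spark's actual code): deserializing an accumulator is the only path that appends to the per-task list.

```java
import java.util.concurrent.CopyOnWriteArrayList;

public class RegistrationDemo {
    // Stand-in for the task context that owns the accumulator list.
    static class TaskContext {
        final CopyOnWriteArrayList<Object> externalAccums = new CopyOnWriteArrayList<>();
        void registerAccumulator(Object acc) {
            externalAccums.add(acc);
        }
    }

    // Stand-in for AccumulatorV2: in Spark, registration happens during
    // deserialization of the task binary; modeled here as a plain method call.
    static class Accumulator {
        void onDeserialize(TaskContext ctx) {
            ctx.registerAccumulator(this);
        }
    }

    public static void main(String[] args) {
        TaskContext ctx = new TaskContext();
        new Accumulator().onDeserialize(ctx); // the only write path
        System.out.println(ctx.externalAccums.size());
    }
}
```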
   
   In the [SPARK-39696](https://issues.apache.org/jira/browse/SPARK-39696) 
JIRA, it looks like the read is occurring in the executor's `reportHeartBeat` 
function at 
https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L1042
   
   As far as I know, `taskRunner.task` will be `null` initially and will be 
populated once the task is deserialized at 
https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508
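   Under that assumption, the ordering can be sketched as a toy model (names assumed; not the real `Executor` code): the heartbeat thread sees `task == null` until deserialization finishes, and the `volatile` publish of the task happens after the accumulator writes, so later heartbeat reads observe a fully-built list.

```java
import java.util.ArrayList;
import java.util.List;

public class HeartbeatOrderingDemo {
    static class Task {
        final List<String> externalAccums = new ArrayList<>();
    }

    // Mirrors the idea that taskRunner.task is null until deserialization.
    static class TaskRunner {
        volatile Task task; // null initially; published once deserialized
    }

    public static void main(String[] args) {
        TaskRunner runner = new TaskRunner();

        // A heartbeat arriving before deserialization finds nothing to read.
        System.out.println(runner.task == null);

        // "Deserialization": all accumulator writes happen before publication.
        Task t = new Task();
        t.externalAccums.add("acc1");
        runner.task = t; // volatile write publishes the fully-built task

        // A heartbeat arriving afterwards sees the completed writes.
        System.out.println(runner.task.externalAccums.size());
    }
}
```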
   
   Assuming my above description is correct, I don't understand how concurrent 
reads and writes can occur: in normal operation I would expect that writes only 
occur during task binary deserialization and that reads from the heartbeat 
thread can only occur after deserialization has completed.
   
   Perhaps accumulators are being deserialized and registered _while the task is running_ (not just during task setup)? For example, maybe a case class defined on the driver accidentally closes over something that contains an `AccumulatorV2`, causing us to deserialize `AccumulatorV2` instances when deserializing data values. In the simple case, we'd have one writer thread (the task's main runner thread) racing with the heartbeat thread. In more complex cases, a single task could consist of multiple threads (e.g. for PySpark), and deserialization might happen in several of them in the case of spill (e.g. during RDD aggregation). If so, that might explain the race condition. It would also raise the question of whether we're registering different instances of the same accumulator multiple times in the same task, which seems like it would be both a performance and a correctness bug.
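   If registrations really can land mid-task, the failure mode with a plain unsynchronized list is a write arriving while another thread is iterating, which `ArrayList`'s fail-fast iterator reports as a `ConcurrentModificationException`. The same interleaving can be reproduced deterministically on a single thread:

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class RaceFailureDemo {
    public static void main(String[] args) {
        List<String> accums = new ArrayList<>();
        accums.add("acc1");

        boolean failed = false;
        try {
            for (String acc : accums) {
                // A registration landing mid-iteration, as hypothesized above.
                accums.add("acc2");
            }
        } catch (ConcurrentModificationException e) {
            failed = true; // the fail-fast iterator detects the structural change
        }
        System.out.println(failed);
    }
}
```

   A `CopyOnWriteArrayList` avoids this particular failure because its iterators work over a snapshot, though it does not answer the duplicate-registration question above.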

