cloud-fan commented on a change in pull request #34569:
URL: https://github.com/apache/spark/pull/34569#discussion_r749164305
##########
File path: core/src/main/scala/org/apache/spark/executor/Executor.scala
##########
@@ -990,12 +990,18 @@ private[spark] class Executor(
       if (taskRunner.task != null) {
         taskRunner.task.metrics.mergeShuffleReadMetrics()
         taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
-        val accumulatorsToReport =
-          if (HEARTBEAT_DROP_ZEROES) {
-            taskRunner.task.metrics.accumulators().filterNot(_.isZero)
-          } else {
-            taskRunner.task.metrics.accumulators()
+        val accumulatorsToReport = {
+          val accs = if (HEARTBEAT_DROP_ZEROES) {
+            taskRunner.task.metrics.accumulators().filterNot(_.isZero)
+          } else {
+            taskRunner.task.metrics.accumulators()
+          }
+          accs.map {
+            case acc: CollectionAccumulator[_] =>
+              acc.copyAndSetMetadata()
Review comment:
This looks like a general problem. People can write custom accumulators
and hit this problem as well. Dealing with `CollectionAccumulator` only may not
be sufficient.
I think there are 2 approaches:
1. Synchronize/lock while serializing the object. It seems we can override `def
writeObject` in `CollectionAccumulator` to take the lock during serialization.
2. Make a copy before serializing (what this PR does). We should add an API in
`AccumulatorV2` so that custom accumulators can do the same.
Can we evaluate which one is better? (A rough sketch of both options follows.)
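
For illustration only, here is an untested sketch of what the two options could
look like for a custom accumulator. `MyListAccumulator` is hypothetical, and
`copyAndSetMetadata` on the heartbeat side is the name this PR introduces; the
only assumption is Spark's public `AccumulatorV2` API:

```scala
import java.io.{IOException, ObjectOutputStream}
import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.spark.util.AccumulatorV2

// Hypothetical custom accumulator whose buffer is mutated by task threads
// while the executor heartbeat thread serializes it.
class MyListAccumulator[T] extends AccumulatorV2[T, JList[T]] {
  private val _list: JList[T] = new JArrayList[T]()

  override def isZero: Boolean = _list.synchronized(_list.isEmpty)
  override def reset(): Unit = _list.synchronized(_list.clear())
  override def add(v: T): Unit = _list.synchronized { _list.add(v) }
  override def value: JList[T] = _list.synchronized(new JArrayList[T](_list))

  // Option 2: snapshot the buffer under the lock. The heartbeat would then
  // report this copy (via an API like the PR's copyAndSetMetadata) instead of
  // `this`, so serialization never races with add().
  override def copy(): MyListAccumulator[T] = {
    val newAcc = new MyListAccumulator[T]
    _list.synchronized { newAcc._list.addAll(_list) }
    newAcc
  }

  override def merge(other: AccumulatorV2[T, JList[T]]): Unit = other match {
    case o: MyListAccumulator[T] =>
      _list.synchronized { _list.addAll(o.value) }
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }

  // Option 1: hold the same lock while Java serialization walks this class's
  // fields, so the heartbeat never serializes a buffer that is being mutated.
  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = _list.synchronized {
    out.defaultWriteObject()
  }
}
```

Roughly, option 1 keeps the fix local to each accumulator's serialization path
but holds a lock while the heartbeat serializes; option 2 needs a new public
API in `AccumulatorV2`, but the heartbeat then serializes a private snapshot
that no task thread can touch.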