srowen commented on a change in pull request #31540:
URL: https://github.com/apache/spark/pull/31540#discussion_r579662690



##########
File path: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
##########
@@ -449,39 +449,46 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
  * @since 2.0.0
  */
 class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
-  private val _list: java.util.List[T] = Collections.synchronizedList(new ArrayList[T]())
+  private var _list: java.util.List[T] = _
+
+  private def getOrCreate = {
+    _list = Option(_list).getOrElse(new java.util.ArrayList[T]())
+    _list
+  }
 
   /**
    * Returns false if this accumulator instance has any values in it.
    */
-  override def isZero: Boolean = _list.isEmpty
+  override def isZero: Boolean = this.synchronized(getOrCreate.isEmpty)
 
   override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator
 
   override def copy(): CollectionAccumulator[T] = {
     val newAcc = new CollectionAccumulator[T]
-    _list.synchronized {
-      newAcc._list.addAll(_list)
+    this.synchronized {

Review comment:
       Obviously we can't sync on _list anymore, since it is now a lazily
initialized var that starts out null. This changes to sync on the object
itself, which should be fine AFAIK: I don't think anything else depends on the
lock of the accumulator itself, and it only manipulates its own state while
holding the lock.
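A minimal sketch of the locking pattern discussed above, written against a hypothetical, simplified class `LazyListAccumulator` (not Spark's `CollectionAccumulator`; it omits `merge`, `reset`, and the rest of the `AccumulatorV2` contract). The list field is created lazily, and every read or write goes through `this.synchronized`, so nothing ever locks on the field itself. The nested lock in `copy()` is a choice made for this sketch, not necessarily what the PR does.

```scala
import java.util.{ArrayList, Collections, List => JList}

// Hypothetical, simplified stand-in for Spark's CollectionAccumulator; it is
// NOT the Spark class and skips merge/reset and the rest of AccumulatorV2.
class LazyListAccumulator[T] {
  // Starts out null (like `private var _list = _` in the diff). That is why it
  // cannot serve as the lock: `_list.synchronized` on null throws a
  // NullPointerException, and once reassigned, threads could hold different locks.
  private var _list: JList[T] = null

  // Precondition: the caller holds this instance's monitor.
  private def getOrCreate: JList[T] = {
    if (_list == null) _list = new ArrayList[T]()
    _list
  }

  // Every access goes through `this.synchronized`, mirroring the diff.
  def isZero: Boolean = this.synchronized(getOrCreate.isEmpty)

  def add(v: T): Unit = this.synchronized {
    getOrCreate.add(v)
    ()
  }

  def copy(): LazyListAccumulator[T] = {
    val newAcc = new LazyListAccumulator[T]
    this.synchronized {
      // newAcc is not yet shared with any other thread, so taking its lock
      // inside ours cannot deadlock; it satisfies getOrCreate's precondition
      // and safely publishes the copied elements once newAcc is handed out.
      newAcc.synchronized {
        newAcc.getOrCreate.addAll(getOrCreate)
      }
    }
    newAcc
  }

  // Returns a snapshot so callers never read the internal list unlocked.
  def value: JList[T] = this.synchronized {
    Collections.unmodifiableList(new ArrayList[T](getOrCreate))
  }
}

// Usage: four threads add concurrently; no elements are lost.
val acc = new LazyListAccumulator[Int]
assert(acc.isZero)
val workers = (0 until 4).map { t =>
  new Thread(() => (1 to 1000).foreach(i => acc.add(t * 1000 + i)))
}
workers.foreach(_.start())
workers.foreach(_.join())
val snapshot = acc.copy()
acc.add(-1)
println(s"acc=${acc.value.size}, snapshot=${snapshot.value.size}")
// prints "acc=4001, snapshot=4000"
```

Because the monitor is the accumulator instance itself, it never changes identity and is never null, which is the property the old `_list.synchronized` relied on and the new lazily assigned field cannot provide.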




----------------------------------------------------------------
This is an automated message from the Apache Git Service.