gowa commented on PR #37206:
URL: https://github.com/apache/spark/pull/37206#issuecomment-1690317793

   @JoshRosen, @eejbyfeldt , I hope it is fine that I am posting here after the 
ticket has been closed. We caught a problem which, I believe, has the same 
'true' root cause as in this ticket, and I also think that the 'true' root 
cause was not eliminated by the fix. I think that 
'ConcurrentModificationException' is only one of the possible outcomes in the 
various race conditions scenarios (concurrent reads/writes). Particularly, I 
managed to catch NullPointerException (instead of 
ConcurrentModificationException) in my custom accumulator where I would never 
expect it to be caught. Below is a simple reproducer for demonstrating the 
bigger problem:
   
   ```
   import java.{lang => jl}
   import org.apache.spark.util.AccumulatorV2
   
   //just a simple custom accumulator to demonstrate the problem: initially 
'false'. can be changed to 'true'.
   //see my comments for '_set' and 'isZero'
   
   class BooleanAccumulator extends AccumulatorV2[jl.Boolean, jl.Boolean] {
     private var _set = jl.Boolean.FALSE // supposed to be never null. In my 
real-world issue there is 'private final Map' in Java class which is supposed 
to be not null ever
   
     override def isZero: Boolean = ! _set.booleanValue()  // however, 
NullPointerException will be thrown here from executor-heartbeater
   
     override def copy(): BooleanAccumulator = {
       val newAcc = new BooleanAccumulator
       newAcc._set = this._set
       newAcc
     }
   
     override def reset(): Unit = {
        _set = jl.Boolean.FALSE
     }
   
     override def add(v: jl.Boolean): Unit = {
       if (v.booleanValue()) {
          _set = jl.Boolean.TRUE
        }
     }
   
     override def merge(other: AccumulatorV2[jl.Boolean, jl.Boolean]): Unit = 
other match {
       case o: BooleanAccumulator =>
          if (!_set.booleanValue())
                _set = o._set
       case _ =>
         throw new UnsupportedOperationException(
           s"Cannot merge ${this.getClass.getName} with 
${other.getClass.getName}")
     }
   
     override def value: jl.Boolean = _set
   }
   
   //some dummy logic using that accumulator
   val rdd = sc.parallelize(Seq(1), 1);
   
   def tryReproduce(){
     var acc: BooleanAccumulator = new BooleanAccumulator();
     sc.register(acc,"acc")
     rdd.foreachPartition { it =>
       val cnt = it.count(_ => true)
       acc.add(true);
     }
   }
   
   tryReproduce()
   ```
   
   The code listed above should be executed from the spark-shell, started with 
spark.executor.heartbeatInterval=1 (ms) in order to increase the chances of the 
appearance of the issue (on my laptop it is enough to call tryReproduce() just 
once):
   ```
   spark-3.3.3-bin-hadoop3/bin/spark-shell --master spark://127.0.1.1:7077 -c 
spark.executor.heartbeatInterval=1 -c spark.executor.heartbeat.maxFailures=60000
   ```
   
   The NullPointerException from the 'executor-heartbeater' thread looks like 
this:
   ```
   23/08/23 18:24:42 ERROR Utils: Uncaught exception in thread 
executor-heartbeater
   java.lang.NullPointerException
           at 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$BooleanAccumulator.isZero(<console>:27)
           at 
org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$2(Executor.scala:1042)
           at 
org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$2$adapted(Executor.scala:1042)
           at 
scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
           at scala.collection.Iterator.foreach(Iterator.scala:943)
           at scala.collection.Iterator.foreach$(Iterator.scala:943)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
           at scala.collection.IterableLike.foreach(IterableLike.scala:74)
           at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
           at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
           at 
scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
           at 
scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
           at 
scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
           at 
scala.collection.TraversableLike.filterNot(TraversableLike.scala:403)
           at 
scala.collection.TraversableLike.filterNot$(TraversableLike.scala:403)
           at 
scala.collection.AbstractTraversable.filterNot(Traversable.scala:108)
           at 
org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$1(Executor.scala:1042)
           at scala.collection.Iterator.foreach(Iterator.scala:943)
           at scala.collection.Iterator.foreach$(Iterator.scala:943)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
           at scala.collection.IterableLike.foreach(IterableLike.scala:74)
           at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
           at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
           at 
org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1036)
           at 
org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:238)
           at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
           at 
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
           at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
           at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
           at 
java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
           at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
           at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   
   In my real-world case, my accumulator has a **final** Java field which can 
never be null... Therefore, I suspect there is some lack of synchronization 
between the deserialization code and accessing the 'isZero' method of a just 
deserialized accumulator instance. And, therefore, I think replacing 
ArrayBuffer by CopyOnWriteArrayList does not completely fix the bigger issue.
   
   Also, in my real-world case we have default 10s for 
spark.executor.heartbeatInterval. Still, somehow we managed to catch many 
NullPointerExceptions from the 'executor-heartbeater' thread over 1 day, 
resulting in executors re-started (which led to unbalanced load distribution in 
a Spark Standalone cluster).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to