gowa commented on PR #37206:
URL: https://github.com/apache/spark/pull/37206#issuecomment-1690317793
@JoshRosen, @eejbyfeldt, I hope it is fine that I am posting here after the
ticket has been closed. We caught a problem which, I believe, has the same
underlying root cause as this ticket, and I also think that root cause was not
eliminated by the fix. A 'ConcurrentModificationException' is only one of the
possible outcomes of the various race-condition scenarios (concurrent
reads/writes). In particular, I managed to catch a NullPointerException
(instead of a ConcurrentModificationException) in a custom accumulator of
mine, at a point where I would never expect one. Below is a simple reproducer
demonstrating the bigger problem:
```
import java.{lang => jl}
import org.apache.spark.util.AccumulatorV2

// A simple custom accumulator to demonstrate the problem: initially 'false',
// can be changed to 'true'. See the comments on '_set' and 'isZero'.
class BooleanAccumulator extends AccumulatorV2[jl.Boolean, jl.Boolean] {
  // Supposed to never be null. In my real-world issue it is a
  // 'private final Map' in a Java class which is also never supposed to be null.
  private var _set = jl.Boolean.FALSE

  // However, a NullPointerException is thrown here from executor-heartbeater.
  override def isZero: Boolean = !_set.booleanValue()

  override def copy(): BooleanAccumulator = {
    val newAcc = new BooleanAccumulator
    newAcc._set = this._set
    newAcc
  }

  override def reset(): Unit = {
    _set = jl.Boolean.FALSE
  }

  override def add(v: jl.Boolean): Unit = {
    if (v.booleanValue()) {
      _set = jl.Boolean.TRUE
    }
  }

  override def merge(other: AccumulatorV2[jl.Boolean, jl.Boolean]): Unit = other match {
    case o: BooleanAccumulator =>
      if (!_set.booleanValue()) {
        _set = o._set
      }
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: jl.Boolean = _set
}

// Some dummy logic using that accumulator.
val rdd = sc.parallelize(Seq(1), 1)

def tryReproduce(): Unit = {
  val acc = new BooleanAccumulator
  sc.register(acc, "acc")
  rdd.foreachPartition { it =>
    it.count(_ => true) // force evaluation of the partition
    acc.add(true)
  }
}

tryReproduce()
```
The code listed above should be executed from the spark-shell, started with
spark.executor.heartbeatInterval=1 (ms) to increase the chances of hitting the
issue (on my laptop a single call to tryReproduce() is enough):
```
spark-3.3.3-bin-hadoop3/bin/spark-shell --master spark://127.0.1.1:7077 -c
spark.executor.heartbeatInterval=1 -c spark.executor.heartbeat.maxFailures=60000
```
The NullPointerException from the 'executor-heartbeater' thread looks like
this:
```
23/08/23 18:24:42 ERROR Utils: Uncaught exception in thread executor-heartbeater
java.lang.NullPointerException
    at $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$BooleanAccumulator.isZero(<console>:27)
    at org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$2(Executor.scala:1042)
    at org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$2$adapted(Executor.scala:1042)
    at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filterNot(TraversableLike.scala:403)
    at scala.collection.TraversableLike.filterNot$(TraversableLike.scala:403)
    at scala.collection.AbstractTraversable.filterNot(Traversable.scala:108)
    at org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$1(Executor.scala:1042)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1036)
    at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:238)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
    at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
In my real-world case, my accumulator has a **final** Java field which can
never be null... I therefore suspect there is some lack of synchronization
between the deserialization code and the first access to the 'isZero' method
of a just-deserialized accumulator instance. Consequently, I think replacing
the ArrayBuffer with a CopyOnWriteArrayList does not completely fix the bigger
issue. Also, in my real-world case we use the default 10s for
spark.executor.heartbeatInterval, yet we still managed to catch many
NullPointerExceptions from the 'executor-heartbeater' thread over one day,
resulting in executors being restarted (which led to an unbalanced load
distribution in our Spark Standalone cluster).
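Until the publication race is fixed upstream, one possible mitigation is to make the accumulator's own state accesses defensive. The sketch below is plain Java (since my real-world accumulator is a Java class); the class name is illustrative and the AccumulatorV2 plumbing is omitted. Note the hedge: marking the field volatile narrows, but does not formally close, the visibility window when the enclosing instance itself is published unsafely, which is why isZero additionally tolerates a null read instead of crashing the heartbeater thread.

```java
// Defensive sketch only -- illustrative name, AccumulatorV2 plumbing omitted.
// This does NOT fix the suspected publication race inside Spark; it merely
// keeps a concurrent reader (e.g. the heartbeater calling isZero) from
// throwing NullPointerException if it observes the instance mid-publication.
class DefensiveBooleanState {
    // volatile orders writes to this field against subsequent volatile reads,
    // but cannot guarantee visibility if the object reference itself was
    // published without a happens-before edge.
    private volatile Boolean set = Boolean.FALSE;

    boolean isZero() {
        Boolean s = set; // single read, then a null-tolerant check
        return s == null || !s.booleanValue();
    }

    void add(boolean v) {
        if (v) {
            set = Boolean.TRUE;
        }
    }

    void reset() {
        set = Boolean.FALSE;
    }

    Boolean value() {
        return set;
    }
}
```

A null-tolerant isZero treats a half-published instance as "zero", which at worst delays one heartbeat's accumulator update rather than killing the executor-heartbeater thread.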
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]