mcdull-zhang opened a new pull request #34569:
URL: https://github.com/apache/spark/pull/34569
### What changes were proposed in this pull request?
We hit this problem in our production environment. The following code reproduces it:
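```scala
// Run in spark-shell, where `sc` is the active SparkContext.
val acc = sc.collectionAccumulator[String]("test_acc")
sc.parallelize(Array(0)).foreach { _ =>
  var i = 0
  var stop = false
  val start = System.currentTimeMillis()
  // Keep adding to and resetting the accumulator for roughly two minutes,
  // while the executor's heartbeat thread periodically serializes it.
  while (!stop) {
    acc.add(i.toString)
    if (i % 10000 == 0) {
      acc.reset()
      if ((System.currentTimeMillis() - start) / 1000 > 120) {
        stop = true
      }
    }
    i = i + 1
  }
}
sc.stop()
```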
This code can make the executor's heartbeats to the driver fail repeatedly. Once they have failed more than 60 times (the default limit), the executor exits:
```text
21/11/11 21:00:23 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1007)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
	at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
	at java.util.ArrayList.writeObject(ArrayList.java:766)
	at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:601)
	at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:244)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:555)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:559)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:102)
	... 12 more
21/11/11 21:00:23 ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times
```
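The executor only exits after heartbeats have failed repeatedly. As a rough illustration, here is a simplified, hypothetical model of that bookkeeping. `HeartbeatFailureTracker` and `HeartbeatDemo` are illustrative names, not Spark's code or API. The model assumes that any successful heartbeat resets the failure counter and that the executor gives up once consecutive failures reach the limit (60 by default). Under those assumptions, if most heartbeats collide with the task thread's updates, successes are too rare to keep resetting the counter.

```scala
// Hypothetical, simplified model of the executor's heartbeat-failure
// bookkeeping; names are illustrative and this is not Spark's code.
final class HeartbeatFailureTracker(maxFailures: Int) {
  require(maxFailures > 0, "maxFailures must be positive")
  private var consecutiveFailures = 0

  // Records one heartbeat attempt. A success resets the counter (assumption);
  // returns true once consecutive failures reach maxFailures.
  def record(succeeded: Boolean): Boolean = {
    if (succeeded) consecutiveFailures = 0
    else consecutiveFailures += 1
    consecutiveFailures >= maxFailures
  }
}

object HeartbeatDemo {
  // Returns the 1-based attempt at which the tracker would make the executor
  // exit, or None if the outcomes never reach the limit.
  def firstExitAttempt(outcomes: Seq[Boolean], maxFailures: Int): Option[Int] = {
    val tracker = new HeartbeatFailureTracker(maxFailures)
    val attempts = outcomes.iterator.zipWithIndex
    while (attempts.hasNext) {
      val (succeeded, index) = attempts.next()
      if (tracker.record(succeeded)) return Some(index + 1)
    }
    None
  }

  def main(args: Array[String]): Unit = {
    // 60 straight failures hit the limit on the 60th attempt.
    println(firstExitAttempt(Seq.fill(60)(false), maxFailures = 60)) // Some(60)
    // A single success in between resets the count, so the limit is never reached.
    val interleaved = Seq.fill(59)(false) ++ Seq(true) ++ Seq.fill(59)(false)
    println(firstExitAttempt(interleaved, maxFailures = 60)) // None
  }
}
```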
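The cause is a race between two executor threads. While the heartbeat thread serializes the `CollectionAccumulator` (backed by a `java.util.ArrayList`) to report it to the driver, the task thread may modify the same accumulator, so `ArrayList.writeObject` throws `ConcurrentModificationException`.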
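The failure mode can be reproduced without Spark. The sketch below is a single-threaded simulation, not Spark code: a hypothetical `Mutator` element appends to the list that is being serialized from inside its own `writeObject` hook. It stands in for the task thread calling `acc.add` while the heartbeat thread is inside `ArrayList.writeObject`. Because `ArrayList.writeObject` compares its modification count before and after writing the elements, it throws the same `ConcurrentModificationException` as in the log. For contrast, serializing a copy of the list is unaffected, since the mutation lands on the original. This only shows why a snapshot avoids the error; it is not necessarily how this PR fixes it. In a real multi-threaded setting the copy would also have to be taken under whatever lock guards `add` and `reset`.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.{ArrayList => JArrayList, ConcurrentModificationException}

// Hypothetical helper, for illustration only: when serialized, it appends to
// `target`, standing in for the task thread calling acc.add(...) while the
// heartbeat thread is inside ArrayList.writeObject.
class Mutator(@transient private val target: JArrayList[AnyRef]) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    target.add("added during serialization")
    out.defaultWriteObject()
  }
}

object CmeDemo {
  // Java-serializes `obj` into memory.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // True if serializing `obj` throws ConcurrentModificationException.
  def throwsCme(obj: AnyRef): Boolean =
    try { serialize(obj); false }
    catch { case _: ConcurrentModificationException => true }

  // Serializes the live list, which is modified while it is being written.
  def liveListThrows(): Boolean = {
    val live = new JArrayList[AnyRef]()
    live.add(new Mutator(live))
    throwsCme(live)
  }

  // Serializes a copy; the mutation lands on the original list instead.
  def snapshotThrows(): Boolean = {
    val live = new JArrayList[AnyRef]()
    live.add(new Mutator(live))
    throwsCme(new JArrayList[AnyRef](live))
  }

  def main(args: Array[String]): Unit = {
    println(s"live list throws CME: ${liveListThrows()}") // true
    println(s"snapshot throws CME: ${snapshotThrows()}")  // false
  }
}
```

Running `CmeDemo.main` prints `true` for the live list and `false` for the snapshot.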
### Why are the changes needed?
To prevent heartbeat reports from failing. If they fail repeatedly, the executor exits, which may cause the application to fail.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests and manual tests