mcdull-zhang opened a new pull request #34569:
URL: https://github.com/apache/spark/pull/34569


   ### What changes were proposed in this pull request?
   
   The following code reproduces a problem we hit in our production 
environment:
   
   ```scala
   val acc = sc.collectionAccumulator[String]("test_acc")
       
   sc.parallelize(Array(0)).foreach(_ => {
     var i = 0
     var stop = false
     val start = System.currentTimeMillis()
     while (!stop) {
       acc.add(i.toString)
       if (i % 10000 == 0) {
         acc.reset()
         if ((System.currentTimeMillis() - start) / 1000 > 120) {
           stop = true
         }
       }
       i = i + 1
     }
   })
   sc.stop()
   ```
   
   This code can cause the executor's heartbeats to fail repeatedly. Once the 
number of failures exceeds the default limit of 60 
(`spark.executor.heartbeat.maxFailures`), the executor exits.
   
   ```text
   21/11/11 21:00:23 WARN Executor: Issue communicating with driver in heartbeater
   org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
        at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1007)
        at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
        at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.util.ConcurrentModificationException
        at java.util.ArrayList.writeObject(ArrayList.java:766)
        at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:601)
        at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:244)
        at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:555)
        at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:559)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:102)
        ... 12 more
   21/11/11 21:00:23 ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times
   ```
   
   The cause is a race between two threads: while the heartbeat thread is 
serializing the `CollectionAccumulator`, the task thread may modify it at the 
same time, so `ArrayList.writeObject` throws `ConcurrentModificationException`.
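   
   As a rough illustration of one way to avoid this kind of race (an 
assumption for discussion, not necessarily the change made in this PR), the 
sketch below copies the list under the same lock that guards `add` and `reset`, 
and serializes only the copy. `SnapshotBuffer` and `SnapshotDemo` are 
hypothetical names and are not part of Spark.
   
   ```scala
   import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
   import java.util.{ArrayList => JArrayList, List => JList}
   import java.util.concurrent.atomic.AtomicBoolean

   // Hypothetical stand-in for an accumulator backed by a java.util.ArrayList.
   final class SnapshotBuffer[T] {
     private val items = new JArrayList[T]()

     def add(v: T): Unit = this.synchronized { items.add(v) }
     def reset(): Unit = this.synchronized { items.clear() }

     // Copy under the same lock that add/reset take; the copy is private to the caller.
     def snapshot(): JList[T] = this.synchronized { new JArrayList[T](items) }
   }

   object SnapshotDemo {
     def serialize(obj: AnyRef): Array[Byte] = {
       val bytes = new ByteArrayOutputStream()
       val out = new ObjectOutputStream(bytes)
       try out.writeObject(obj) finally out.close()
       bytes.toByteArray
     }

     def deserialize(data: Array[Byte]): AnyRef = {
       val in = new ObjectInputStream(new ByteArrayInputStream(data))
       try in.readObject() finally in.close()
     }

     // Counts how many snapshots round-trip intact while a writer thread keeps mutating.
     def roundTrips(rounds: Int): Int = {
       val buf = new SnapshotBuffer[String]
       val stop = new AtomicBoolean(false)
       // Writer mimics the task thread in the reproduction: add constantly, reset periodically.
       val writer = new Thread(new Runnable {
         def run(): Unit = {
           var i = 0
           while (!stop.get()) {
             buf.add(i.toString)
             if (i % 10000 == 0) buf.reset()
             i += 1
           }
         }
       })
       writer.start()
       try {
         (1 to rounds).count { _ =>
           val snap = buf.snapshot()
           // Serializing the private copy cannot race with the writer thread.
           deserialize(serialize(snap)) == snap
         }
       } finally {
         stop.set(true)
         writer.join()
       }
     }
   }
   ```
   
   Calling `SnapshotDemo.roundTrips(50)` should return `50`, because each 
serialized list is a copy that no other thread can touch. The trade-off is one 
extra copy of the list per heartbeat.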
   
   
   ### Why are the changes needed?
   
   To avoid heartbeat reporting failures, which can make the executor exit and 
may cause the application to fail.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   
   ### How was this patch tested?
   Existing tests and manual tests
   

