Baohe Zhang created SPARK-31664:
-----------------------------------

             Summary: Race in YARN scheduler shutdown leads to uncaught 
SparkException "Could not find CoarseGrainedScheduler"
                 Key: SPARK-31664
                 URL: https://issues.apache.org/jira/browse/SPARK-31664
             Project: Spark
          Issue Type: Bug
          Components: Scheduler, YARN
    Affects Versions: 3.0.0, 3.0.1, 3.1.0
            Reporter: Baohe Zhang


I ran SparkPi on a YARN cluster with dynamicAllocation enabled using this 
command: "$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster 
--class org.apache.spark.examples.SparkPi ./spark-examples.jar 1000". The 
error log below appeared on every run.

 
{code:java}
20/05/06 16:31:44 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:169)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
        at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:684)
        at org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:66)
        at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:253)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
20/05/06 16:31:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/05/06 16:31:45 INFO MemoryStore: MemoryStore cleared
20/05/06 16:31:45 INFO BlockManager: BlockManager stopped
{code}
 

After some investigation, I found that this issue might have been introduced 
in [https://github.com/apache/spark/pull/25964]. There is a race between the 
driver backend and the executor backend that can occur when the driver shuts 
down.

 

PR#25964 added a new message type, LaunchedExecutor, and changed the 
communication between executor and driver during executor launch to:
 # The executor backend sends "RegisterExecutor" to the driver backend.
 # The driver backend replies "true".
 # The executor backend instantiates the executor once it receives "true" 
from the driver backend.
 # After the executor is instantiated, the executor backend sends 
"LaunchedExecutor" to the driver backend.
 # The driver backend makes offers for the executor when it receives 
"LaunchedExecutor".

The race lies between steps 3 and 4. If the driver backend is stopped (and 
hence the driver endpoint is removed from the dispatcher) during step 3, then 
in step 4, when the executor backend tries to send "LaunchedExecutor" to the 
driver backend, the RPC dispatcher throws a SparkException with the message 
"Could not find CoarseGrainedScheduler". These exception logs are verbose and 
somewhat misleading.
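
The sequence above can be replayed with a toy model. The following is a minimal sketch, not Spark's code: ToyDispatcher, ShutdownRaceDemo, and replay() are hypothetical names, every message is modeled as a one-way string (in Spark, "RegisterExecutor" expects a reply), and IllegalStateException stands in for SparkException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Toy stand-in for an RPC dispatcher; NOT Spark's actual Dispatcher class.
class ToyDispatcher {
    private final Map<String, Consumer<String>> endpoints = new ConcurrentHashMap<>();

    void register(String name, Consumer<String> handler) {
        endpoints.put(name, handler);
    }

    // Models the driver endpoint being removed when the driver backend stops.
    void unregister(String name) {
        endpoints.remove(name);
    }

    // One-way send: fails if the target endpoint is no longer registered.
    void postOneWayMessage(String name, String message) {
        Consumer<String> handler = endpoints.get(name);
        if (handler == null) {
            // Stand-in for the SparkException seen in the log above.
            throw new IllegalStateException("Could not find " + name + ".");
        }
        handler.accept(message);
    }
}

class ShutdownRaceDemo {
    // Replays the launch steps with the driver stopping during step 3.
    // Returns the error message from step 4, or null if no error occurred.
    static String replay() {
        ToyDispatcher dispatcher = new ToyDispatcher();
        dispatcher.register("CoarseGrainedScheduler", msg -> { });

        // Steps 1-2: registration succeeds while the endpoint still exists.
        dispatcher.postOneWayMessage("CoarseGrainedScheduler", "RegisterExecutor");

        // Step 3: the driver backend stops while the executor is being instantiated.
        dispatcher.unregister("CoarseGrainedScheduler");

        // Step 4: the executor backend reports back to an endpoint that is gone.
        try {
            dispatcher.postOneWayMessage("CoarseGrainedScheduler", "LaunchedExecutor");
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

In this sketch the failure depends only on the ordering of unregister() and the second send, which is why the error is tied to driver shutdown timing rather than to any fault in the executor.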

 

This race can be fixed, or at least greatly alleviated, with the following 
changes.

When stop() in CoarseGrainedSchedulerBackend is called:
 # A boolean variable, stopping, is set to true.
 # driverEndpoint is not stopped at this point. (The dispatcher will stop it 
at the end.)

While stopping is true, the driver backend:
 # replies with sendFailure to the executor backend when it receives 
"RegisterExecutor".
 # replies "StopExecutor" to the executor backend (or sends "RemoveExecutor" 
to itself) when it receives "LaunchedExecutor".
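
The proposed handling could look roughly like the following. This is a minimal sketch under stated assumptions, not a patch: ToyDriverBackend and receive() are hypothetical names, messages and replies are plain strings rather than Spark's message classes, and the "failure" and "makeOffers" return values are placeholders for sendFailure and the offer step.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of the proposed driver-side behavior; NOT CoarseGrainedSchedulerBackend itself.
class ToyDriverBackend {
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    // Proposed stop(): only set the flag. The endpoint stays registered,
    // so late executor messages still reach receive() instead of failing in the dispatcher.
    void stop() {
        stopping.set(true);
    }

    // Returns a string naming the action the driver would take for the given message.
    String receive(String message) {
        boolean isStopping = stopping.get();
        switch (message) {
            case "RegisterExecutor":
                // While stopping, reject new registrations (placeholder for sendFailure).
                return isStopping ? "failure" : "true";
            case "LaunchedExecutor":
                // While stopping, tell the freshly launched executor to shut down.
                return isStopping ? "StopExecutor" : "makeOffers";
            default:
                return "ignored";
        }
    }
}
```

For example, calling receive("LaunchedExecutor") after stop() yields "StopExecutor" in this model, so the executor gets an explicit instruction instead of triggering an exception on the driver side.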



