AngersZhuuuu opened a new pull request #33780:
URL: https://github.com/apache/spark/pull/33780


   ### What changes were proposed in this pull request?
   We hit a case where the AM lost its connection:
   ```
   21/08/18 02:14:15 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5675952834716124039, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to xx.xx.xx.xx:41420; closing connection
   java.nio.channels.ClosedChannelException
           at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
           at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
           at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
           at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
           at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
           at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
           at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
           at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
           at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
           at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
           at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   Looking at the client-mode code path: when the AMEndpoint is disconnected, 
the AM finishes the application with a SUCCEEDED final status.
   ```
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
     // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
     // This avoids potentially reporting incorrect exit codes if the driver fails
     if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
       logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
       finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
     }
   }
   ```
   Normally, in client mode, the driver stops when the application succeeds and 
the AM then loses its connection, so exiting with SUCCESS is fine. But if a 
network problem causes the disconnect, finishing with a SUCCESS final status is 
not correct.
   YarnClientSchedulerBackend then receives an application report whose final 
status is SUCCEEDED and stops the SparkContext, so the application has failed 
but is marked as a normal stop.
   ```
     private class MonitorThread extends Thread {
       private var allowInterrupt = true
   
       override def run() {
         try {
           val YarnAppReport(_, state, diags) =
             client.monitorApplication(appId.get, logApplicationReport = false)
           logError(s"YARN application has exited unexpectedly with state $state! " +
             "Check the YARN application logs for more details.")
           diags.foreach { err =>
             logError(s"Diagnostics message: $err")
           }
           allowInterrupt = false
           sc.stop()
         } catch {
           case e: InterruptedException => logInfo("Interrupting monitor thread")
         }
       }
   
       def stopMonitor(): Unit = {
         if (allowInterrupt) {
           this.interrupt()
         }
       }
     }
   ```
   IMO, we should send an explicit `Shutdown` message to the AM in yarn-client 
mode so that it can tell a deliberate shutdown from an unexpected disconnect.
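
A minimal sketch of the idea, not the actual patch: the model below has no Spark dependencies, and `AmEndpointModel`, `FinalStatus`, and the payload-free `Shutdown` object are hypothetical names chosen for illustration. It assumes the driver sends `Shutdown` before it stops, so a disconnect that arrives without one can be reported as a failure. The unmanaged-AM check from the quoted code is left out for brevity.

```scala
// Simplified, hypothetical model; these are NOT Spark's real classes.
sealed trait FinalStatus
case object Succeeded extends FinalStatus
case object Failed extends FinalStatus

// Hypothetical message the driver would send before a deliberate stop.
case object Shutdown

final class AmEndpointModel(isClusterMode: Boolean) {
  private var shutdownRequested = false
  private var status: Option[FinalStatus] = None

  // Final status reported so far; None while the application is still running.
  def finalStatus: Option[FinalStatus] = status

  // Record that the driver asked for a shutdown; the connection drop follows later.
  def receive(msg: Any): Unit = msg match {
    case Shutdown => shutdownRequested = true
    case _ => () // other messages are ignored in this sketch
  }

  // As in the quoted code, only client mode reacts to the disconnect event.
  def onDisconnected(): Unit = {
    if (!isClusterMode && status.isEmpty) {
      status = Some(if (shutdownRequested) Succeeded else Failed)
    }
  }
}
```

With this model, calling `receive(Shutdown)` and then `onDisconnected()` leaves `finalStatus` at `Some(Succeeded)`, while calling `onDisconnected()` alone leaves it at `Some(Failed)`. Whether a bare disconnect should map to a failure, or to some other status, is a design choice this sketch does not settle.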
   
   ### Why are the changes needed?
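   Bug fix: in yarn-client mode, an AM that loses its connection because of a 
network problem should not report a SUCCEEDED final status.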
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   
   

