AngersZhuuuu opened a new pull request #33780:
URL: https://github.com/apache/spark/pull/33780
### What changes were proposed in this pull request?
We hit a case where the AM loses its connection to the driver:
```
21/08/18 02:14:15 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5675952834716124039, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to xx.xx.xx.xx:41420; closing connection
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
```
Checking the client-side code: when `AMEndpoint` is disconnected, the AM finishes the application with a SUCCESS final status:
```
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
  // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
  // This avoids potentially reporting incorrect exit codes if the driver fails
  if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
    logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
    finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
  }
}
```
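The guard above can be modeled as a small pure predicate (a standalone sketch with hypothetical names, not Spark's actual API): the AM treats a driver disconnect as a normal shutdown only in client mode with a managed AM.

```scala
// Hypothetical standalone model of the decision in AMEndpoint.onDisconnected;
// the real logic lives inside ApplicationMaster and uses SparkConf.
object DisconnectPolicy {
  // Returns true when the AM should react to a driver disconnect by
  // finishing the application with a SUCCESS final status.
  def exitOnDisconnect(isClusterMode: Boolean, unmanagedAM: Boolean): Boolean =
    !(isClusterMode || unmanagedAM)
}
```

In client mode with a managed AM this returns true, so any disconnect, including one caused by a network failure, is reported as SUCCESS, which is the bug described here.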
Normally, in client mode, when the application succeeds the driver stops and the AM loses the connection, so exiting with SUCCESS is fine. But if a network problem causes the disconnect, finishing with a SUCCESS final status is not correct. `YarnClientSchedulerBackend` then receives an application report whose final status is SUCCESS and stops the SparkContext, so an application that actually failed is marked as a normal stop:
```
private class MonitorThread extends Thread {
  private var allowInterrupt = true

  override def run() {
    try {
      val YarnAppReport(_, state, diags) =
        client.monitorApplication(appId.get, logApplicationReport = false)
      logError(s"YARN application has exited unexpectedly with state $state! " +
        "Check the YARN application logs for more details.")
      diags.foreach { err =>
        logError(s"Diagnostics message: $err")
      }
      allowInterrupt = false
      sc.stop()
    } catch {
      case e: InterruptedException => logInfo("Interrupting monitor thread")
    }
  }

  def stopMonitor(): Unit = {
    if (allowInterrupt) {
      this.interrupt()
    }
  }
}
```
IMO, the driver should send an explicit `Shutdown` message to the YARN client mode AM, so the AM can distinguish an intentional shutdown from an unexpected disconnect.
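A minimal sketch of that idea, with entirely hypothetical names (`Shutdown`, `AmEndpointModel`, and the status strings are illustrative, not Spark's actual messages or enums): the AM records whether a `Shutdown` message arrived, and only a disconnect preceded by that message is reported as a successful exit.

```scala
// Hypothetical model of the proposed fix: the driver sends Shutdown
// before stopping, and the AM only reports SUCCEEDED on a disconnect
// that was preceded by that message.
case object Shutdown

class AmEndpointModel {
  @volatile private var shutdownRequested = false

  // Called when the driver's Shutdown message is received.
  def receiveShutdown(): Unit = shutdownRequested = true

  // Called on disconnect; returns the final status to report.
  def statusOnDisconnect(): String =
    if (shutdownRequested) "SUCCEEDED" else "FAILED"
}
```

With this, a network-caused disconnect (no `Shutdown` seen) would no longer be mistaken for a normal application exit.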
### Why are the changes needed?
Fix a bug where a YARN client mode AM that loses its connection for a non-shutdown reason still reports a SUCCESS final status.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?