Repository: spark Updated Branches: refs/heads/branch-1.6 e2ce0caed -> b999fa43e
[SPARK-17696][SPARK-12330][CORE] Partial backport of to branch-1.6. >From the original commit message: This PR also fixes a regression caused by [SPARK-10987] whereby submitting a shutdown causes a race between the local shutdown procedure and the notification of the scheduler driver disconnection. If the scheduler driver disconnection wins the race, the coarse executor incorrectly exits with status 1 (instead of the proper status 0) Author: Charles Allen <charlesallen-net.com> (cherry picked from commit 2eaeafe8a2aa31be9b230b8d53d3baccd32535b1) Author: Charles Allen <char...@allen-net.com> Closes #15270 from vanzin/SPARK-17696. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b999fa43 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b999fa43 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b999fa43 Branch: refs/heads/branch-1.6 Commit: b999fa43ea0b509341ac2e130cc3787e5f8a75e5 Parents: e2ce0ca Author: Charles Allen <char...@allen-net.com> Authored: Wed Sep 28 14:39:50 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Sep 28 14:39:50 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b999fa43/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index c2ebf30..47ce667 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -19,6 +19,7 @@ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicBoolean import org.apache.hadoop.conf.Configuration @@ -45,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend( env: SparkEnv) extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { + private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None @@ -106,19 +108,23 @@ private[spark] class CoarseGrainedExecutorBackend( } case StopExecutor => + stopping.set(true) logInfo("Driver commanded a shutdown") // Cannot shutdown here because an ack may need to be sent back to the caller. So send // a message to self to actually do the shutdown. self.send(Shutdown) case Shutdown => + stopping.set(true) executor.stop() stop() rpcEnv.shutdown() } override def onDisconnected(remoteAddress: RpcAddress): Unit = { - if (driver.exists(_.address == remoteAddress)) { + if (stopping.get()) { + logInfo(s"Driver from $remoteAddress disconnected during shutdown") + } else if (driver.exists(_.address == remoteAddress)) { logError(s"Driver $remoteAddress disassociated! Shutting down.") System.exit(1) } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org