Github user tgravescs commented on a diff in the pull request:
https://github.com/apache/spark/pull/2577#discussion_r18243513
--- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -71,80 +74,108 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private val sparkContextRef = new AtomicReference[SparkContext](null)
   final def run(): Int = {
-    val appAttemptId = client.getAttemptId()
+    try {
+      val appAttemptId = client.getAttemptId()
-    if (isDriver) {
-      // Set the web ui port to be ephemeral for yarn so we don't conflict with
-      // other spark processes running on the same box
-      System.setProperty("spark.ui.port", "0")
+      if (isDriver) {
+        // Set the web ui port to be ephemeral for yarn so we don't conflict with
+        // other spark processes running on the same box
+        System.setProperty("spark.ui.port", "0")
-      // Set the master property to match the requested mode.
-      System.setProperty("spark.master", "yarn-cluster")
+        // Set the master property to match the requested mode.
+        System.setProperty("spark.master", "yarn-cluster")
-      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
-      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
-    }
+        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
+        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+      }
-    logInfo("ApplicationAttemptId: " + appAttemptId)
+      logInfo("ApplicationAttemptId: " + appAttemptId)
-    val cleanupHook = new Runnable {
-      override def run() {
-        // If the SparkContext is still registered, shut it down as a best case effort in case
-        // users do not call sc.stop or do System.exit().
-        val sc = sparkContextRef.get()
-        if (sc != null) {
-          logInfo("Invoking sc stop from shutdown hook")
-          sc.stop()
-          finish(FinalApplicationStatus.SUCCEEDED)
-        }
+      val cleanupHook = new Runnable {
+        override def run() {
+          // If the SparkContext is still registered, shut it down as a best case effort in case
+          // users do not call sc.stop or do System.exit().
+          val sc = sparkContextRef.get()
+          if (sc != null) {
+            logInfo("Invoking sc stop from shutdown hook")
+            sc.stop()
+          }
+          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+
+          if (!finished) {
+            // this shouldn't ever happen, but if it does assume weird failure
+            finish(FinalApplicationStatus.FAILED,
+              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+              "shutdown hook called without cleanly finishing")
+          }
-        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
-        // running the AM.
-        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
-        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
-        if (finished || isLastAttempt) {
-          cleanupStagingDir()
+          if (!unregistered) {
+            // we only want to unregister if we don't want the RM to retry
+            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
+              unregister(finalStatus, finalMsg)
+              cleanupStagingDir()
+            }
+          }
         }
       }
-    }
-    // Use higher priority than FileSystem.
-    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
-    ShutdownHookManager
-      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
+      // Use higher priority than FileSystem.
+      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
+      ShutdownHookManager
+        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
-    // Call this to force generation of secret so it gets populated into the
-    // Hadoop UGI. This has to happen before the startUserClass which does a
-    // doAs in order for the credentials to be passed on to the executor containers.
-    val securityMgr = new SecurityManager(sparkConf)
+      // Call this to force generation of secret so it gets populated into the
+      // Hadoop UGI. This has to happen before the startUserClass which does a
+      // doAs in order for the credentials to be passed on to the executor containers.
+      val securityMgr = new SecurityManager(sparkConf)
-    if (isDriver) {
-      runDriver(securityMgr)
-    } else {
-      runExecutorLauncher(securityMgr)
+      if (isDriver) {
+        runDriver(securityMgr)
+      } else {
+        runExecutorLauncher(securityMgr)
+      }
+    } catch {
+      case e: Throwable => {
+        // catch everything else if not specifically handled
+        logError("Uncaught exception: ", e)
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+          "Uncaught exception: " + e.getMessage())
+      }
--- End diff --
The reason I swallowed it was to return the exit code I explicitly set. It
doesn't matter too much, since re-throwing would exit with 1, but it's a little
nicer to have the explicit exit code. We probably shouldn't be swallowing
Errors, though; perhaps we should only catch Scala's NonFatal.
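For reference, a minimal sketch of what that could look like, assuming the same
finish/logError helpers and the EXIT_UNCAUGHT_EXCEPTION constant from the diff
above (this is not the actual patch, just an illustration):

    import scala.util.control.NonFatal

    try {
      if (isDriver) {
        runDriver(securityMgr)
      } else {
        runExecutorLauncher(securityMgr)
      }
    } catch {
      case NonFatal(e) =>
        // Only non-fatal exceptions are swallowed so we can report our own
        // exit code; fatal errors (OutOfMemoryError, etc.) still propagate.
        logError("Uncaught exception: ", e)
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
          "Uncaught exception: " + e.getMessage())
    }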