Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18243513
  
    --- Diff: 
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
---
    @@ -71,80 +74,108 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict 
with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't 
conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend 
can pick it up.
    -      System.setProperty("spark.yarn.app.id", 
appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that 
YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", 
appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best 
case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a 
best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= 
maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird 
failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's 
the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= 
maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || 
isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > 
FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, 
ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > 
FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, 
ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into 
the
    -    // Hadoop UGI. This has to happen before the startUserClass which does 
a
    -    // doAs in order for the credentials to be passed on to the executor 
containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into 
the
    +      // Hadoop UGI. This has to happen before the startUserClass which 
does a
    +      // doAs in order for the credentials to be passed on to the executor 
containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
    --- End diff --
    
    The reason I swallowed it was to return the exit code I explicitly set.  It 
doesn't matter to much as re-throwing with exit with 1 but its a little nicer 
to have the exit code.  We probably shouldn't be swallowing errors or perhaps 
scala !NonFatal though. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to