Github user javabrett commented on a diff in the pull request:
https://github.com/apache/spark/pull/2577#discussion_r30769030
--- Diff:
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
---
@@ -383,40 +406,81 @@ private[spark] class ApplicationMaster(args:
ApplicationMasterArguments,
}
}
+ /**
+ * This system security manager applies to the entire process.
+ * It's main purpose is to handle the case if the user code does a
System.exit.
+ * This allows us to catch that and properly set the YARN application
status and
+ * cleanup if needed.
+ */
+ private def setupSystemSecurityManager(): Unit = {
+ try {
+ var stopped = false
+ System.setSecurityManager(new java.lang.SecurityManager() {
+ override def checkExit(paramInt: Int) {
+ if (!stopped) {
+ logInfo("In securityManager checkExit, exit code: " + paramInt)
+ if (paramInt == 0) {
+ finish(FinalApplicationStatus.SUCCEEDED,
ApplicationMaster.EXIT_SUCCESS)
+ } else {
+ finish(FinalApplicationStatus.FAILED,
+ paramInt,
+ "User class exited with non-zero exit code")
+ }
+ stopped = true
+ }
+ }
+ // required for the checkExit to work properly
+ override def checkPermission(perm: java.security.Permission): Unit
= {}
+ })
+ }
+ catch {
+ case e: SecurityException =>
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_SECURITY,
+ "Error in setSecurityManager")
+ logError("Error in setSecurityManager:", e)
+ }
+ }
+
+ /**
+ * Start the user class, which contains the spark driver, in a separate
Thread.
+ * If the main routine exits cleanly or exits with System.exit(0) we
+ * assume it was successful, for all other cases we assume failure.
+ *
+ * Returns the user thread that was started.
+ */
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
System.setProperty("spark.executor.instances",
args.numExecutors.toString)
val mainMethod = Class.forName(args.userClass, false,
Thread.currentThread.getContextClassLoader).getMethod("main",
classOf[Array[String]])
- userClassThread = new Thread {
+ val userThread = new Thread {
override def run() {
- var status = FinalApplicationStatus.FAILED
try {
- // Copy
val mainArgs = new Array[String](args.userArgs.size)
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
mainMethod.invoke(null, mainArgs)
- // Some apps have "System.exit(0)" at the end. The user thread
will stop here unless
- // it has an uncaught exception thrown out. It needs a shutdown
hook to set SUCCEEDED.
- status = FinalApplicationStatus.SUCCEEDED
+ finish(FinalApplicationStatus.SUCCEEDED,
ApplicationMaster.EXIT_SUCCESS)
+ logDebug("Done running users class")
} catch {
case e: InvocationTargetException =>
e.getCause match {
case _: InterruptedException =>
// Reporter thread can interrupt to stop user class
-
- case e => throw e
+ case e: Exception =>
--- End diff --
I'm curious, should this be Throwable? If my application throws an
uncaught Error, shouldn't that also result in FAILED, and would it (still) do
so with this change? P.S. my Scala is not that strong.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]