Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4216#discussion_r24143599
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
    @@ -84,21 +94,92 @@ object SparkSubmit {
         if (appArgs.verbose) {
           printStream.println(appArgs)
         }
    -    val (childArgs, classpath, sysProps, mainClass) = 
createLaunchEnv(appArgs)
    -    launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
    +    appArgs.action match {
    +      case SparkSubmitAction.SUBMIT => submit(appArgs)
    +      case SparkSubmitAction.KILL => kill(appArgs)
    +      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    +    }
    +  }
    +
    +  /**
    +   * Kill an existing driver using the REST application submission 
protocol.
    +   * Standalone cluster mode only.
    +   */
    +  private def kill(args: SparkSubmitArguments): Unit = {
    +    val client = new StandaloneRestClient
    +    val response = client.killSubmission(args.master, args.driverToKill)
    +    response match {
    +      case k: KillSubmissionResponse => handleRestResponse(k)
    +      case r => handleUnexpectedRestResponse(r)
    +    }
       }
     
       /**
    -   * @return a tuple containing
    -   *           (1) the arguments for the child process,
    -   *           (2) a list of classpath entries for the child,
    -   *           (3) a list of system properties and env vars, and
    -   *           (4) the main class for the child
    +   * Request the status of an existing driver using the REST application 
submission protocol.
    +   * Standalone cluster mode only.
        */
    -  private[spark] def createLaunchEnv(args: SparkSubmitArguments)
    -      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], 
String) = {
    +  private def requestStatus(args: SparkSubmitArguments): Unit = {
    +    val client = new StandaloneRestClient
    +    val response = client.requestSubmissionStatus(args.master, 
args.driverToRequestStatusFor)
    +    response match {
    +      case s: SubmissionStatusResponse => handleRestResponse(s)
    +      case r => handleUnexpectedRestResponse(r)
    +    }
    +  }
     
    -    // Values to return
    +  /**
    +   * Submit the application using the provided parameters.
    +   *
    +   * This runs in two steps. First, we prepare the launch environment by 
setting up
    +   * the appropriate classpath, system properties, and application 
arguments for
    +   * running the child main class based on the cluster manager and the 
deploy mode.
    +   * Second, we use this launch environment to invoke the main method of 
the child
    +   * main class.
    +   */
    +  private[spark] def submit(args: SparkSubmitArguments): Unit = {
    +    val (childArgs, childClasspath, sysProps, childMainClass) = 
prepareSubmitEnvironment(args)
    +    /*
    +     * In standalone cluster mode, there are two submission gateways:
    +     *   (1) The traditional Akka gateway using o.a.s.deploy.Client as a 
wrapper
    +     *   (2) The new REST-based gateway introduced in Spark 1.3
    +     * The latter is the default behavior as of Spark 1.3, but Spark 
submit will fail over
    +     * to use the legacy gateway if the master endpoint turns out to be 
not a REST server.
    +     */
    +    if (args.isStandaloneCluster) {
    +      try {
    +        printStream.println("Running Spark using the REST application 
submission protocol.")
    +        val client = new StandaloneRestClient
    +        val response = client.createSubmission(args)
    +        response match {
    +          case s: CreateSubmissionResponse => handleRestResponse(s)
    +          case r => handleUnexpectedRestResponse(r)
    +        }
    +      } catch {
    +        // Fail over to use the legacy submission gateway
    +        case e: SubmitRestConnectionException =>
    +          printStream.println(s"Master endpoint ${args.master} was not a " 
+
    +            s"REST server. Falling back to legacy submission gateway 
instead.")
    +          runMain(childArgs, childClasspath, sysProps, childMainClass)
    +      }
    +    // In all other modes, just run the main class as prepared
    +    } else {
    +      runMain(childArgs, childClasspath, sysProps, childMainClass)
    +    }
    +  }
    +
    +  /**
    +   * Prepare the environment for submitting an application.
    +   * This returns a 4-tuple:
    +   *   (1) the arguments for the child process,
    +   *   (2) a list of classpath entries for the child,
    +   *   (3) a list of system properties and env vars, and
    +   *   (4) the main class for the child
    +   * In standalone cluster mode, this mutates the original arguments 
passed in.
    --- End diff --
    
    This also mutates the arguments for Python programs, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to