Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5116#discussion_r28473266
  
    --- Diff: 
core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala ---
    @@ -63,45 +71,109 @@ private[deploy] class StandaloneRestClient extends 
Logging {
        * it to the user. Otherwise, report the error message provided by the 
server.
        */
       private[rest] def createSubmission(
    -      master: String,
           request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
         logInfo(s"Submitting a request to launch an application in $master.")
    -    validateMaster(master)
    -    val url = getSubmitUrl(master)
    -    val response = postJson(url, request.toJson)
    -    response match {
    -      case s: CreateSubmissionResponse =>
    -        reportSubmissionStatus(master, s)
    -        handleRestResponse(s)
    -      case unexpected =>
    -        handleUnexpectedRestResponse(unexpected)
    +    var suc: Boolean = false
    +    var response: SubmitRestProtocolResponse = null
    +    for (m <- masters if !suc) {
    +      validateMaster(m)
    +      val url = getSubmitUrl(m)
    +      try {
    +        response = postJson(url, request.toJson)
    +        response match {
    +          case s: CreateSubmissionResponse =>
    +            if (s.success) {
    +              reportSubmissionStatus(s)
    +              handleRestResponse(s)
    +              suc = true
    +            }
    +          case unexpected =>
    +            handleUnexpectedRestResponse(unexpected)
    +        }
    +      } catch {
    +        case unreachable @ (_: FileNotFoundException | _: SocketException 
| _: ConnectException) =>
    +          if(handleConnectionException(m)) {
    +            throw new SubmitRestConnectionException(
    +              s"Unable to connect to server", unreachable)
    +          }
    +        case malformed @ (_: JsonProcessingException | _: 
SubmitRestProtocolException) =>
    +          if(handleConnectionException(m)) {
    +            throw new SubmitRestProtocolException(
    +              "Malformed response received from server", malformed)
    +          }
    +      }
         }
         response
       }
     
       /** Request that the server kill the specified submission. */
    -  def killSubmission(master: String, submissionId: String): 
SubmitRestProtocolResponse = {
    +  def killSubmission(submissionId: String): SubmitRestProtocolResponse = {
         logInfo(s"Submitting a request to kill submission $submissionId in 
$master.")
    -    validateMaster(master)
    -    val response = post(getKillUrl(master, submissionId))
    -    response match {
    -      case k: KillSubmissionResponse => handleRestResponse(k)
    -      case unexpected => handleUnexpectedRestResponse(unexpected)
    +    var suc: Boolean = false
    +    var response: SubmitRestProtocolResponse = null
    +    for (m <- masters if !suc) {
    +      validateMaster(m)
    +      val url = getKillUrl(m, submissionId)
    +      try {
    +        response = post(url)
    +        response match {
    +          case k: KillSubmissionResponse =>
    +            if (!k.message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) {
    +              handleRestResponse(k)
    +              suc = true
    +            }
    +          case unexpected =>
    +            handleUnexpectedRestResponse(unexpected)
    +        }
    +      } catch {
    +        case unreachable @ (_: FileNotFoundException | _: SocketException 
| _: ConnectException) =>
    +          if(handleConnectionException(m)) {
    +            throw new SubmitRestConnectionException(
    +              s"Unable to connect to server", unreachable)
    +          }
    +        case malformed @ (_: JsonProcessingException | _: 
SubmitRestProtocolException) =>
    +          if(handleConnectionException(m)) {
    +            throw new SubmitRestProtocolException(
    +              "Malformed response received from server", malformed)
    +          }
    +      }
         }
         response
       }
     
       /** Request the status of a submission from the server. */
       def requestSubmissionStatus(
    -      master: String,
           submissionId: String,
           quiet: Boolean = false): SubmitRestProtocolResponse = {
         logInfo(s"Submitting a request for the status of submission 
$submissionId in $master.")
    -    validateMaster(master)
    -    val response = get(getStatusUrl(master, submissionId))
    -    response match {
    -      case s: SubmissionStatusResponse => if (!quiet) { 
handleRestResponse(s) }
    -      case unexpected => handleUnexpectedRestResponse(unexpected)
    +    var suc: Boolean = false
    +    var response: SubmitRestProtocolResponse = null
    +    for (m <- masters) {
    +      validateMaster(m)
    +      val url = getStatusUrl(m, submissionId)
    +      try {
    +        response = get(url)
    +        response match {
    +          case s: SubmissionStatusResponse if s.success =>
    +            if (!quiet) {
    +              handleRestResponse(s)
    +            }
    +            suc = true
    +          case unexpected =>
    +            handleUnexpectedRestResponse(unexpected)
    +        }
    +      } catch {
    +        case unreachable @ (_: FileNotFoundException | _: SocketException 
| _: ConnectException) =>
    +          if(handleConnectionException(m)) {
    +            throw new SubmitRestConnectionException(
    +              s"Unable to connect to server", unreachable)
    +          }
    +        case malformed @ (_: JsonProcessingException | _: 
SubmitRestProtocolException) =>
    +          if(handleConnectionException(m)) {
    --- End diff --
    
    This is not a connection exception; it is a malformed response, so it should not go through `handleConnectionException`. We should just throw the exception here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to