Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5144#discussion_r28452868
  
    --- Diff: 
core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.rest
    +
    +import java.net.InetSocketAddress
    +import javax.servlet.http.{HttpServlet, HttpServletRequest, 
HttpServletResponse}
    +
    +import scala.io.Source
    +import com.fasterxml.jackson.core.JsonProcessingException
    +import org.eclipse.jetty.server.Server
    +import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
    +import org.eclipse.jetty.util.thread.QueuedThreadPool
    +import org.json4s._
    +import org.json4s.jackson.JsonMethods._
    +
    +import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * A server that responds to requests submitted by the 
[[RestSubmissionClient]].
    + *
    + * This server responds with different HTTP codes depending on the 
situation:
    + *   200 OK - Request was processed successfully
    + *   400 BAD REQUEST - Request was malformed, not successfully validated, 
or of unexpected type
    + *   468 UNKNOWN PROTOCOL VERSION - Request specified a protocol this 
server does not understand
    + *   500 INTERNAL SERVER ERROR - Server throws an exception internally 
while processing the request
    + *
    + * The server always includes a JSON representation of the relevant 
[[SubmitRestProtocolResponse]]
    + * in the HTTP body. If an error occurs, however, the server will include 
an [[ErrorResponse]]
    + * instead of the one expected by the client. If the construction of this 
error response itself
    + * fails, the response will consist of an empty body with a response code 
that indicates internal
    + * server error.
    + */
    +private[spark] abstract class RestSubmissionServer(
    +  val host: String,
    +  val requestedPort: Int,
    +  val masterConf: SparkConf) extends Logging {
    +  protected val submitRequestServlet: SubmitRequestServlet
    +  protected val killRequestServlet: KillRequestServlet
    +  protected val statusRequestServlet: StatusRequestServlet
    +
    +  private var _server: Option[Server] = None
    +
    +  // A mapping from URL prefixes to servlets that serve them. Exposed for 
testing.
    +  protected val baseContext = 
s"/${RestSubmissionServer.PROTOCOL_VERSION}/submissions"
    +  protected lazy val contextToServlet = Map[String, RestServlet](
    +    s"$baseContext/create/*" -> submitRequestServlet,
    +    s"$baseContext/kill/*" -> killRequestServlet,
    +    s"$baseContext/status/*" -> statusRequestServlet,
    +    "/*" -> new ErrorServlet // default handler
    +  )
    +
    +  /** Start the server and return the bound port. */
    +  def start(): Int = {
    +    val (server, boundPort) = 
Utils.startServiceOnPort[Server](requestedPort, doStart, masterConf)
    +    _server = Some(server)
    +    logInfo(s"Started REST server for submitting applications on port 
$boundPort")
    +    boundPort
    +  }
    +
    +  /**
    +   * Map the servlets to their corresponding contexts and attach them to a 
server.
    +   * Return a 2-tuple of the started server and the bound port.
    +   */
    +  private def doStart(startPort: Int): (Server, Int) = {
    +    val server = new Server(new InetSocketAddress(host, startPort))
    +    val threadPool = new QueuedThreadPool
    +    threadPool.setDaemon(true)
    +    server.setThreadPool(threadPool)
    +    val mainHandler = new ServletContextHandler
    +    mainHandler.setContextPath("/")
    +    contextToServlet.foreach { case (prefix, servlet) =>
    +      mainHandler.addServlet(new ServletHolder(servlet), prefix)
    +    }
    +    server.setHandler(mainHandler)
    +    server.start()
    +    val boundPort = server.getConnectors()(0).getLocalPort
    +    (server, boundPort)
    +  }
    +
    +  def stop(): Unit = {
    +    _server.foreach(_.stop())
    +  }
    +}
    +
    +private[rest] object RestSubmissionServer {
    +  val PROTOCOL_VERSION = RestSubmissionClient.PROTOCOL_VERSION
    +  val SC_UNKNOWN_PROTOCOL_VERSION = 468
    +}
    +
    +/**
    + * An abstract servlet for handling requests passed to the 
[[RestSubmissionServer]].
    + */
    +private[rest] abstract class RestServlet extends HttpServlet with Logging {
    +
    +  /**
    +   * Serialize the given response message to JSON and send it through the 
response servlet.
    +   * This validates the response before sending it to ensure it is 
properly constructed.
    +   */
    +  protected def sendResponse(
    +      responseMessage: SubmitRestProtocolResponse,
    +      responseServlet: HttpServletResponse): Unit = {
    +    val message = validateResponse(responseMessage, responseServlet)
    +    responseServlet.setContentType("application/json")
    +    responseServlet.setCharacterEncoding("utf-8")
    +    responseServlet.getWriter.write(message.toJson)
    +  }
    +
    +  /**
    +   * Return any fields in the client request message that the server does 
not know about.
    +   *
    +   * The mechanism for this is to reconstruct the JSON on the server side 
and compare the
    +   * diff between this JSON and the one generated on the client side. Any 
fields that are
    +   * only in the client JSON are treated as unexpected.
    +   */
    +  protected def findUnknownFields(
    +      requestJson: String,
    +      requestMessage: SubmitRestProtocolMessage): Array[String] = {
    +    val clientSideJson = parse(requestJson)
    +    val serverSideJson = parse(requestMessage.toJson)
    +    val Diff(_, _, unknown) = clientSideJson.diff(serverSideJson)
    +    unknown match {
    +      case j: JObject => j.obj.map { case (k, _) => k }.toArray
    +      case _ => Array.empty[String] // No difference
    +    }
    +  }
    +
    +  /** Return a human readable String representation of the exception. */
    +  protected def formatException(e: Throwable): String = {
    +    val stackTraceString = e.getStackTrace.map { "\t" + _ }.mkString("\n")
    +    s"$e\n$stackTraceString"
    +  }
    +
    +  /** Construct an error message to signal the fact that an exception has 
been thrown. */
    +  protected def handleError(message: String): ErrorResponse = {
    +    val e = new ErrorResponse
    +    e.serverSparkVersion = sparkVersion
    +    e.message = message
    +    e
    +  }
    +
    +  /**
    +   * Parse a submission ID from the relative path, assuming it is the 
first part of the path.
    +   * For instance, we expect the path to take the form /[submission 
ID]/maybe/something/else.
    +   * The returned submission ID cannot be empty. If the path is 
unexpected, return None.
    +   */
    +  protected def parseSubmissionId(path: String): Option[String] = {
    +    if (path == null || path.isEmpty) {
    +      None
    +    } else {
    +      path.stripPrefix("/").split("/").headOption.filter(_.nonEmpty)
    +    }
    +  }
    +
    +  /**
    +   * Validate the response to ensure that it is correctly constructed.
    +   *
    +   * If it is, simply return the message as is. Otherwise, return an error 
response instead
    +   * to propagate the exception back to the client and set the appropriate 
error code.
    +   */
    +  private def validateResponse(
    +      responseMessage: SubmitRestProtocolResponse,
    +      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
    +    try {
    +      responseMessage.validate()
    +      responseMessage
    +    } catch {
    +      case e: Exception =>
    +        
responseServlet.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
    +        handleError("Internal server error: " + formatException(e))
    +    }
    +  }
    +}
    +
    +/**
    + * A servlet for handling kill requests passed to the 
[[RestSubmissionServer]].
    + */
    +private[rest] abstract class KillRequestServlet
    +  extends RestServlet {
    --- End diff --
    
    Bump this up one line, i.e. put `extends RestServlet` on the same line as the class declaration.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to