Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/5144#discussion_r28452988
--- Diff:
core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala ---
@@ -54,175 +45,29 @@ import org.apache.spark.deploy.ClientArguments._
*
* @param host the address this server should bind to
* @param requestedPort the port this server will attempt to bind to
+ * @param masterConf the conf used by the Master
* @param masterActor reference to the Master actor to which requests can
be sent
* @param masterUrl the URL of the Master new drivers will attempt to
connect to
- * @param masterConf the conf used by the Master
*/
private[deploy] class StandaloneRestServer(
host: String,
requestedPort: Int,
+ masterConf: SparkConf,
masterActor: ActorRef,
- masterUrl: String,
- masterConf: SparkConf)
- extends Logging {
-
- import StandaloneRestServer._
-
- private var _server: Option[Server] = None
-
- // A mapping from URL prefixes to servlets that serve them. Exposed for
testing.
- protected val baseContext = s"/$PROTOCOL_VERSION/submissions"
- protected val contextToServlet = Map[String, StandaloneRestServlet](
- s"$baseContext/create/*" -> new SubmitRequestServlet(masterActor,
masterUrl, masterConf),
- s"$baseContext/kill/*" -> new KillRequestServlet(masterActor,
masterConf),
- s"$baseContext/status/*" -> new StatusRequestServlet(masterActor,
masterConf),
- "/*" -> new ErrorServlet // default handler
- )
-
- /** Start the server and return the bound port. */
- def start(): Int = {
- val (server, boundPort) =
Utils.startServiceOnPort[Server](requestedPort, doStart, masterConf)
- _server = Some(server)
- logInfo(s"Started REST server for submitting applications on port
$boundPort")
- boundPort
- }
-
- /**
- * Map the servlets to their corresponding contexts and attach them to a
server.
- * Return a 2-tuple of the started server and the bound port.
- */
- private def doStart(startPort: Int): (Server, Int) = {
- val server = new Server(new InetSocketAddress(host, startPort))
- val threadPool = new QueuedThreadPool
- threadPool.setDaemon(true)
- server.setThreadPool(threadPool)
- val mainHandler = new ServletContextHandler
- mainHandler.setContextPath("/")
- contextToServlet.foreach { case (prefix, servlet) =>
- mainHandler.addServlet(new ServletHolder(servlet), prefix)
- }
- server.setHandler(mainHandler)
- server.start()
- val boundPort = server.getConnectors()(0).getLocalPort
- (server, boundPort)
- }
-
- def stop(): Unit = {
- _server.foreach(_.stop())
- }
-}
-
-private[rest] object StandaloneRestServer {
- val PROTOCOL_VERSION = StandaloneRestClient.PROTOCOL_VERSION
- val SC_UNKNOWN_PROTOCOL_VERSION = 468
-}
-
-/**
- * An abstract servlet for handling requests passed to the
[[StandaloneRestServer]].
- */
-private[rest] abstract class StandaloneRestServlet extends HttpServlet
with Logging {
-
- /**
- * Serialize the given response message to JSON and send it through the
response servlet.
- * This validates the response before sending it to ensure it is
properly constructed.
- */
- protected def sendResponse(
- responseMessage: SubmitRestProtocolResponse,
- responseServlet: HttpServletResponse): Unit = {
- val message = validateResponse(responseMessage, responseServlet)
- responseServlet.setContentType("application/json")
- responseServlet.setCharacterEncoding("utf-8")
- responseServlet.getWriter.write(message.toJson)
- }
-
- /**
- * Return any fields in the client request message that the server does
not know about.
- *
- * The mechanism for this is to reconstruct the JSON on the server side
and compare the
- * diff between this JSON and the one generated on the client side. Any
fields that are
- * only in the client JSON are treated as unexpected.
- */
- protected def findUnknownFields(
- requestJson: String,
- requestMessage: SubmitRestProtocolMessage): Array[String] = {
- val clientSideJson = parse(requestJson)
- val serverSideJson = parse(requestMessage.toJson)
- val Diff(_, _, unknown) = clientSideJson.diff(serverSideJson)
- unknown match {
- case j: JObject => j.obj.map { case (k, _) => k }.toArray
- case _ => Array.empty[String] // No difference
- }
- }
-
- /** Return a human readable String representation of the exception. */
- protected def formatException(e: Throwable): String = {
- val stackTraceString = e.getStackTrace.map { "\t" + _ }.mkString("\n")
- s"$e\n$stackTraceString"
- }
-
- /** Construct an error message to signal the fact that an exception has
been thrown. */
- protected def handleError(message: String): ErrorResponse = {
- val e = new ErrorResponse
- e.serverSparkVersion = sparkVersion
- e.message = message
- e
- }
-
- /**
- * Parse a submission ID from the relative path, assuming it is the
first part of the path.
- * For instance, we expect the path to take the form /[submission
ID]/maybe/something/else.
- * The returned submission ID cannot be empty. If the path is
unexpected, return None.
- */
- protected def parseSubmissionId(path: String): Option[String] = {
- if (path == null || path.isEmpty) {
- None
- } else {
- path.stripPrefix("/").split("/").headOption.filter(_.nonEmpty)
- }
- }
-
- /**
- * Validate the response to ensure that it is correctly constructed.
- *
- * If it is, simply return the message as is. Otherwise, return an error
response instead
- * to propagate the exception back to the client and set the appropriate
error code.
- */
- private def validateResponse(
- responseMessage: SubmitRestProtocolResponse,
- responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
- try {
- responseMessage.validate()
- responseMessage
- } catch {
- case e: Exception =>
-
responseServlet.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
- handleError("Internal server error: " + formatException(e))
- }
- }
+ masterUrl: String)
+ extends RestSubmissionServer(host, requestedPort, masterConf) {
+ val submitRequestServlet = new
StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf)
--- End diff --
new line before this. Also these should be `protected override val`
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]