Repository: spark
Updated Branches:
  refs/heads/master f016f5c8f -> ae8a2b149


[SPARK-21176][WEB UI] Use a single ProxyServlet to proxy all workers and 
applications

## What changes were proposed in this pull request?

Currently, each application and each worker creates their own proxy servlet. 
Each proxy servlet is backed by its own HTTP client and a relatively large 
number of selector threads. This is excessive but was fixed (to an extent) by 
https://github.com/apache/spark/pull/18437.

However, a single HTTP client (backed by a single selector thread) should be 
enough to handle all proxy requests. This PR creates a single proxy servlet no 
matter how many applications and workers there are.

## How was this patch tested?

The unit tests for rewriting proxied locations and headers were updated. I then 
spun up a 100 node cluster to ensure that proxying worked correctly.

jiangxb1987 Please let me know if there's anything else I can do to help push 
this through. Thanks!

Author: Anderson Osagie <osa...@gmail.com>

Closes #18499 from aosagie/fix/minimize-proxy-threads.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae8a2b14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae8a2b14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae8a2b14

Branch: refs/heads/master
Commit: ae8a2b14966b1dfa10e620bb24ca6560778c20e7
Parents: f016f5c
Author: Anderson Osagie <osa...@gmail.com>
Authored: Wed Aug 9 14:35:27 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Aug 9 14:35:27 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala | 15 ++-----
 .../spark/deploy/master/ui/MasterWebUI.scala    | 21 +++++----
 .../scala/org/apache/spark/ui/JettyUtils.scala  | 45 +++++++++++---------
 .../scala/org/apache/spark/ui/UISuite.scala     | 20 ++++-----
 4 files changed, 46 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4cc580e..e030cac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -133,6 +133,7 @@ private[deploy] class Master(
    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
     if (reverseProxy) {
       masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
+      webUi.addProxy()
       logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and 
" +
        s"Applications UIs are available at $masterWebUiUrl")
     }
@@ -769,9 +770,6 @@ private[deploy] class Master(
     workers += worker
     idToWorker(worker.id) = worker
     addressToWorker(workerAddress) = worker
-    if (reverseProxy) {
-       webUi.addProxyTargets(worker.id, worker.webUiAddress)
-    }
     true
   }
 
@@ -780,9 +778,7 @@ private[deploy] class Master(
     worker.setState(WorkerState.DEAD)
     idToWorker -= worker.id
     addressToWorker -= worker.endpoint.address
-    if (reverseProxy) {
-      webUi.removeProxyTargets(worker.id)
-    }
+
     for (exec <- worker.executors.values) {
       logInfo("Telling app of lost executor: " + exec.id)
       exec.application.driver.send(ExecutorUpdated(
@@ -844,9 +840,6 @@ private[deploy] class Master(
     endpointToApp(app.driver) = app
     addressToApp(appAddress) = app
     waitingApps += app
-    if (reverseProxy) {
-      webUi.addProxyTargets(app.id, app.desc.appUiUrl)
-    }
   }
 
   private def finishApplication(app: ApplicationInfo) {
@@ -860,9 +853,7 @@ private[deploy] class Master(
       idToApp -= app.id
       endpointToApp -= app.driver
       addressToApp -= app.driver.address
-      if (reverseProxy) {
-        webUi.removeProxyTargets(app.id)
-      }
+
       if (completedApps.size >= RETAINED_APPLICATIONS) {
         val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
         completedApps.take(toRemove).foreach { a =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index e42f41b..35b7ddd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.deploy.master.ui
 
-import scala.collection.mutable.HashMap
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, 
RequestMasterState}
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{SparkUI, WebUI}
@@ -38,7 +35,6 @@ class MasterWebUI(
 
   val masterEndpointRef = master.self
   val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
-  private val proxyHandlers = new HashMap[String, ServletContextHandler]
 
   initialize()
 
@@ -54,16 +50,19 @@ class MasterWebUI(
       "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = 
Set("POST")))
   }
 
-  def addProxyTargets(id: String, target: String): Unit = {
-    val endTarget = target.stripSuffix("/")
-    val handler = createProxyHandler("/proxy/" + id, endTarget)
+  def addProxy(): Unit = {
+    val handler = createProxyHandler(idToUiAddress)
     attachHandler(handler)
-    proxyHandlers(id) = handler
   }
 
-  def removeProxyTargets(id: String): Unit = {
-    proxyHandlers.remove(id).foreach(detachHandler)
+  def idToUiAddress(id: String): Option[String] = {
+    val state = 
masterEndpointRef.askSync[MasterStateResponse](RequestMasterState)
+    val maybeWorkerUiAddress = state.workers.find(_.id == 
id).map(_.webUiAddress)
+    val maybeAppUiAddress = state.activeApps.find(_.id == 
id).map(_.desc.appUiUrl)
+
+    maybeWorkerUiAddress.orElse(maybeAppUiAddress)
   }
+
 }
 
 private[master] object MasterWebUI {

http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 0fa9671..880cf08 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -194,28 +194,32 @@ private[spark] object JettyUtils extends Logging {
   }
 
   /** Create a handler for proxying request to Workers and Application Drivers 
*/
-  def createProxyHandler(
-      prefix: String,
-      target: String): ServletContextHandler = {
+  def createProxyHandler(idToUiAddress: String => Option[String]): 
ServletContextHandler = {
     val servlet = new ProxyServlet {
       override def rewriteTarget(request: HttpServletRequest): String = {
-        val rewrittenURI = createProxyURI(
-          prefix, target, request.getRequestURI(), request.getQueryString())
-        if (rewrittenURI == null) {
-          return null
-        }
-        if (!validateDestination(rewrittenURI.getHost(), 
rewrittenURI.getPort())) {
-          return null
+        val path = request.getPathInfo
+        if (path == null) return null
+
+        val prefixTrailingSlashIndex = path.indexOf('/', 1)
+        val prefix = if (prefixTrailingSlashIndex == -1) {
+          path
+        } else {
+          path.substring(0, prefixTrailingSlashIndex)
         }
-        rewrittenURI.toString()
+        val id = prefix.drop(1)
+
+        // Query master state for id's corresponding UI address
+        // If that address exists, turn it into a valid, target URI string or 
return null
+        idToUiAddress(id)
+          .map(createProxyURI(prefix, _, path, request.getQueryString))
+          .filter(uri => uri != null && validateDestination(uri.getHost, 
uri.getPort))
+          .map(_.toString)
+          .orNull
       }
 
       override def newHttpClient(): HttpClient = {
         // SPARK-21176: Use the Jetty logic to calculate the number of 
selector threads (#CPUs/2),
         // but limit it to 8 max.
-        // Otherwise, it might happen that we exhaust the threadpool since in 
reverse proxy mode
-        // a proxy is instantiated for each executor. If the head node has 
many processors, this
-        // can quickly add up to an unreasonably high number of threads.
         val numSelectors = math.max(1, math.min(8, 
Runtime.getRuntime().availableProcessors() / 2))
         new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null)
       }
@@ -226,8 +230,8 @@ private[spark] object JettyUtils extends Logging {
           headerName: String,
           headerValue: String): String = {
         if (headerName.equalsIgnoreCase("location")) {
-          val newHeader = createProxyLocationHeader(
-            prefix, headerValue, clientRequest, 
serverResponse.getRequest().getURI())
+          val newHeader = createProxyLocationHeader(headerValue, clientRequest,
+            serverResponse.getRequest().getURI())
           if (newHeader != null) {
             return newHeader
           }
@@ -239,8 +243,8 @@ private[spark] object JettyUtils extends Logging {
 
     val contextHandler = new ServletContextHandler
     val holder = new ServletHolder(servlet)
-    contextHandler.setContextPath(prefix)
-    contextHandler.addServlet(holder, "/")
+    contextHandler.setContextPath("/proxy")
+    contextHandler.addServlet(holder, "/*")
     contextHandler
   }
 
@@ -438,7 +442,7 @@ private[spark] object JettyUtils extends Logging {
     val rest = path.substring(prefix.length())
 
     if (!rest.isEmpty()) {
-      if (!rest.startsWith("/")) {
+      if (!rest.startsWith("/") && !uri.endsWith("/")) {
         uri.append("/")
       }
       uri.append(rest)
@@ -458,14 +462,13 @@ private[spark] object JettyUtils extends Logging {
   }
 
   def createProxyLocationHeader(
-      prefix: String,
       headerValue: String,
       clientRequest: HttpServletRequest,
       targetUri: URI): String = {
     val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority()
     if (headerValue.startsWith(toReplace)) {
       clientRequest.getScheme() + "://" + clientRequest.getHeader("host") +
-          prefix + headerValue.substring(toReplace.length())
+          clientRequest.getPathInfo() + 
headerValue.substring(toReplace.length())
     } else {
       null
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 0c3d4ca..0428903 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -200,36 +200,34 @@ class UISuite extends SparkFunSuite {
   }
 
   test("verify proxy rewrittenURI") {
-    val prefix = "/proxy/worker-id"
+    val prefix = "/worker-id"
    val target = "http://localhost:8081"
-    val path = "/proxy/worker-id/json"
+    val path = "/worker-id/json"
     var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null)
    assert(rewrittenURI.toString() === "http://localhost:8081/json")
    rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done")
    assert(rewrittenURI.toString() === "http://localhost:8081/json?test=done")
-    rewrittenURI = JettyUtils.createProxyURI(prefix, target, 
"/proxy/worker-id", null)
+    rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id", 
null)
    assert(rewrittenURI.toString() === "http://localhost:8081")
-    rewrittenURI = JettyUtils.createProxyURI(prefix, target, 
"/proxy/worker-id/test%2F", null)
+    rewrittenURI = JettyUtils.createProxyURI(prefix, target, 
"/worker-id/test%2F", null)
    assert(rewrittenURI.toString() === "http://localhost:8081/test%2F")
-    rewrittenURI = JettyUtils.createProxyURI(prefix, target, 
"/proxy/worker-id/%F0%9F%98%84", null)
+    rewrittenURI = JettyUtils.createProxyURI(prefix, target, 
"/worker-id/%F0%9F%98%84", null)
    assert(rewrittenURI.toString() === "http://localhost:8081/%F0%9F%98%84")
-    rewrittenURI = JettyUtils.createProxyURI(prefix, target, 
"/proxy/worker-noid/json", null)
+    rewrittenURI = JettyUtils.createProxyURI(prefix, target, 
"/worker-noid/json", null)
     assert(rewrittenURI === null)
   }
 
   test("verify rewriting location header for reverse proxy") {
     val clientRequest = mock(classOf[HttpServletRequest])
    var headerValue = "http://localhost:4040/jobs"
-    val prefix = "/proxy/worker-id"
    val targetUri = URI.create("http://localhost:4040")
     when(clientRequest.getScheme()).thenReturn("http")
     when(clientRequest.getHeader("host")).thenReturn("localhost:8080")
-    var newHeader = JettyUtils.createProxyLocationHeader(
-      prefix, headerValue, clientRequest, targetUri)
+    when(clientRequest.getPathInfo()).thenReturn("/proxy/worker-id")
+    var newHeader = JettyUtils.createProxyLocationHeader(headerValue, 
clientRequest, targetUri)
    assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs")
    headerValue = "http://localhost:4041/jobs"
-    newHeader = JettyUtils.createProxyLocationHeader(
-      prefix, headerValue, clientRequest, targetUri)
+    newHeader = JettyUtils.createProxyLocationHeader(headerValue, 
clientRequest, targetUri)
     assert(newHeader === null)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to