Repository: spark Updated Branches: refs/heads/master f016f5c8f -> ae8a2b149
[SPARK-21176][WEB UI] Use a single ProxyServlet to proxy all workers and applications ## What changes were proposed in this pull request? Currently, each application and each worker creates its own proxy servlet. Each proxy servlet is backed by its own HTTP client and a relatively large number of selector threads. This is excessive but was fixed (to an extent) by https://github.com/apache/spark/pull/18437. However, a single HTTP client (backed by a single selector thread) should be enough to handle all proxy requests. This PR creates a single proxy servlet no matter how many applications and workers there are. ## How was this patch tested? The unit tests for rewriting proxied locations and headers were updated. I then spun up a 100-node cluster to ensure that proxying worked correctly. jiangxb1987, please let me know if there's anything else I can do to help push this through. Thanks! Author: Anderson Osagie <osa...@gmail.com> Closes #18499 from aosagie/fix/minimize-proxy-threads. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae8a2b14 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae8a2b14 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae8a2b14 Branch: refs/heads/master Commit: ae8a2b14966b1dfa10e620bb24ca6560778c20e7 Parents: f016f5c Author: Anderson Osagie <osa...@gmail.com> Authored: Wed Aug 9 14:35:27 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Aug 9 14:35:27 2017 +0800 ---------------------------------------------------------------------- .../org/apache/spark/deploy/master/Master.scala | 15 ++----- .../spark/deploy/master/ui/MasterWebUI.scala | 21 +++++---- .../scala/org/apache/spark/ui/JettyUtils.scala | 45 +++++++++++--------- .../scala/org/apache/spark/ui/UISuite.scala | 20 ++++----- 4 files changed, 46 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 4cc580e..e030cac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -133,6 +133,7 @@ private[deploy] class Master( masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort if (reverseProxy) { masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl) + webUi.addProxy() logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " + s"Applications UIs are available at $masterWebUiUrl") } @@ -769,9 +770,6 @@ private[deploy] class Master( workers += worker idToWorker(worker.id) = worker addressToWorker(workerAddress) = worker - if (reverseProxy) { - webUi.addProxyTargets(worker.id, worker.webUiAddress) - } true } @@ -780,9 +778,7 @@ private[deploy] class Master( worker.setState(WorkerState.DEAD) idToWorker -= worker.id addressToWorker -= worker.endpoint.address - if (reverseProxy) { - webUi.removeProxyTargets(worker.id) - } + for (exec <- worker.executors.values) { logInfo("Telling app of lost executor: " + exec.id) exec.application.driver.send(ExecutorUpdated( @@ -844,9 +840,6 @@ private[deploy] class Master( endpointToApp(app.driver) = app addressToApp(appAddress) = app waitingApps += app - if (reverseProxy) { - webUi.addProxyTargets(app.id, app.desc.appUiUrl) - } } private def finishApplication(app: ApplicationInfo) { @@ -860,9 +853,7 @@ private[deploy] class Master( idToApp -= app.id endpointToApp -= app.driver addressToApp -= app.driver.address - if (reverseProxy) { - webUi.removeProxyTargets(app.id) - } + if (completedApps.size >= RETAINED_APPLICATIONS) { val toRemove = 
math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach { a => http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index e42f41b..35b7ddd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,10 +17,7 @@ package org.apache.spark.deploy.master.ui -import scala.collection.mutable.HashMap - -import org.eclipse.jetty.servlet.ServletContextHandler - +import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.Master import org.apache.spark.internal.Logging import org.apache.spark.ui.{SparkUI, WebUI} @@ -38,7 +35,6 @@ class MasterWebUI( val masterEndpointRef = master.self val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) - private val proxyHandlers = new HashMap[String, ServletContextHandler] initialize() @@ -54,16 +50,19 @@ class MasterWebUI( "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST"))) } - def addProxyTargets(id: String, target: String): Unit = { - val endTarget = target.stripSuffix("/") - val handler = createProxyHandler("/proxy/" + id, endTarget) + def addProxy(): Unit = { + val handler = createProxyHandler(idToUiAddress) attachHandler(handler) - proxyHandlers(id) = handler } - def removeProxyTargets(id: String): Unit = { - proxyHandlers.remove(id).foreach(detachHandler) + def idToUiAddress(id: String): Option[String] = { + val state = masterEndpointRef.askSync[MasterStateResponse](RequestMasterState) + val maybeWorkerUiAddress = state.workers.find(_.id == id).map(_.webUiAddress) + val maybeAppUiAddress = 
state.activeApps.find(_.id == id).map(_.desc.appUiUrl) + + maybeWorkerUiAddress.orElse(maybeAppUiAddress) } + } private[master] object MasterWebUI { http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 0fa9671..880cf08 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -194,28 +194,32 @@ private[spark] object JettyUtils extends Logging { } /** Create a handler for proxying request to Workers and Application Drivers */ - def createProxyHandler( - prefix: String, - target: String): ServletContextHandler = { + def createProxyHandler(idToUiAddress: String => Option[String]): ServletContextHandler = { val servlet = new ProxyServlet { override def rewriteTarget(request: HttpServletRequest): String = { - val rewrittenURI = createProxyURI( - prefix, target, request.getRequestURI(), request.getQueryString()) - if (rewrittenURI == null) { - return null - } - if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) { - return null + val path = request.getPathInfo + if (path == null) return null + + val prefixTrailingSlashIndex = path.indexOf('/', 1) + val prefix = if (prefixTrailingSlashIndex == -1) { + path + } else { + path.substring(0, prefixTrailingSlashIndex) } - rewrittenURI.toString() + val id = prefix.drop(1) + + // Query master state for id's corresponding UI address + // If that address exists, turn it into a valid, target URI string or return null + idToUiAddress(id) + .map(createProxyURI(prefix, _, path, request.getQueryString)) + .filter(uri => uri != null && validateDestination(uri.getHost, uri.getPort)) + .map(_.toString) + .orNull } override def newHttpClient(): HttpClient = { // SPARK-21176: Use 
the Jetty logic to calculate the number of selector threads (#CPUs/2), // but limit it to 8 max. - // Otherwise, it might happen that we exhaust the threadpool since in reverse proxy mode - // a proxy is instantiated for each executor. If the head node has many processors, this - // can quickly add up to an unreasonably high number of threads. val numSelectors = math.max(1, math.min(8, Runtime.getRuntime().availableProcessors() / 2)) new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null) } @@ -226,8 +230,8 @@ private[spark] object JettyUtils extends Logging { headerName: String, headerValue: String): String = { if (headerName.equalsIgnoreCase("location")) { - val newHeader = createProxyLocationHeader( - prefix, headerValue, clientRequest, serverResponse.getRequest().getURI()) + val newHeader = createProxyLocationHeader(headerValue, clientRequest, + serverResponse.getRequest().getURI()) if (newHeader != null) { return newHeader } @@ -239,8 +243,8 @@ private[spark] object JettyUtils extends Logging { val contextHandler = new ServletContextHandler val holder = new ServletHolder(servlet) - contextHandler.setContextPath(prefix) - contextHandler.addServlet(holder, "/") + contextHandler.setContextPath("/proxy") + contextHandler.addServlet(holder, "/*") contextHandler } @@ -438,7 +442,7 @@ private[spark] object JettyUtils extends Logging { val rest = path.substring(prefix.length()) if (!rest.isEmpty()) { - if (!rest.startsWith("/")) { + if (!rest.startsWith("/") && !uri.endsWith("/")) { uri.append("/") } uri.append(rest) @@ -458,14 +462,13 @@ private[spark] object JettyUtils extends Logging { } def createProxyLocationHeader( - prefix: String, headerValue: String, clientRequest: HttpServletRequest, targetUri: URI): String = { val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority() if (headerValue.startsWith(toReplace)) { clientRequest.getScheme() + "://" + clientRequest.getHeader("host") + - prefix + headerValue.substring(toReplace.length()) + 
clientRequest.getPathInfo() + headerValue.substring(toReplace.length()) } else { null } http://git-wip-us.apache.org/repos/asf/spark/blob/ae8a2b14/core/src/test/scala/org/apache/spark/ui/UISuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 0c3d4ca..0428903 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -200,36 +200,34 @@ class UISuite extends SparkFunSuite { } test("verify proxy rewrittenURI") { - val prefix = "/proxy/worker-id" + val prefix = "/worker-id" val target = "http://localhost:8081" - val path = "/proxy/worker-id/json" + val path = "/worker-id/json" var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null) assert(rewrittenURI.toString() === "http://localhost:8081/json") rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done") assert(rewrittenURI.toString() === "http://localhost:8081/json?test=done") - rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id", null) + rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id", null) assert(rewrittenURI.toString() === "http://localhost:8081") - rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/test%2F", null) + rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id/test%2F", null) assert(rewrittenURI.toString() === "http://localhost:8081/test%2F") - rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/%F0%9F%98%84", null) + rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id/%F0%9F%98%84", null) assert(rewrittenURI.toString() === "http://localhost:8081/%F0%9F%98%84") - rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-noid/json", null) + rewrittenURI = JettyUtils.createProxyURI(prefix, target, 
"/worker-noid/json", null) assert(rewrittenURI === null) } test("verify rewriting location header for reverse proxy") { val clientRequest = mock(classOf[HttpServletRequest]) var headerValue = "http://localhost:4040/jobs" - val prefix = "/proxy/worker-id" val targetUri = URI.create("http://localhost:4040") when(clientRequest.getScheme()).thenReturn("http") when(clientRequest.getHeader("host")).thenReturn("localhost:8080") - var newHeader = JettyUtils.createProxyLocationHeader( - prefix, headerValue, clientRequest, targetUri) + when(clientRequest.getPathInfo()).thenReturn("/proxy/worker-id") + var newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri) assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs") headerValue = "http://localhost:4041/jobs" - newHeader = JettyUtils.createProxyLocationHeader( - prefix, headerValue, clientRequest, targetUri) + newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri) assert(newHeader === null) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org