This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f75c7a7b5240 [SPARK-46883][CORE] Support `/json/clusterutilization` API f75c7a7b5240 is described below commit f75c7a7b52402e4c8faa39b2f88623e9f0bca916 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Sat Jan 27 09:21:17 2024 -0800 [SPARK-46883][CORE] Support `/json/clusterutilization` API ### What changes were proposed in this pull request? This PR aims to support a new `/json/clusterutilization` API in the `Master` JSON endpoint. ### Why are the changes needed? The user can get CPU/memory utilization and the number of waiting drivers in a single API call. ``` # Start Spark Cluster and Spark Shell $ sbin/start-master.sh $ sbin/start-worker.sh spark://$(hostname):7077; $ bin/spark-shell --master spark://$(hostname):7077 # Check `Cluster Utilization API` $ curl http://localhost:8080/json/clusterutilization { "waitingDrivers" : 0, "cores" : 10, "coresused" : 10, "coresutilization" : 100, "memory" : 31744, "memoryused" : 1024, "memoryutilization" : 3 } ``` ### Does this PR introduce _any_ user-facing change? No. This is a newly added API. ### How was this patch tested? Pass the CIs with the newly added test case. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44908 from dongjoon-hyun/SPARK-46883. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/deploy/JsonProtocol.scala | 18 ++++++++++++++++++ .../apache/spark/deploy/master/ui/MasterPage.scala | 2 ++ .../org/apache/spark/deploy/JsonProtocolSuite.scala | 21 +++++++++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 8c356081b277..9c73e84f4166 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -299,4 +299,22 @@ private[deploy] object JsonProtocol { ("executors" -> obj.executors.map(writeExecutorRunner)) ~ ("finishedexecutors" -> obj.finishedExecutors.map(writeExecutorRunner)) } + + /** + * Export the cluster utilization based on the [[MasterStateResponse]] to a Json object. + */ + def writeClusterUtilization(obj: MasterStateResponse): JObject = { + val aliveWorkers = obj.workers.filter(_.isAlive()) + val cores = aliveWorkers.map(_.cores).sum + val coresUsed = aliveWorkers.map(_.coresUsed).sum + val memory = aliveWorkers.map(_.memory).sum + val memoryUsed = aliveWorkers.map(_.memoryUsed).sum + ("waitingDrivers" -> obj.activeDrivers.count(_.state == DriverState.SUBMITTED)) ~ + ("cores" -> cores) ~ + ("coresused" -> coresUsed) ~ + ("coresutilization" -> 100 * coresUsed / cores) ~ + ("memory" -> memory) ~ + ("memoryused" -> memoryUsed) ~ + ("memoryutilization" -> 100 * memoryUsed / memory) + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 36a79e060f01..cbeda23013ac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -41,6 +41,8 @@ private[ui] class MasterPage(parent: MasterWebUI) 
extends WebUIPage("") { override def renderJson(request: HttpServletRequest): JValue = { jsonFieldPattern.findFirstMatchIn(request.getRequestURI()) match { case None => JsonProtocol.writeMasterState(getMasterState) + case Some(m) if m.group(1) == "clusterutilization" => + JsonProtocol.writeClusterUtilization(getMasterState) case Some(m) => JsonProtocol.writeMasterState(getMasterState, Some(m.group(1))) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 4a6ace6facde..6fca31234ee2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -105,6 +105,20 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils { assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerStateJsonStr)) } + test("SPARK-46883: writeClusterUtilization") { + val workers = Array(createWorkerInfo(), createWorkerInfo()) + val activeApps = Array(createAppInfo()) + val completedApps = Array.empty[ApplicationInfo] + val activeDrivers = Array(createDriverInfo()) + val completedDrivers = Array(createDriverInfo()) + val stateResponse = new MasterStateResponse( + "host", 8080, None, workers, activeApps, completedApps, + activeDrivers, completedDrivers, RecoveryState.ALIVE) + val output = JsonProtocol.writeClusterUtilization(stateResponse) + assertValidJson(output) + assertValidDataInJson(output, JsonMethods.parse(JsonConstants.clusterUtilizationJsonStr)) + } + def assertValidJson(json: JValue): Unit = { try { JsonMethods.parse(JsonMethods.compact(json)) @@ -206,4 +220,11 @@ object JsonConstants { |"executors":[], |"finishedexecutors":[%s,%s]} """.format(executorRunnerJsonStr, executorRunnerJsonStr).stripMargin + + val clusterUtilizationJsonStr = + """ + |{"waitingDrivers":1, + |"cores":8,"coresused":0,"coresutilization":0, + 
|"memory":2468,"memoryused":0,"memoryutilization":0} + """.stripMargin } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org