This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new accfb39e4ddf [SPARK-46888][CORE] Fix `Master` to reject `/workers/kill/` requests if decommission is disabled
accfb39e4ddf is described below

commit accfb39e4ddf7f7b54396bd0e35256a04461c693
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Sat Jan 27 20:24:15 2024 -0800

    [SPARK-46888][CORE] Fix `Master` to reject `/workers/kill/` requests if decommission is disabled
    
    This PR aims to fix `Master` to reject `/workers/kill/` requests if `spark.decommission.enabled` is `false`, in order to fix the dangling worker issue.
    
    Currently, `spark.decommission.enabled` is `false` by default. So, when a user asks to decommission a worker, only the `Master` marks it `DECOMMISSIONED` while the worker itself stays alive.
    ```
    $ curl -XPOST http://localhost:8080/workers/kill/\?host\=127.0.0.1
    ```
    
    **Master UI**
    ![Screenshot 2024-01-27 at 6 19 18 PM](https://github.com/apache/spark/assets/9700541/443bfc32-b924-438a-8bf6-c64b9afbc4be)
    
    **Worker Log**
    ```
    24/01/27 18:18:06 WARN Worker: Receive decommission request, but decommission feature is disabled.
    ```
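    
    With this fix, `Master` rejects the request up front with HTTP `405 Method Not Allowed`, the same status code the new test below asserts. A quick way to check, assuming a standalone Master on `localhost:8080`:
    ```
    $ curl -s -o /dev/null -w '%{http_code}\n' -XPOST http://localhost:8080/workers/kill/\?host\=127.0.0.1
    405
    ```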
    
    This makes `Master` consistent with the existing `Worker` behavior, which ignores such requests:
    
    https://github.com/apache/spark/blob/1787a5261e87e0214a3f803f6534c5e52a0138e6/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L859-L868
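    
    For reference, a paraphrased sketch of that guard (from `Worker.decommissionSelf` at the permalink above; not verbatim, field names approximate):
    ```scala
    // Paraphrased from the linked Worker.scala lines: the request is honored
    // only when the decommission feature is enabled; otherwise it is ignored
    // with the warning quoted in the Worker log above.
    private[deploy] def decommissionSelf(): Unit = {
      if (conf.get(config.DECOMMISSION_ENABLED) && !decommissioned) {
        decommissioned = true
        logInfo(s"Decommission worker $workerId.")
      } else if (decommissioned) {
        logWarning(s"Worker $workerId already started decommissioning.")
      } else {
        logWarning("Receive decommission request, but decommission feature is disabled.")
      }
    }
    ```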
    
    This introduces no user-facing change; it is a bug fix.
    
    Passes the CI with the newly added test case.
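    
    For example, assuming the standard Spark build setup, the new test can be run locally with ScalaTest's substring filter:
    ```
    $ build/sbt "core/testOnly org.apache.spark.deploy.master.MasterSuite -- -z SPARK-46888"
    ```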
    
    No, generative AI tooling was not used.
    
    Closes #44915 from dongjoon-hyun/SPARK-46888.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 20b593811dc02c96c71978851e051d32bf8c3496)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/deploy/master/ui/MasterWebUI.scala |  4 +++-
 .../apache/spark/deploy/master/MasterSuite.scala    | 21 +++++++++++++++++++++
 .../spark/deploy/master/ui/MasterWebUISuite.scala   |  3 ++-
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index af94bd6d9e0f..53e5c5ac2a8f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
 import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
 import org.apache.spark.ui.{SparkUI, WebUI}
@@ -40,6 +41,7 @@ class MasterWebUI(
 
   val masterEndpointRef = master.self
   val killEnabled = master.conf.get(UI_KILL_ENABLED)
+  val decommissionDisabled = !master.conf.get(DECOMMISSION_ENABLED)
   val decommissionAllowMode = master.conf.get(MASTER_UI_DECOMMISSION_ALLOW_MODE)
 
   initialize()
@@ -58,7 +60,7 @@ class MasterWebUI(
       override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
         val hostnames: Seq[String] = Option(req.getParameterValues("host"))
           .getOrElse(Array[String]()).toSeq
-        if (!isDecommissioningRequestAllowed(req)) {
+        if (decommissionDisabled || !isDecommissioningRequestAllowed(req)) {
           resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
         } else {
           val removedWorkers = masterEndpointRef.askSync[Integer](
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 1cec863b1e7f..37874de98766 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.master
 
+import java.net.{HttpURLConnection, URL}
 import java.util.Date
 import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
@@ -325,6 +326,26 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-46888: master should reject worker kill request if decommision 
is disabled") {
+    implicit val formats = org.json4s.DefaultFormats
+    val conf = new SparkConf()
+      .set(DECOMMISSION_ENABLED, false)
+      .set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW")
+    val localCluster = LocalSparkCluster(1, 1, 512, conf)
+    localCluster.start()
+    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+    try {
+      eventually(timeout(30.seconds), interval(100.milliseconds)) {
+        val url = new URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}")
+        val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+        conn.setRequestMethod("POST")
+        assert(conn.getResponseCode === 405)
+      }
+    } finally {
+      localCluster.stop()
+    }
+  }
+
   test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
index 024511189acc..40265a12af93 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
@@ -30,12 +30,13 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, KillDriverResponse, RequestKillDriver}
 import org.apache.spark.deploy.DeployTestUtils._
 import org.apache.spark.deploy.master._
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
 import org.apache.spark.util.Utils
 
 class MasterWebUISuite extends SparkFunSuite {
 
-  val conf = new SparkConf()
+  val conf = new SparkConf().set(DECOMMISSION_ENABLED, true)
   val securityMgr = new SecurityManager(conf)
   val rpcEnv = mock(classOf[RpcEnv])
   val master = mock(classOf[Master])

