[GitHub] [spark] jiangxb1987 commented on a change in pull request #29015: [SPARK-32215] Expose a (protected) /workers/kill endpoint on the MasterWebUI

2020-07-15 Thread GitBox


jiangxb1987 commented on a change in pull request #29015:
URL: https://github.com/apache/spark/pull/29015#discussion_r455517786



##
File path: core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
##
@@ -49,6 +55,26 @@ class MasterWebUI(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
     attachHandler(createRedirectHandler(
       "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
+    attachHandler(createServletHandler("/workers/kill", new HttpServlet {
+      override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
+        val hostnames: Seq[String] = Option(req.getParameterValues("host"))
+          .getOrElse(Array[String]()).toSeq
+        if (!isDecommissioningRequestAllowed(req)) {
+          resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
+        } else {
+          val removedWorkers = masterEndpointRef.askSync[Integer](DecommissionHosts(hostnames))
+          logInfo(s"Decommissioning of hosts $hostnames decommissioned ${removedWorkers} workers")

Review comment:
   nit: `${removedWorkers}` -> `$removedWorkers`
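
   For context, a minimal standalone sketch of the two interpolation styles (the `removedWorkers = 3` value is illustrative, not from the PR); braces are only required when the expression is more than a bare identifier:

       val removedWorkers = 3
       println(s"Decommissioned $removedWorkers workers")        // bare identifier: no braces needed
       println(s"Decommissioned ${removedWorkers + 1} workers")  // compound expression: braces required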

##
File path: core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
##
@@ -726,6 +726,61 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  def testWorkerDecommissioning(
+      numWorkers: Int,
+      numWorkersExpectedToDecom: Int,
+      hostnames: Seq[String]): Unit = {
+    val conf = new SparkConf()
+    val master = makeAliveMaster(conf)
+    val workerRegs = (1 to numWorkers).map { idx =>
+      val worker = new MockWorker(master.self, conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      val workerReg = RegisterWorker(
+        worker.id,
+        "localhost",
+        worker.self.address.port,
+        worker.self,
+        10,
+        1024,
+        "http://localhost:8080",
+        RpcAddress("localhost", 1))
+      master.self.send(workerReg)
+      workerReg
+    }
+
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.workers.length === numWorkers)
+      assert(masterState.workers.forall(_.state == WorkerState.ALIVE))
+      assert(masterState.workers.map(_.id).toSet == workerRegs.map(_.id).toSet)
+      masterState.workers

Review comment:
   nit: this is not needed

##
File path: core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
##
@@ -726,6 +726,61 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  def testWorkerDecommissioning(
+      numWorkers: Int,
+      numWorkersExpectedToDecom: Int,
+      hostnames: Seq[String]): Unit = {
+    val conf = new SparkConf()
+    val master = makeAliveMaster(conf)
+    val workerRegs = (1 to numWorkers).map { idx =>
+      val worker = new MockWorker(master.self, conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      val workerReg = RegisterWorker(
+        worker.id,
+        "localhost",
+        worker.self.address.port,
+        worker.self,
+        10,
+        1024,
+        "http://localhost:8080",
+        RpcAddress("localhost", 1))
+      master.self.send(workerReg)
+      workerReg
+    }
+
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.workers.length === numWorkers)
+      assert(masterState.workers.forall(_.state == WorkerState.ALIVE))
+      assert(masterState.workers.map(_.id).toSet == workerRegs.map(_.id).toSet)
+      masterState.workers
+    }
+
+    val decomWorkersCount = master.self.askSync[Integer](DecommissionHosts(hostnames))
+    assert(decomWorkersCount === numWorkersExpectedToDecom)
+
+    // Decommissioning is actually async ... wait for the workers to actually be decommissioned by
+    // polling the master's state.
+    eventually(timeout(10.seconds)) {

Review comment:
   nit: we may want to give a longer timeout to avoid flakiness.
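
   One possible shape of the adjusted wait, assuming the suite's usual ScalaTest `Eventually` helpers; the 60-second timeout, poll interval, and `WorkerState.DECOMMISSIONED` target state are illustrative, not taken from the PR:

       // Sketch: a wider timeout plus an explicit poll interval lowers the
       // chance of spurious failures on slow CI machines.
       eventually(timeout(60.seconds), interval(100.milliseconds)) {
         val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
         assert(masterState.workers.count(_.state == WorkerState.DECOMMISSIONED) ===
           numWorkersExpectedToDecom)
       }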








[GitHub] [spark] jiangxb1987 commented on a change in pull request #29015: [SPARK-32215] Expose a (protected) /workers/kill endpoint on the MasterWebUI

2020-07-15 Thread GitBox


jiangxb1987 commented on a change in pull request #29015:
URL: https://github.com/apache/spark/pull/29015#discussion_r455267326



##
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##
@@ -108,6 +108,9 @@ private[deploy] object DeployMessages {
 
   case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
 
+  // Out of band commands to Master
+  case class DecommissionHostPorts(hostPorts: Seq[String])

Review comment:
   let's change to `DecommissionWorkers` then, since `WorkerStateResponse` also passes in `host` and `port`.
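
   A sketch of what the suggested rename might look like (the name and payload were still under discussion in this thread, so this is illustrative only):

       // Out of band commands to Master
       // Renamed so the message reads as acting on workers; payload unchanged.
       case class DecommissionWorkers(hostPorts: Seq[String])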








[GitHub] [spark] jiangxb1987 commented on a change in pull request #29015: [SPARK-32215] Expose a (protected) /workers/kill endpoint on the MasterWebUI

2020-07-15 Thread GitBox


jiangxb1987 commented on a change in pull request #29015:
URL: https://github.com/apache/spark/pull/29015#discussion_r455252197



##
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##
@@ -108,6 +108,9 @@ private[deploy] object DeployMessages {
 
   case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
 
+  // Out of band commands to Master
+  case class DecommissionHostPorts(hostPorts: Seq[String])

Review comment:
   It would be confusing to call it `DecommissionWorkers` but pass in a sequence of hostPorts...








[GitHub] [spark] jiangxb1987 commented on a change in pull request #29015: [SPARK-32215] Expose a (protected) /workers/kill endpoint on the MasterWebUI

2020-07-14 Thread GitBox


jiangxb1987 commented on a change in pull request #29015:
URL: https://github.com/apache/spark/pull/29015#discussion_r454137251



##
File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala
##
@@ -863,7 +872,29 @@ private[deploy] class Master(
     true
   }
 
-  private def decommissionWorker(worker: WorkerInfo): Unit = {
+  private def decommissionHostPorts(hostPorts: Seq[String]): Integer = {
+    val hostPortsParsed = hostPorts.map(Utils.parseHostPort)
+    val (hostPortsWithoutPorts, hostPortsWithPorts) = hostPortsParsed.partition(_._2 == 0)

Review comment:
   nit: `val (hostPortsWithoutPorts, hostPortsWithPorts) = hostPorts.map(Utils.parseHostPort).partition(_._2 == 0)`
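
   For reference, a small standalone illustration of the `partition` in question, assuming `Utils.parseHostPort` yields a `(host, port)` tuple with port `0` when none was supplied (the literal tuples below stand in for its output):

       // parseHostPort("host1")      => ("host1", 0)     -- no port supplied
       // parseHostPort("host2:8080") => ("host2", 8080)
       val parsed = Seq(("host1", 0), ("host2", 8080))
       val (withoutPorts, withPorts) = parsed.partition(_._2 == 0)
       // withoutPorts == Seq(("host1", 0)); withPorts == Seq(("host2", 8080))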

##
File path: core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
##
@@ -49,6 +56,23 @@ class MasterWebUI(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
     attachHandler(createRedirectHandler(
       "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
+    attachHandler(createServletHandler("/workers/kill", new HttpServlet {
+      override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
+        val hostPorts: Seq[String] = Option(req.getParameterValues("host"))
+          .getOrElse(Array[String]()).map(_.toLowerCase(Locale.ROOT)).toSeq
+        logInfo(s"Received request to decommission workers $hostPorts from ${req.getRemoteAddr}")
+        if (!isDecommissioningRequestAllowed(req)) {
+          resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
+        } else {
+          val removedWorkers = masterEndpointRef.askSync[Integer](DecommissionHostPorts(hostPorts))
+          if (removedWorkers <= 0) {

Review comment:
   when would `removedWorkers < 0` happen?
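
   If the master can only ever report zero or more decommissioned workers, the check could be tightened to `== 0`; a sketch of that variant (the 404 response is illustrative, not the PR's choice):

       // Sketch: respond 404 when no worker matched, 200 otherwise.
       if (removedWorkers == 0) {
         resp.sendError(HttpServletResponse.SC_NOT_FOUND, "No workers matched the supplied hosts")
       } else {
         resp.setStatus(HttpServletResponse.SC_OK)
       }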

##
File path: core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
##
@@ -49,6 +56,23 @@ class MasterWebUI(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
     attachHandler(createRedirectHandler(
       "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
+    attachHandler(createServletHandler("/workers/kill", new HttpServlet {
+      override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
+        val hostPorts: Seq[String] = Option(req.getParameterValues("host"))
+          .getOrElse(Array[String]()).map(_.toLowerCase(Locale.ROOT)).toSeq
+        logInfo(s"Received request to decommission workers $hostPorts from ${req.getRemoteAddr}")
+        if (!isDecommissioningRequestAllowed(req)) {
+          resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
+        } else {
+          val removedWorkers = masterEndpointRef.askSync[Integer](DecommissionHostPorts(hostPorts))

Review comment:
   will multiple requests block each other, since we use `askSync` here?
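
   For contrast, a sketch of the non-blocking alternative the question alludes to: `RpcEndpointRef.ask` returns a `Future`, so the servlet thread is not parked for the round trip (the error handling is illustrative, and the servlet would still need async support, or a bounded await, before writing its response):

       import scala.util.{Failure, Success}
       import org.apache.spark.util.ThreadUtils

       masterEndpointRef.ask[Integer](DecommissionHostPorts(hostPorts))
         .onComplete {
           case Success(n) => logInfo(s"Decommissioned $n workers")
           case Failure(e) => logError(s"Failed to decommission $hostPorts", e)
         }(ThreadUtils.sameThread)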




