Ngone51 commented on a change in pull request #31348:
URL: https://github.com/apache/spark/pull/31348#discussion_r570351605



##########
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##########
@@ -200,6 +200,8 @@ private[deploy] object DeployMessages {
 
   case class KillExecutors(appId: String, executorIds: Seq[String])
 
+  case class ExecutorRemoved(appId: String, executorId: String)

Review comment:
       Sounds good!

##########
File path: core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
##########
@@ -321,4 +321,15 @@ private[spark] class StandaloneAppClient(
     }
   }
 
+  /**
+   * Notify the Master about the removal of the executor
+   */
+  def executorRemoved(executorId: String): Unit = {
+    if (endpoint.get != null && appId.get != null) {
+      endpoint.get.send(ExecutorRemoved(appId.get, executorId))

Review comment:
      This follows the pattern of `killExecutors` and `requestTotalExecutors` above. Do you think we could improve all of them?
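As a sketch of what "improving all of them" might look like, the repeated "is the driver ready?" guard could be factored into one helper. This is a minimal, self-contained illustration only: `sendIfReady`, `FakeEndpoint`, and `Message` are hypothetical stand-ins, not Spark APIs, and the real `endpoint`/`appId` fields are `AtomicReference`s as in `StandaloneAppClient`.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch: consolidating the null-check that killExecutors,
// requestTotalExecutors, and executorRemoved would otherwise each duplicate.
object ReadyCheckSketch {
  final case class Message(appId: String, payload: String)

  // Stand-in for the driver endpoint; records what was sent.
  class FakeEndpoint {
    var sent: List[Message] = Nil
    def send(m: Message): Unit = sent = m :: sent
  }

  val endpoint = new AtomicReference[FakeEndpoint](null)
  val appId = new AtomicReference[String](null)

  // Shared guard: run `f` only once both endpoint and appId are initialized.
  // Returns true if the message was sent, false if the driver was not ready.
  def sendIfReady(f: (FakeEndpoint, String) => Unit): Boolean = {
    val e = endpoint.get
    val a = appId.get
    if (e != null && a != null) { f(e, a); true }
    else false
  }

  def executorRemoved(executorId: String): Boolean =
    sendIfReady((e, a) => e.send(Message(a, s"ExecutorRemoved($executorId)")))
}
```

Each caller then shrinks to a one-liner, and the readiness logic lives in a single place.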

##########
File path: core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
##########
@@ -321,4 +321,15 @@ private[spark] class StandaloneAppClient(
     }
   }
 
+  /**
+   * Notify the Master about the removal of the executor
+   */
+  def executorRemoved(executorId: String): Unit = {
+    if (endpoint.get != null && appId.get != null) {
+      endpoint.get.send(ExecutorRemoved(appId.get, executorId))
+    } else {
+    logWarning("Attempted to notify removed executor before driver fully initialized.")

Review comment:
      We can add `executorId`, but maybe not `appId`, since it's not always initialized at this point.
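Concretely, the suggestion amounts to interpolating the executor id into the warning while leaving the app id out. A tiny sketch (the `logWarning` stub below is a stand-in for Spark's `Logging` trait; only the message format is the point):

```scala
// Illustrative only: a stub logger so the message format can be shown
// self-contained, outside of Spark's Logging trait.
object LogSketch {
  private var lastMessage: String = ""
  def logWarning(msg: String): Unit = { lastMessage = msg; Console.err.println(msg) }

  // Include executorId in the warning, but omit appId since it may not
  // be initialized yet at this point in the driver's lifecycle.
  def warnExecutorRemovedTooEarly(executorId: String): String = {
    logWarning(s"Attempted to send ExecutorRemoved($executorId) to Master " +
      "before the driver was fully initialized.")
    lastMessage
  }
}
```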

##########
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##########
@@ -200,6 +200,8 @@ private[deploy] object DeployMessages {
 
   case class KillExecutors(appId: String, executorIds: Seq[String])
 
+  case class ExecutorRemoved(appId: String, executorId: String)

Review comment:
      After another look, it seems we can't reuse `ExecutorUpdated` since it doesn't contain `appId`. We could traverse all executors to find the target one by `execId`, but that would be time-consuming.
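To make the trade-off concrete, here is an illustrative data model (not Spark's actual Master state): with `appId` in the message the Master can go straight to the owning application, whereas with only `execId` it must scan every executor, and `execId` alone can even be ambiguous because different applications reuse the same ids.

```scala
// Hypothetical Master-side state: applications mapped to their executors.
object LookupSketch {
  final case class ExecInfo(appId: String, execId: String)

  val appToExecs: Map[String, List[ExecInfo]] = Map(
    "app-1" -> List(ExecInfo("app-1", "0"), ExecInfo("app-1", "1")),
    "app-2" -> List(ExecInfo("app-2", "0")))

  // With appId: one map lookup, then a search within a single application.
  def findWithAppId(appId: String, execId: String): Option[ExecInfo] =
    appToExecs.get(appId).flatMap(_.find(_.execId == execId))

  // Without appId: full traversal of every executor of every application,
  // and "0" matches two different apps here, so the result is ambiguous.
  def findByExecIdOnly(execId: String): List[ExecInfo] =
    appToExecs.values.flatten.filter(_.execId == execId).toList
}
```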

##########
File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala
##########
@@ -550,6 +502,55 @@ private[deploy] class Master(
       } else {
         context.reply(0)
       }
+
+    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>

Review comment:
      This change moves the handling of `ExecutorStateChanged` from `receive` to `receiveAndReply`, and `context.reply(true)` is the only difference.
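A minimal model of that move, for readers unfamiliar with the two entry points (`ReplyContext` below stands in for Spark's `RpcCallContext`, and the actual state update is elided): the handler body is identical in both places; `context.reply(true)` is the only addition.

```scala
object ReplySketch {
  // Stand-in for Spark's RpcCallContext: lets the handler acknowledge the sender.
  trait ReplyContext { def reply(v: Any): Unit }

  class RecordingContext extends ReplyContext {
    var replies: List[Any] = Nil
    def reply(v: Any): Unit = replies = v :: replies
  }

  final case class ExecutorStateChanged(appId: String, execId: Int)

  private var handled = 0
  // Elided: the Master's actual bookkeeping for the state change.
  private def handleExecutorStateChanged(m: ExecutorStateChanged): Unit = handled += 1

  // Before: handled in `receive` — fire-and-forget, sender gets no acknowledgement.
  def receive(m: ExecutorStateChanged): Unit = handleExecutorStateChanged(m)

  // After: handled in `receiveAndReply` — same handling, plus an explicit
  // `true` reply so the sender can await the acknowledgement.
  def receiveAndReply(m: ExecutorStateChanged, context: ReplyContext): Unit = {
    handleExecutorStateChanged(m)
    context.reply(true)
  }

  def handledCount: Int = handled
}
```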
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


