mridulm commented on code in PR #35683:
URL: https://github.com/apache/spark/pull/35683#discussion_r859246354


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -446,6 +446,12 @@ package object config extends Logging {
     .toSequence
     .createWithDefault(Nil)
 
+  // Enable executor decommissioning when node is in decommissioning state
+  private[spark] val YARN_EXECUTOR_DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.yarn.executor.decommission.enabled")
+      .booleanConf
+      .createWithDefault(false)

Review Comment:
   There is `spark.decommission.enabled` - why not use it ?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala:
##########
@@ -422,6 +426,33 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def handleNodesInDecommissioningState(allocateResponse: 
AllocateResponse): Unit = {
+    try {
+      // Some of the nodes are put in decommissioning state where RM did 
allocate
+      // resources on those nodes for earlier allocateResource calls, so 
notifying driver
+      // to put those executors in decommissioning state
+      allocateResponse.getUpdatedNodes.asScala.filter(_.getNodeState == 
NodeState.DECOMMISSIONING).
+        foreach(node => 
driverRef.send(DecommissionExecutorsOnHost(getHostAddress(node))))
+    } catch {
+      case e: Exception => logError("Sending Message to Driver to Decommission 
Executors" +
+        "on Decommissioning Nodes failed", e)
+    }
+  }
+
+  private def isYarnExecutorDecommissionEnabled: Boolean = {

Review Comment:
   Make this a field variable (and move it to the top where fields are getting 
initialized)



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala:
##########
@@ -422,6 +426,33 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def handleNodesInDecommissioningState(allocateResponse: 
AllocateResponse): Unit = {
+    try {
+      // Some of the nodes are put in decommissioning state where RM did 
allocate
+      // resources on those nodes for earlier allocateResource calls, so 
notifying driver
+      // to put those executors in decommissioning state
+      allocateResponse.getUpdatedNodes.asScala.filter(_.getNodeState == 
NodeState.DECOMMISSIONING).
+        foreach(node => 
driverRef.send(DecommissionExecutorsOnHost(getHostAddress(node))))

Review Comment:
   This will send `DecommissionExecutorsOnHost` each time in 
`allocateResources` right ?
   If yes, keep track of nodes which we have already updated to driver with in 
a cache and avoid sending duplicate messages.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to