SaurabhChawla100 commented on a change in pull request #27636: 
[SPARK-30873][CORE][YARN]Handling Node Decommissioning for Yarn cluster manger 
in Spark
URL: https://github.com/apache/spark/pull/27636#discussion_r386329460
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##########
 @@ -1542,4 +1542,50 @@ package object config {
     .bytesConf(ByteUnit.BYTE)
     .createOptional
 
+  private[spark] val GRACEFUL_DECOMMISSION_ENABLE =
+    ConfigBuilder("spark.graceful.decommission.enable")
+      .doc("Whether to enable the node graceful decommissioning handling")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD =
+    ConfigBuilder("spark.graceful.decommission.fetchfailed.ignore.threshold")
+      .doc("Threshold of number of times fetchfailed ignored due to node" +
+        " decommission.This is configurable as per the need of the user and" +
+        " depending upon type of the cloud. If we keep this a large value and 
" +
+        " there is continuous decommission of nodes, in those scenarios stage" 
+
+        " will never abort and keeps on retrying in an unbounded manner.")
+      .intConf
+      .createWithDefault(8)
+
+  private[spark] val GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT =
+    ConfigBuilder("spark.graceful.decommission.executor.leasetimePct")
+      .doc("Percentage of time to expiry after which executors are killed " +
+        "(if enabled) on the node. Value ranges between (0-100)")
+      .intConf
+      .checkValue(v => v >= 0 && v < 100, "The percentage should be positive.")
+      .createWithDefault(50) // Pulled out of thin air.
+
+  private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =
+    ConfigBuilder("spark.graceful.decommission.shuffedata.leasetimePct")
+      .doc("Percentage of time to expiry after which shuffle data " +
+        "cleaned up (if enabled) on the node. Value ranges between (0-100)")
+      .intConf
+      .checkValue(v => v >= 0 && v < 100, "The percentage should be positive.")
+      .createWithDefault(90) // Pulled out of thin air.
+
+  private[spark] val GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC =
+    ConfigBuilder("spark.graceful.decommission.min.termination.time")
+      .doc("Minimum time to termination below which node decommissioning is 
performed immediately")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("60s")
+
+  private[spark] val GRACEFUL_DECOMMISSION_NODE_TIMEOUT =
+    ConfigBuilder("spark.graceful.decommission.node.timeout")
 
 Review comment:
   We can get the decommission timeout for hadoop-3.1 and later version of 
hadoop, so we can use that value to decide when the node is decommissioned.
   Whereas for lower version of hadoop(hadoop-2.8) there is no 
decommissionTimeout  for decommissioning nodes in those scenario we already 
knew from our experience that in AWS  spotloss nodes will stay for 2 min and 
GCP preemptible VM will stay for 30 sec after receiving the node 
decommissioning from hadoop end. 
   
   This config is added here to make decommissioning of nodes to work with 
multiple version of hadoop, 
   
    Please find the logic used in YarnAllocator.scala to decide the timeout of 
the node
   
   ```
   if (x.getNodeState.toString.equals(NodeState.DECOMMISSIONING.toString)) {
             // In hadoop 2.7 there is no support getDecommissioningTimeout 
whereas
             // In hadoop 3.1 and later version of hadoop there is support
             // of getDecommissioningTimeout So the method call made using 
reflection
             // to update the value nodeTerminationTime and for lower version 
of hadoop2.7
             // use the config spark.graceful.decommission.node.timeout which 
is specific to cloud
             var nodeTerminationTime = clock.getTimeMillis() + nodeLossInterval 
* 1000
             try {
                 val decommiossioningTimeout = x.getClass.getMethod(
                   "getDecommissioningTimeout").invoke(x).asInstanceOf[Integer]
                 if (decommiossioningTimeout != null) {
                   nodeTerminationTime = clock.getTimeMillis() + 
decommiossioningTimeout * 1000
                 }
             } catch {
               case e: NoSuchMethodException => logDebug(e.toString)
             }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to