SaurabhChawla100 opened a new pull request #27636: 
[WIP][SPARK-30873][CORE][YARN] Handling Node Decommissioning for YARN cluster
manager in Spark
URL: https://github.com/apache/spark/pull/27636
 
 
   ### What changes were proposed in this pull request?
   In many public cloud environments, node loss (in the case of AWS Spot
instances, Spot blocks, and GCP preemptible VMs) is a planned and announced
event: the cloud provider informs the cluster manager about the upcoming loss
of the node ahead of time. A few examples:
   a) AWS Spot instance loss (2-minute notice)
   b) GCP preemptible VM loss (30-second notice)
   c) AWS Spot block loss with a known termination time (generally a few tens
of minutes before decommission, as configured in YARN)
   
   This PR makes Spark leverage this advance knowledge of node loss and adjust
the scheduling of tasks to minimise the impact on the application.
   When a host is lost, its executors, their running tasks, their caches, and
its shuffle data are all lost. This can waste compute and other resources.
   
   The focus here is to build a framework for YARN that can be extended to
other cluster managers to handle such scenarios.
   
   The framework must handle one or more of the following:
   1) Prevent new tasks from starting on any executor on a decommissioning
node.
   2) Decide whether to kill the running tasks so that they can be restarted
elsewhere (assuming they will not complete within the deadline), or allow them
to continue in the hope that they finish within the deadline.
   3) Clear the shuffle data entries of the decommissioning node's hostname
from the MapOutputTracker to prevent FetchFailed exceptions. The most
significant advantage of unregistering the shuffle outputs is that when Spark
schedules the first re-attempt to compute the missing blocks, it notices all of
the blocks missing from decommissioned nodes and recovers them in a single
attempt. This speeds up the recovery process significantly over the default
Spark behaviour, where stages might be rescheduled multiple times to recompute
missing shuffles from all nodes, and prevents jobs from being stuck for hours
failing and recomputing.
   4) Prevent a stage from aborting due to FetchFailed exceptions caused by
node decommissioning. Spark allows a number of consecutive stage attempts
before a stage is aborted; this is controlled by the config
spark.stage.maxConsecutiveAttempts. Not counting fetch failures caused by node
decommissioning towards stage failure improves the reliability of the system.
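
   The accounting in (4) could be sketched as below. This is a simplified,
hypothetical illustration, not the PR's actual API: the decommissioned-node
lookup is reduced to a plain set of hostnames, and all names are illustrative.

```scala
// Sketch: excluding fetch failures from decommissioned nodes when deciding
// whether a stage has exhausted spark.stage.maxConsecutiveAttempts.
object FetchFailureAccounting {
  // Default value of spark.stage.maxConsecutiveAttempts.
  val maxConsecutiveAttempts = 4

  // Returns the updated consecutive-failure count for a stage after a
  // FetchFailed event originating on `failedHost`.
  def countFailure(currentFailures: Int,
                   failedHost: String,
                   decommissionedHosts: Set[String]): Int = {
    // A fetch failure caused by a planned node loss is not the stage's
    // fault, so it does not move the stage closer to being aborted.
    if (decommissionedHosts.contains(failedHost)) currentFailures
    else currentFailures + 1
  }

  def shouldAbort(failures: Int): Boolean = failures >= maxConsecutiveAttempts
}
```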
   
   Main components of the change
   1) Propagate the ClusterInfo update from the Resource Manager ->
Application Master -> Spark Driver.
   2) DecommissionTracker, residing inside the driver, tracks all the
decommissioned nodes, drives their state transitions, and takes the necessary
actions.
   3) Based on the decommissioned-node list, add hooks in the code to achieve:
     a) No new tasks on the node's executors
     b) Removal of the node's shuffle data mapping info from the
MapOutputTracker
     c) Fetch failures from decommissioned nodes not counting towards stage
failure
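
   Hook (a) could look roughly like the following: resource offers on
decommissioning hosts are filtered out before task matching, so running tasks
may finish but no new work lands there. The names (`NodeOffer`,
`isDecommissioning`) are illustrative assumptions, not the PR's real API.

```scala
// Sketch: a decommission-aware filter over scheduler resource offers.
case class NodeOffer(host: String, executorId: String, cores: Int)

class DecommissionAwareScheduler(isDecommissioning: String => Boolean) {
  // Drop offers from decommissioning hosts before any task is matched to
  // them; existing tasks on those hosts are left to run (or be killed later).
  def filterOffers(offers: Seq[NodeOffer]): Seq[NodeOffer] =
    offers.filterNot(o => isDecommissioning(o.host))
}
```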
   
   On receiving the information that a node is to be decommissioned, the
DecommissionTracker on the driver performs the following actions:
    * Add an entry for the node in the DecommissionTracker with its
termination time and node state "DECOMMISSIONING".
    * Stop assigning any new tasks to executors on the nodes that are
candidates for decommissioning. This ensures that, as tasks finish, usage of
the node slowly dies down.
    * Kill all the executors on the decommissioning nodes after a configurable
period of time, say "spark.graceful.decommission.executor.leasetimePct". This
killing ensures two things: first, the task failures are properly attributed
in the job failure count; second, no more shuffle data is generated on a node
that will eventually be lost. The node state is set to
"EXECUTOR_DECOMMISSIONED".
    * Mark the shuffle data on the node as unavailable after
"spark.graceful.decommission.shuffledata.leasetimePct" time. This ensures that
recomputation of the missing shuffle partitions starts early, rather than
reducers failing later with a time-consuming FetchFailure. The node state is
set to "SHUFFLE_DECOMMISSIONED".
    * Mark the node as terminated after the termination time. The node state
is now "TERMINATED".
    * Remove the node's entry from the DecommissionTracker if the same
hostname is reused (this is not uncommon in many public cloud environments).
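
   The state transitions above could be sketched as a small clock-driven state
machine, with the executor and shuffle deadlines derived as percentages of the
total lease (mirroring the *.leasetimePct configs). This is a hypothetical
sketch; the class, method, and state names are illustrative, not the actual
implementation.

```scala
// Sketch: DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED ->
// SHUFFLE_DECOMMISSIONED -> TERMINATED, driven by elapsed lease time.
object NodeState extends Enumeration {
  val Decommissioning, ExecutorDecommissioned, ShuffleDecommissioned,
      Terminated = Value
}

class DecommissionTracker(executorLeasePct: Double, shuffleLeasePct: Double) {
  import NodeState._
  // host -> (decommission notice time, termination time, current state)
  private val nodes =
    scala.collection.mutable.Map[String, (Long, Long, NodeState.Value)]()

  def addNode(host: String, noticeAt: Long, terminateAt: Long): Unit =
    nodes(host) = (noticeAt, terminateAt, Decommissioning)

  // Advance a node's state based on the current clock (millis).
  def tick(host: String, now: Long): NodeState.Value = {
    val (noticeAt, terminateAt, _) = nodes(host)
    val lease = terminateAt - noticeAt
    val state =
      if (now >= terminateAt) Terminated
      else if (now >= noticeAt + (lease * shuffleLeasePct).toLong)
        ShuffleDecommissioned
      else if (now >= noticeAt + (lease * executorLeasePct).toLong)
        ExecutorDecommissioned
      else Decommissioning
    nodes(host) = (noticeAt, terminateAt, state)
    state
  }

  // Hostname reuse in public clouds: drop any stale entry for the host.
  def removeNode(host: String): Unit = nodes.remove(host)
}
```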
   
   ### Why are the changes needed?
   Adds support for handling node decommissioning for the YARN cluster manager
in Spark.
   
   ### Does this PR introduce any user-facing change?
   NO
   
   
   ### How was this patch tested?
   Added unit tests.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
