Bikas Saha commented on YARN-569:

Sorry for the delayed response.

This doesn't seem to affect the fair scheduler, or does it? If not, then it can 
be misleading for users.
+  public static final String RM_SCHEDULER_ENABLE_PREEMPTION =

Missing default?
+  /** List of ScheduleEditPolicy classes affecting scheduler preemption. */
+  public static final String RM_SCHEDULER_PREEMPTION_POLICIES =
+    RM_PREFIX + "scheduler.preemption.policies";

Why cast when one has generic T?
+    public RMContainerPreemptEventDispatcher(ResourceScheduler scheduler) {
+      this.scheduler = (T) scheduler;
+    }
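
A minimal sketch of the cast-free alternative, using stand-in types rather than the actual YARN classes (every name ending in "Sketch" is hypothetical): if the constructor takes T directly, the compiler checks the type and the unchecked (T) cast goes away.

```java
// Stand-in types for illustration only; the real YARN interfaces differ.
interface ResourceSchedulerSketch {}

interface PreemptableSchedulerSketch extends ResourceSchedulerSketch {
  String preempt(String containerId);
}

// Taking T in the constructor removes the unchecked (T) cast: a scheduler
// that is not preemptable is rejected at compile time, not at runtime.
class PreemptEventDispatcherSketch<T extends PreemptableSchedulerSketch> {
  private final T scheduler;

  PreemptEventDispatcherSketch(T scheduler) {
    this.scheduler = scheduler;
  }

  String handle(String containerId) {
    return scheduler.preempt(containerId);
  }
}
```

The trade-off is that the caller must already hold a reference of the narrower type, so the check moves to wherever the scheduler is instantiated.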

How do we envisage multiple policies working together without stepping on each 
other? Better off limiting to 1?
+        for (ScheduleEditPolicy policy : policies) {
+          LOG.info("LOADING ScheduleEditPolicy:" + policy.toString());
+          policy.init(conf, this.rmContext.getDispatcher().getEventHandler(),
+              (PreemptableResourceScheduler) scheduler);
+          // preemption service, periodically check whether we need to
+          // preempt to guarantee capacity constraints
+          ScheduleMonitor mon = new ScheduleMonitor(policy);
+          addService(mon);
+        }

Might be a personal choice but ScheduleMonitor or ScheduleEditPolicy would 
sound better if they used Scheduling instead of Schedule.

Why would we want to get this from the policy (which seems natural) as well as 
be able to set it? If it needs to be configurable then it can be done via the 
policy config, right?
+  protected void setMonitorInterval(int monitorInterval) {
+    this.monitorInterval = monitorInterval;
+  }

Having multiple threads named "Preemption Checker" will probably not help 

Not joining the thread to make sure it's cleaned up?
+  public void stop() {
+    stopped = true;
+    if (checkerThread != null) {
+      checkerThread.interrupt();
+    }
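
A sketch of what joining on stop could look like, assuming a simplified monitor (MonitorSketch and its methods are hypothetical, not the ScheduleMonitor from the patch). It also puts the policy name in the thread name so that several checkers can be told apart.

```java
// Hypothetical monitor used only to illustrate interrupt + join on stop.
class MonitorSketch {
  private volatile boolean stopped = false;
  private Thread checkerThread;

  void start(String policyName, long intervalMs) {
    // Include the policy name so multiple monitors are distinguishable.
    checkerThread = new Thread(() -> {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(intervalMs); // placeholder for the periodic check
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore flag, loop exits
        }
      }
    }, "Preemption Checker (" + policyName + ")");
    checkerThread.setDaemon(true);
    checkerThread.start();
  }

  void stop() {
    stopped = true;
    if (checkerThread != null) {
      checkerThread.interrupt();
      try {
        checkerThread.join(); // wait until the checker has actually exited
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  boolean isAlive() {
    return checkerThread != null && checkerThread.isAlive();
  }

  String threadName() {
    return checkerThread.getName();
  }
}
```

An unbounded join() is used here for brevity; a bounded join with a timeout may be preferable if the checker can block on something that ignores interrupts.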

Nothing else other than this seems to be synchronized. Then why this?
+  private class PreepmtionChecker implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        synchronized (ScheduleMonitor.this) {

Couldn't quite grok this. What is delta? What is 0.5? A percentage? What's the 
math behind the calculation? Should it be "even absent preemption" instead of 
"even absent natural termination"? Is this applied before or after 
+  /**
+   * Given a computed preemption target, account for containers naturally
+   * expiring and preempt only this percentage of the delta. This determines
+   * the rate of geometric convergence into the deadzone ({@link
+   * #MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
+   * will reclaim almost 95% of resources within 5 * {@link
+   * #WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
+  public static final String NATURAL_TERMINATION_FACTOR =
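
One possible reading of the javadoc, offered as an assumption rather than as the patch's actual math: if each round preempts a fixed fraction f of whatever delta remains, the remaining delta shrinks geometrically and the fraction reclaimed after n rounds is 1 - (1 - f)^n. The class and method names below are hypothetical.

```java
class ConvergenceSketch {
  // Fraction of the original delta reclaimed after `rounds` rounds, assuming
  // each round preempts `factor` of whatever delta remains (an assumption,
  // not confirmed by the patch).
  static double reclaimedFraction(double factor, int rounds) {
    return 1.0 - Math.pow(1.0 - factor, rounds);
  }
}
```

Under this reading, a factor of 0.5 over 5 rounds gives 1 - 0.5^5 = 0.96875, in the neighborhood of the "almost 95%" quoted in the javadoc but not identical to it, so the patch may count rounds differently.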

In which config file do the above configurations go when defined by the 
admin? Shouldn't they be defined in the config defaults of that file, e.g. 
capacity-scheduler.xml? If they get it from the scheduler config then we 
probably shouldn't pass it a configuration object during init.

RMContainer already has the ApplicationAttemptId inside it. No need for extra 
+  void preemptContainer(ApplicationAttemptId aid, RMContainer container);

Why no lock here when the other new methods have a lock? Do we not care that 
the app remains in applications during the duration of the operations?
+  @Override
+  @Lock(Lock.NoLock.class)
+  public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) {
+    if(LOG.isDebugEnabled()){
+      LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() + 
+          " container: " + cont.toString());
+    }
+    FiCaSchedulerApp app = applications.get(aid);
+    if (app != null) {
+      app.addPreemptContainer(cont.getContainerId());
+    }
+  }

+  // need to access the list of apps from the preemption monitor
+  public Set<FiCaSchedulerApp> getApplications() {
+    return activeApplications;
+  }

+  private final Set<ContainerId> containerToPreempt =

There is one critical difference between old and new behavior. The new code 
will not send the finish event to the container if it's not part of the 
liveContainers. This is probably wrong. Secondly, the parent/queue metrics etc. 
are also not updated. I am not sure if this book-keeping is actually designed 
to be in sync with liveContainers, which is what the new code forces it to 
be. Same comment for the hierarchical callers of this method, which now use the 
new boolean return value of this method to do similar book-keeping.
-  synchronized public void containerCompleted(RMContainer rmContainer,
+  synchronized public boolean containerCompleted(RMContainer rmContainer,

This check is principally to handle completed containers, right? Any chance the 
preemption is sent to a container that is still waiting to be added to 
+  public synchronized void addPreemptContainer(ContainerId cont){
+    // ignore already completed containers
+    if (liveContainers.containsKey(cont)) {
+      containerToPreempt.add(cont);
+    }
+  }

We should probably create and use an invalid priority value instead of using 
the highest priority value.
+    ResourceRequest rr = ResourceRequest.newInstance(
+        Priority.newInstance(0), ResourceRequest.ANY,
+        minimumAllocation, numCont);
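
A sketch of the sentinel idea, using a stand-in class rather than YARN's Priority (PrioritySketch and UNDEFINED are hypothetical names): a dedicated constant that no real request can carry keeps the placeholder from being confused with the highest priority.

```java
// Stand-in for a priority type; only the sentinel idea is illustrated.
final class PrioritySketch {
  // Hypothetical sentinel: a value that no real request is allowed to use.
  static final PrioritySketch UNDEFINED = new PrioritySketch(-1);

  private final int value;

  private PrioritySketch(int value) {
    this.value = value;
  }

  // Factory for real priorities; negative values are reserved.
  static PrioritySketch of(int value) {
    if (value < 0) {
      throw new IllegalArgumentException("priority must be >= 0");
    }
    return new PrioritySketch(value);
  }

  boolean isDefined() {
    return this != UNDEFINED;
  }

  int value() {
    return value;
  }
}
```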

FiCaSchedulerNode.unreserveResource(). Checks have been added for the reserved 
container but will the code reach that point if there was no reservation 
actually left on that node? In the same vein, can it happen that the node has a 
new reservation that was made out of band of the preemption logic cycle? Hence, 
the reserved container on the node would exist but could be from a different 

Haven't looked deeply at the logic of ProportionalCapacityPreemptionPolicy. The 
overall flow looks ok and the code can be turned off/patched separately. Will 
get to it soon.
> CapacityScheduler: support for preemption (using a capacity monitor)
> --------------------------------------------------------------------
>                 Key: YARN-569
>                 URL: https://issues.apache.org/jira/browse/YARN-569
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: capacityscheduler
>            Reporter: Carlo Curino
>            Assignee: Carlo Curino
>         Attachments: 3queues.pdf, CapScheduler_with_preemption.pdf, 
> preemption.2.patch, YARN-569.1.patch, YARN-569.2.patch, YARN-569.3.patch, 
> YARN-569.4.patch, YARN-569.patch, YARN-569.patch
> There is a tension between the fast-paced reactive role of the 
> CapacityScheduler, which needs to respond quickly to 
> applications' resource requests and node updates, and the more introspective, 
> time-based considerations 
> needed to observe and correct for capacity balance. For this purpose, instead 
> of hacking the delicate
> mechanisms of the CapacityScheduler directly, we opted to add support for 
> preemption by means of a "Capacity Monitor",
> which can be run optionally as a separate service (much like the 
> NMLivelinessMonitor).
> The capacity monitor (similarly to equivalent functionality in the fairness 
> scheduler) runs at intervals 
> (e.g., every 3 seconds), observes the state of the assignment of resources to 
> queues from the capacity scheduler, 
> performs off-line computation to determine if preemption is needed and how 
> best to "edit" the current schedule to 
> improve capacity, and generates events that produce four possible actions:
> # Container de-reservations
> # Resource-based preemptions
> # Container-based preemptions
> # Container killing
> The actions listed above are progressively more costly, and it is up to the 
> policy to use them as desired to achieve the rebalancing goals. 
> Note that due to the "lag" in the effect of these actions the policy should 
> operate at the macroscopic level (e.g., preempt tens of containers
> from a queue) and not try to tightly and consistently micromanage 
> container allocations. 
> ------------- Preemption policy  (ProportionalCapacityPreemptionPolicy): 
> ------------- 
> Preemption policies are pluggable by design. In the following we present an 
> initial policy (ProportionalCapacityPreemptionPolicy) we have been 
> experimenting with.  The ProportionalCapacityPreemptionPolicy behaves as 
> follows:
> # it gathers from the scheduler the state of the queues, in particular, their 
> current capacity, guaranteed capacity and pending requests (*)
> # if there are pending requests from queues that are under capacity it 
> computes a new ideal balanced state (**)
> # it computes the set of preemptions needed to repair the current schedule 
> and achieve capacity balance (accounting for natural completion rates, and 
> respecting bounds on the amount of preemption we allow for each round)
> # it selects which applications to preempt from each over-capacity queue (the 
> last one in the FIFO order)
> # it removes reservations from the most recently assigned app until the amount 
> of resource to reclaim is obtained, or until no more reservations exist
> # (if not enough) it issues preemptions for containers from the same 
> applications (reverse chronological order, last assigned container first) 
> again until necessary or until no containers except the AM container are left,
> # (if not enough) it moves onto unreserve and preempt from the next 
> application. 
> # containers that have been asked to preempt are tracked across executions. 
> If a container is among the ones to be preempted for more than a certain 
> time, the container is moved to the list of containers to be forcibly 
> killed. 
> Notes:
> (*) at the moment, in order to avoid double-counting of the requests, we only 
> look at the "ANY" part of pending resource requests, which means we might not 
> preempt on behalf of AMs that ask only for specific locations but not any. 
> (**) The ideal balance state is one in which each queue has at least its 
> guaranteed capacity, and the spare capacity is distributed among the queues 
> that want some as a weighted fair share, where the weighting is based on the 
> guaranteed capacity of a queue, and the function runs to a fixed point.  
> Tunables of the ProportionalCapacityPreemptionPolicy:
> #     observe-only mode (i.e., log the actions it would take, but behave as 
> read-only)
> # how frequently to run the policy
> # how long to wait between preemption and kill of a container
> # which fraction of the containers I would like to obtain should I preempt 
> (has to do with the natural rate at which containers are returned)
> # deadzone size, i.e., what % of over-capacity should I ignore (if we are off 
> perfect balance by some small % we ignore it)
> # overall amount of preemption we can afford for each run of the policy (in 
> terms of total cluster capacity)
> In our current experiments this set of tunables seems to be a good start to 
> shape the preemption action properly. More sophisticated preemption policies 
> could take into account the different types of applications running, job 
> priorities, cost of preemption, integral of capacity imbalance. This is very 
> much a control-theory kind of problem, and some of the lessons on designing 
> and tuning controllers are likely to apply.
> Generality:
> The monitor-based scheduler edit and the preemption mechanisms we introduced 
> here are designed to be more general than enforcing capacity/fairness. In 
> fact, we are considering other monitors that leverage the same idea of 
> "schedule edits" to target different global properties (e.g., allocate enough 
> resources to guarantee deadlines for important jobs, or data-locality 
> optimizations, IO-balancing among nodes, etc.).
> Note that by default the preemption policy we describe is disabled in the 
> patch.
> Depends on YARN-45 and YARN-567, is related to YARN-568
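
The (**) note in the quoted description (guaranteed capacity first, then spare capacity shared by guaranteed-capacity weight until a fixed point) could be sketched roughly as below. This is an illustration under stated assumptions, not the code in the patch: IdealBalanceSketch and its parameters are hypothetical, demand stands for current usage plus pending requests, and a queue is never assigned more than it asks for.

```java
class IdealBalanceSketch {
  // Returns the ideal assignment per queue. All arrays use the same unit
  // (for example, containers). Hypothetical helper, not from the patch.
  static double[] idealAssignment(double total, double[] guaranteed,
                                  double[] demand) {
    final double eps = 1e-9;
    int n = guaranteed.length;
    double[] ideal = new double[n];
    double spare = total;
    // Step 1: every queue gets its guarantee, capped by what it wants.
    for (int i = 0; i < n; i++) {
      ideal[i] = Math.min(guaranteed[i], demand[i]);
      spare -= ideal[i];
    }
    // Step 2: hand out spare capacity by guaranteed-capacity weight among
    // queues that still want more; repeat until nothing changes.
    while (spare > eps) {
      double weight = 0;
      for (int i = 0; i < n; i++) {
        if (demand[i] - ideal[i] > eps) {
          weight += guaranteed[i];
        }
      }
      if (weight <= 0) {
        break; // no queue wants more
      }
      double handedOut = 0;
      for (int i = 0; i < n; i++) {
        if (demand[i] - ideal[i] > eps) {
          double share = spare * guaranteed[i] / weight;
          double grant = Math.min(share, demand[i] - ideal[i]);
          ideal[i] += grant;
          handedOut += grant;
        }
      }
      spare -= handedOut;
      if (handedOut <= eps) {
        break; // fixed point reached
      }
    }
    return ideal;
  }
}
```

Each pass either exhausts the spare capacity or fully satisfies at least one queue, so the loop ends after at most one pass per queue plus one.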
