Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Aug 20 01:34:29 2014 @@ -643,7 +643,10 @@ public class LeafQueue implements CSQueu addApplicationAttempt(application, user); } - metrics.submitAppAttempt(userName); + // We don't want to update metrics for move app + if (application.isPending()) { + metrics.submitAppAttempt(userName); + } getParent().submitApplicationAttempt(application, userName); } @@ -701,7 +704,6 @@ public class LeafQueue implements CSQueu throw ace; } - metrics.submitApp(userName); } private synchronized void activateApplications() { @@ -976,13 +978,18 @@ public class LeafQueue implements CSQueu Resource userLimit = // User limit computeUserLimit(application, clusterResource, required); - + + //Max avail capacity needs to take into account usage by ancestor-siblings + //which are greater than their base capacity, so we are interested in "max avail" + //capacity + float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, this); Resource queueMaxCap = // Queue Max-Capacity Resources.multiplyAndNormalizeDown( resourceCalculator, clusterResource, - absoluteMaxCapacity, + absoluteMaxAvailCapacity, minimumAllocation); Resource userConsumed = getUser(user).getConsumedResources(); @@ -1615,8 +1622,43 @@ public class LeafQueue implements CSQueu @Override public void collectSchedulerApplications( Collection<ApplicationAttemptId> apps) { + for (FiCaSchedulerApp pendingApp : pendingApplications) { + apps.add(pendingApp.getApplicationAttemptId()); + } for (FiCaSchedulerApp app : activeApplications) { apps.add(app.getApplicationAttemptId()); } } + + @Override + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + allocateResource(clusterResource, application, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + + " resource=" + rmContainer.getContainer().getResource() + + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + + usedResources + " cluster=" + clusterResource); + // Inform the parent queue + getParent().attachContainer(clusterResource, application, rmContainer); + } + } + + @Override + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + releaseResource(clusterResource, application, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + + " resource=" + rmContainer.getContainer().getResource() + + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + + usedResources + " cluster=" + clusterResource); + // Inform the parent queue + getParent().detachContainer(clusterResource, application, rmContainer); + } + } }
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Aug 20 01:34:29 2014 @@ -791,4 +791,37 @@ public class ParentQueue implements CSQu queue.collectSchedulerApplications(apps); } } + + @Override + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + allocateResource(clusterResource, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + clusterResource); + // Inform the parent + if (parent != null) { + parent.attachContainer(clusterResource, application, rmContainer); + } + } + } + + @Override + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + releaseResource(clusterResource, rmContainer.getContainer().getResource()); + LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + clusterResource); + // Inform the parent + if (parent != null) { + parent.detachContainer(clusterResource, application, rmContainer); + } + } + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Wed Aug 20 01:34:29 2014 @@ -44,11 +44,11 @@ import org.apache.hadoop.yarn.util.resou public class FSLeafQueue extends FSQueue { private static final Log LOG = LogFactory.getLog( FSLeafQueue.class.getName()); - - private final List<AppSchedulable> runnableAppScheds = // apps that are runnable - new ArrayList<AppSchedulable>(); - private final List<AppSchedulable> nonRunnableAppScheds = - new ArrayList<AppSchedulable>(); + + private final List<FSAppAttempt> runnableApps = // apps that are runnable + new ArrayList<FSAppAttempt>(); + private final List<FSAppAttempt> nonRunnableApps = + new ArrayList<FSAppAttempt>(); private Resource demand = Resources.createResource(0); @@ -70,33 +70,31 @@ public class FSLeafQueue extends FSQueue amResourceUsage = Resource.newInstance(0, 0); } - public void addApp(FSSchedulerApp app, boolean runnable) { - AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this); - app.setAppSchedulable(appSchedulable); + public void addApp(FSAppAttempt app, boolean runnable) { if (runnable) { - runnableAppScheds.add(appSchedulable); + runnableApps.add(app); } else { - nonRunnableAppScheds.add(appSchedulable); + nonRunnableApps.add(app); } } // for testing - void addAppSchedulable(AppSchedulable appSched) { - runnableAppScheds.add(appSched); + void addAppSchedulable(FSAppAttempt appSched) { + runnableApps.add(appSched); } /** * Removes the given app from this queue. * @return whether or not the app was runnable */ - public boolean removeApp(FSSchedulerApp app) { - if (runnableAppScheds.remove(app.getAppSchedulable())) { + public boolean removeApp(FSAppAttempt app) { + if (runnableApps.remove(app)) { // Update AM resource usage if (app.isAmRunning() && app.getAMResource() != null) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); } return true; - } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) { + } else if (nonRunnableApps.remove(app)) { return false; } else { throw new IllegalStateException("Given app to remove " + app + @@ -104,22 +102,22 @@ public class FSLeafQueue extends FSQueue } } - public Collection<AppSchedulable> getRunnableAppSchedulables() { - return runnableAppScheds; + public Collection<FSAppAttempt> getRunnableAppSchedulables() { + return runnableApps; } - public List<AppSchedulable> getNonRunnableAppSchedulables() { - return nonRunnableAppScheds; + public List<FSAppAttempt> getNonRunnableAppSchedulables() { + return nonRunnableApps; } @Override public void collectSchedulerApplications( Collection<ApplicationAttemptId> apps) { - for (AppSchedulable appSched : runnableAppScheds) { - apps.add(appSched.getApp().getApplicationAttemptId()); + for (FSAppAttempt appSched : runnableApps) { + apps.add(appSched.getApplicationAttemptId()); } - for (AppSchedulable appSched : nonRunnableAppScheds) { - apps.add(appSched.getApp().getApplicationAttemptId()); + for (FSAppAttempt appSched : nonRunnableApps) { + apps.add(appSched.getApplicationAttemptId()); } } @@ -145,10 +143,10 @@ public class FSLeafQueue extends FSQueue @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); - for (AppSchedulable app : runnableAppScheds) { + for (FSAppAttempt app : runnableApps) { Resources.addTo(usage, app.getResourceUsage()); } - for (AppSchedulable app : nonRunnableAppScheds) { + for (FSAppAttempt app : nonRunnableApps) { Resources.addTo(usage, app.getResourceUsage()); } return usage; @@ -165,13 +163,13 @@ public class FSLeafQueue extends FSQueue Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); demand = Resources.createResource(0); - for (AppSchedulable sched : runnableAppScheds) { + for (FSAppAttempt sched : runnableApps) { if (Resources.equals(demand, maxRes)) { break; } updateDemandForApp(sched, maxRes); } - for (AppSchedulable sched : nonRunnableAppScheds) { + for (FSAppAttempt sched : nonRunnableApps) { if (Resources.equals(demand, maxRes)) { break; } @@ -183,7 +181,7 @@ public class FSLeafQueue extends FSQueue } } - private void updateDemandForApp(AppSchedulable sched, Resource maxRes) { + private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { sched.updateDemand(); Resource toAdd = sched.getDemand(); if (LOG.isDebugEnabled()) { @@ -207,9 +205,9 @@ public class FSLeafQueue extends FSQueue } Comparator<Schedulable> comparator = policy.getComparator(); - Collections.sort(runnableAppScheds, comparator); - for (AppSchedulable sched : runnableAppScheds) { - if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) { + Collections.sort(runnableApps, comparator); + for (FSAppAttempt sched : runnableApps) { + if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { continue; } @@ -237,8 +235,8 @@ public class FSLeafQueue extends FSQueue // Choose the app that is most over fair share Comparator<Schedulable> comparator = policy.getComparator(); - AppSchedulable candidateSched = null; - for (AppSchedulable sched : runnableAppScheds) { + FSAppAttempt candidateSched = null; + for (FSAppAttempt sched : runnableApps) { if (candidateSched == null || comparator.compare(sched, candidateSched) > 0) { candidateSched = sched; @@ -291,7 +289,7 @@ public class FSLeafQueue extends FSQueue @Override public int getNumRunnableApps() { - return runnableAppScheds.size(); + return runnableApps.size(); } @Override Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Wed Aug 20 01:34:29 2014 @@ -39,7 +39,8 @@ import org.apache.hadoop.yarn.util.resou @Private @Unstable -public abstract class FSQueue extends Schedulable implements Queue { +public abstract class FSQueue implements Queue, Schedulable { + private Resource fairShare = Resources.createResource(0, 0); private final String name; protected final FairScheduler scheduler; private final FSQueueMetrics metrics; @@ -139,10 +140,15 @@ public abstract class FSQueue extends Sc public FSQueueMetrics getMetrics() { return metrics; } - + + /** Get the fair share assigned to this Schedulable. */ + public Resource getFairShare() { + return fairShare; + } + @Override public void setFairShare(Resource fairShare) { - super.setFairShare(fairShare); + this.fairShare = fairShare; metrics.setFairShare(fairShare); } @@ -187,4 +193,16 @@ public abstract class FSQueue extends Sc } return true; } + + @Override + public boolean isActive() { + return getNumRunnableApps() > 0; + } + + /** Convenient toString implementation for debugging. */ + @Override + public String toString() { + return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", + getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Wed Aug 20 01:34:29 2014 @@ -35,7 +35,7 @@ public class FSSchedulerNode extends Sch private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class); - private AppSchedulable reservedAppSchedulable; + private FSAppAttempt reservedAppSchedulable; public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { super(node, usePortForNodeName); @@ -76,7 +76,7 @@ public class FSSchedulerNode extends Sch " on node " + this + " for application " + application); } setReservedContainer(container); - this.reservedAppSchedulable = ((FSSchedulerApp) application).getAppSchedulable(); + this.reservedAppSchedulable = (FSAppAttempt) application; } @Override @@ -98,7 +98,7 @@ public class FSSchedulerNode extends Sch this.reservedAppSchedulable = null; } - public synchronized AppSchedulable getReservedAppSchedulable() { + public synchronized FSAppAttempt getReservedAppSchedulable() { return reservedAppSchedulable; } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Wed Aug 20 01:34:29 2014 @@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; @@ -117,7 +115,7 @@ import com.google.common.base.Preconditi @Unstable @SuppressWarnings("unchecked") public class FairScheduler extends - AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> { + AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> { private FairSchedulerConfiguration conf; private Resource incrAllocation; @@ -139,13 +137,17 @@ public class FairScheduler extends private final int UPDATE_DEBUG_FREQUENCY = 5; private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; - private Thread updateThread; - private Thread schedulingThread; + @VisibleForTesting + Thread updateThread; + + @VisibleForTesting + Thread schedulingThread; // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; // Aggregate metrics FSQueueMetrics rootMetrics; + FSOpDurations fsOpDurations; // Time when we last updated preemption vars protected long lastPreemptionUpdateTime; @@ -243,16 +245,24 @@ public class FairScheduler extends } /** - * A runnable which calls {@link FairScheduler#update()} every + * Thread which calls {@link FairScheduler#update()} every * <code>updateInterval</code> milliseconds. */ - private class UpdateThread implements Runnable { + private class UpdateThread extends Thread { + + @Override public void run() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { try { Thread.sleep(updateInterval); + long start = getClock().getTime(); update(); preemptTasksIfNecessary(); + long duration = getClock().getTime() - start; + fsOpDurations.addUpdateThreadRunDuration(duration); + } catch (InterruptedException ie) { + LOG.warn("Update thread interrupted. Exiting."); + return; } catch (Exception e) { LOG.error("Exception in fair scheduler UpdateThread", e); } @@ -261,11 +271,32 @@ public class FairScheduler extends } /** + * Thread which attempts scheduling resources continuously, + * asynchronous to the node heartbeats. + */ + private class ContinuousSchedulingThread extends Thread { + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + continuousSchedulingAttempt(); + Thread.sleep(getContinuousSchedulingSleepMs()); + } catch (InterruptedException e) { + LOG.warn("Continuous scheduling thread interrupted. Exiting.", e); + return; + } + } + } + } + + /** * Recompute the internal variables used by the scheduler - per-job weights, * fair shares, deficits, minimum slot allocations, and amount of used and * required resources per job. */ protected synchronized void update() { + long start = getClock().getTime(); updatePreemptionVariables(); // Determine if any queues merit preemption FSQueue rootQueue = queueMgr.getRootQueue(); @@ -289,6 +320,9 @@ public class FairScheduler extends " Demand: " + rootQueue.getDemand()); } } + + long duration = getClock().getTime() - start; + fsOpDurations.addUpdateCallDuration(duration); } /** @@ -297,7 +331,7 @@ public class FairScheduler extends * for each type of task. */ private void updatePreemptionVariables() { - long now = clock.getTime(); + long now = getClock().getTime(); lastPreemptionUpdateTime = now; for (FSLeafQueue sched : queueMgr.getLeafQueues()) { if (!isStarvedForMinShare(sched)) { @@ -324,7 +358,8 @@ public class FairScheduler extends * defined as being below half its fair share. */ boolean isStarvedForFairShare(FSLeafQueue sched) { - Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, + Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, + clusterResource, Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredFairShare); @@ -342,7 +377,7 @@ public class FairScheduler extends return; } - long curTime = clock.getTime(); + long curTime = getClock().getTime(); if (curTime - lastPreemptCheckTime < preemptionInterval) { return; } @@ -370,6 +405,7 @@ public class FairScheduler extends * We make sure that no queue is placed below its fair share in the process. */ protected void preemptResources(Resource toPreempt) { + long start = getClock().getTime(); if (Resources.equals(toPreempt, Resources.none())) { return; } @@ -394,8 +430,8 @@ public class FairScheduler extends try { // Reset preemptedResource for each app for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { - for (AppSchedulable app : queue.getRunnableAppSchedulables()) { - app.getApp().resetPreemptedResources(); + for (FSAppAttempt app : queue.getRunnableAppSchedulables()) { + app.resetPreemptedResources(); } } @@ -415,16 +451,19 @@ public class FairScheduler extends } finally { // Clear preemptedResources for each app for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { - for (AppSchedulable app : queue.getRunnableAppSchedulables()) { - app.getApp().clearPreemptedResources(); + for (FSAppAttempt app : queue.getRunnableAppSchedulables()) { + app.clearPreemptedResources(); } } } + + long duration = getClock().getTime() - start; + fsOpDurations.addPreemptCallDuration(duration); } protected void warnOrKillContainer(RMContainer container) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); - FSSchedulerApp app = getSchedulerApp(appAttemptId); + FSAppAttempt app = getSchedulerApp(appAttemptId); FSLeafQueue queue = app.getQueue(); LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + @@ -435,7 +474,7 @@ public class FairScheduler extends if (time != null) { // if we asked for preemption more than maxWaitTimeBeforeKill ms ago, // proceed with kill - if (time + waitTimeBeforeKill < clock.getTime()) { + if (time + waitTimeBeforeKill < getClock().getTime()) { ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); @@ -446,11 +485,11 @@ public class FairScheduler extends completedContainer(container, status, RMContainerEventType.KILL); LOG.info("Killing container" + container + " (after waiting for premption for " + - (clock.getTime() - time) + "ms)"); + (getClock().getTime() - time) + "ms)"); } } else { - // track the request in the FSSchedulerApp itself - app.addPreemption(container, clock.getTime()); + // track the request in the FSAppAttempt itself + app.addPreemption(container, getClock().getTime()); } } @@ -500,7 +539,7 @@ public class FairScheduler extends } // synchronized for sizeBasedWeight - public synchronized ResourceWeights getAppWeight(AppSchedulable app) { + public synchronized ResourceWeights getAppWeight(FSAppAttempt app) { double weight = 1.0; if (sizeBasedWeight) { // Set weight based on current memory demand @@ -595,8 +634,8 @@ public class FairScheduler extends return; } - SchedulerApplication<FSSchedulerApp> application = - new SchedulerApplication<FSSchedulerApp>(queue, user); + SchedulerApplication<FSAppAttempt> application = + new SchedulerApplication<FSAppAttempt>(queue, user); applications.put(applicationId, application); queue.getMetrics().submitApp(user); @@ -620,18 +659,18 @@ public class FairScheduler extends ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) { - SchedulerApplication<FSSchedulerApp> application = + SchedulerApplication<FSAppAttempt> application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); FSLeafQueue queue = (FSLeafQueue) application.getQueue(); - FSSchedulerApp attempt = - new FSSchedulerApp(applicationAttemptId, user, + FSAppAttempt attempt = + new FSAppAttempt(this, applicationAttemptId, user, queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt(application - .getCurrentAppAttempt()); + .getCurrentAppAttempt()); } application.setCurrentAppAttempt(attempt); @@ -701,7 +740,7 @@ public class FairScheduler extends private synchronized void removeApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication<FSSchedulerApp> application = + SchedulerApplication<FSAppAttempt> application = applications.get(applicationId); if (application == null){ LOG.warn("Couldn't find application " + applicationId); @@ -716,9 +755,9 @@ public class FairScheduler extends RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - SchedulerApplication<FSSchedulerApp> application = + SchedulerApplication<FSAppAttempt> application = applications.get(applicationAttemptId.getApplicationId()); - FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId); + FSAppAttempt attempt = getSchedulerApp(applicationAttemptId); if (attempt == null || application == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); @@ -769,7 +808,8 @@ public class FairScheduler extends /** * Clean up a completed container. */ - private synchronized void completedContainer(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -779,7 +819,7 @@ public class FairScheduler extends Container container = rmContainer.getContainer(); // Get the application for the finished container - FSSchedulerApp application = + FSAppAttempt application = getCurrentAttemptForContainer(container.getId()); ApplicationId appId = container.getId().getApplicationAttemptId().getApplicationId(); @@ -794,8 +834,7 @@ public class FairScheduler extends FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); if (rmContainer.getState() == RMContainerState.RESERVED) { - application.unreserve(node, rmContainer.getReservedPriority()); - node.unreserveResource(application); + application.unreserve(rmContainer.getReservedPriority(), node); } else { application.containerCompleted(rmContainer, containerStatus, event); node.releaseContainer(container); @@ -855,7 +894,7 @@ public class FairScheduler extends List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) { // Make sure this application exists - FSSchedulerApp application = getSchedulerApp(appAttemptId); + FSAppAttempt application = getSchedulerApp(appAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + appAttemptId); @@ -873,21 +912,7 @@ public class FairScheduler extends } // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FairScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { if (!ask.isEmpty()) { @@ -932,6 +957,7 @@ public class FairScheduler extends * Process a heartbeat update from a node. */ private synchronized void nodeUpdate(RMNode nm) { + long start = getClock().getTime(); if (LOG.isDebugEnabled()) { LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource); } @@ -968,9 +994,13 @@ public class FairScheduler extends } else { attemptScheduling(node); } + + long duration = getClock().getTime() - start; + fsOpDurations.addNodeUpdateDuration(duration); } - void continuousSchedulingAttempt() { + void continuousSchedulingAttempt() throws InterruptedException { + long start = getClock().getTime(); List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet()); // Sort the nodes by space available on them, so that we offer // containers on emptier nodes first, facilitating an even spread. This @@ -993,6 +1023,9 @@ public class FairScheduler extends ": " + ex.toString(), ex); } } + + long duration = getClock().getTime() - start; + fsOpDurations.addContinuousSchedulingRunDuration(duration); } /** Sort nodes by available resource */ @@ -1017,13 +1050,13 @@ public class FairScheduler extends // 1. Check for reserved applications // 2. Schedule if there are no reservations - AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable(); + FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); if (reservedAppSchedulable != null) { Priority reservedPriority = node.getReservedContainer().getReservedPriority(); if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { // Don't hold the reservation if app can no longer use it LOG.info("Releasing reservation that cannot be satisfied for application " - + reservedAppSchedulable.getApp().getApplicationAttemptId() + + reservedAppSchedulable.getApplicationAttemptId() + " on node " + node); reservedAppSchedulable.unreserve(reservedPriority, node); reservedAppSchedulable = null; @@ -1031,7 +1064,7 @@ public class FairScheduler extends // Reservation exists; try to fulfill the reservation if (LOG.isDebugEnabled()) { LOG.debug("Trying to fulfill reservation for application " - + reservedAppSchedulable.getApp().getApplicationAttemptId() + + reservedAppSchedulable.getApplicationAttemptId() + " on node: " + node); } @@ -1056,8 +1089,8 @@ public class FairScheduler extends updateRootQueueMetrics(); } - public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { - return (FSSchedulerApp) super.getApplicationAttempt(appAttemptId); + public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { + return super.getApplicationAttempt(appAttemptId); } /** @@ -1102,6 +1135,8 @@ public class FairScheduler extends } NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); break; case NODE_REMOVED: if (!(event instanceof NodeRemovedSchedulerEvent)) { @@ -1216,9 +1251,11 @@ public class FairScheduler extends } rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); + fsOpDurations = FSOpDurations.getInstance(true); + // This stores per-application scheduling information - this.applications = - new ConcurrentHashMap<ApplicationId,SchedulerApplication<FSSchedulerApp>>(); + this.applications = new ConcurrentHashMap< + ApplicationId, SchedulerApplication<FSAppAttempt>>(); this.eventLog = new FairSchedulerEventLog(); eventLog.init(this.conf); @@ -1229,30 +1266,14 @@ public class FairScheduler extends throw new IOException("Failed to start FairScheduler", e); } - updateThread = new Thread(new UpdateThread()); + updateThread = new UpdateThread(); updateThread.setName("FairSchedulerUpdateThread"); updateThread.setDaemon(true); if (continuousSchedulingEnabled) { // start continuous scheduling thread - schedulingThread = new Thread( - new Runnable() { - @Override - public void run() { - while (!Thread.currentThread().isInterrupted()) { - try { - continuousSchedulingAttempt(); - Thread.sleep(getContinuousSchedulingSleepMs()); - } catch (InterruptedException e) { - LOG.error("Continuous scheduling thread interrupted. Exiting. ", - e); - return; - } - } - } - } - ); - schedulingThread.setName("ContinuousScheduling"); + schedulingThread = new ContinuousSchedulingThread(); + schedulingThread.setName("FairSchedulerContinuousScheduling"); schedulingThread.setDaemon(true); } @@ -1334,7 +1355,7 @@ public class FairScheduler extends @Override public List<QueueUserACLInfo> getQueueUserAclInfo() { - UserGroupInformation user = null; + UserGroupInformation user; try { user = UserGroupInformation.getCurrentUser(); } catch (IOException ioe) { @@ -1396,11 +1417,11 @@ public class FairScheduler extends @Override public synchronized String moveApplication(ApplicationId appId, String queueName) throws YarnException { - SchedulerApplication<FSSchedulerApp> app = applications.get(appId); + SchedulerApplication<FSAppAttempt> app = applications.get(appId); if (app == null) { throw new YarnException("App to be moved " + appId + " not found."); } - FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt(); + FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt(); // To serialize with FairScheduler#allocate, synchronize on app attempt synchronized (attempt) { FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); @@ -1413,8 +1434,7 @@ public class FairScheduler extends return oldQueue.getQueueName(); } - if (oldQueue.getRunnableAppSchedulables().contains( - attempt.getAppSchedulable())) { + if (oldQueue.getRunnableAppSchedulables().contains(attempt)) { verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue); } @@ -1423,7 +1443,7 @@ public class FairScheduler extends } } - private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app, + private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app, FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException { String queueName = targetQueue.getQueueName(); ApplicationAttemptId appAttId = app.getApplicationAttemptId(); @@ -1460,8 +1480,8 @@ public class FairScheduler extends * Helper for moveApplication, which has appropriate synchronization, so all * operations will be atomic. */ - private void executeMove(SchedulerApplication<FSSchedulerApp> app, - FSSchedulerApp attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { + private void executeMove(SchedulerApplication<FSAppAttempt> app, + FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { boolean wasRunnable = oldQueue.removeApp(attempt); // if app was not runnable before, it may be runnable now boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue, Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java Wed Aug 20 01:34:29 2014 @@ -25,15 +25,15 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability.Unstable; /** - * Order {@link AppSchedulable} objects by priority and then by submit time, as + * Order {@link FSAppAttempt} objects by priority and then by submit time, as * in the default scheduler in Hadoop. */ @Private @Unstable -public class FifoAppComparator implements Comparator<AppSchedulable>, Serializable { +public class FifoAppComparator implements Comparator<FSAppAttempt>, Serializable { private static final long serialVersionUID = 3428835083489547918L; - public int compare(AppSchedulable a1, AppSchedulable a2) { + public int compare(FSAppAttempt a1, FSAppAttempt a2) { int res = a1.getPriority().compareTo(a2.getPriority()); if (res == 0) { if (a1.getStartTime() < a2.getStartTime()) { @@ -44,7 +44,7 @@ public class FifoAppComparator implement } if (res == 0) { // If there is a tie, break it by app ID to get a deterministic order - res = a1.getApp().getApplicationId().compareTo(a2.getApp().getApplicationId()); + res = a1.getApplicationId().compareTo(a2.getApplicationId()); } return res; } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java Wed Aug 20 01:34:29 2014 @@ -43,7 +43,7 @@ public class MaxRunningAppsEnforcer { // Tracks the number of running applications by user. private final Map<String, Integer> usersNumRunnableApps; @VisibleForTesting - final ListMultimap<String, AppSchedulable> usersNonRunnableApps; + final ListMultimap<String, FSAppAttempt> usersNonRunnableApps; public MaxRunningAppsEnforcer(FairScheduler scheduler) { this.scheduler = scheduler; @@ -80,7 +80,7 @@ public class MaxRunningAppsEnforcer { * Tracks the given new runnable app for purposes of maintaining max running * app limits. */ - public void trackRunnableApp(FSSchedulerApp app) { + public void trackRunnableApp(FSAppAttempt app) { String user = app.getUser(); FSLeafQueue queue = app.getQueue(); // Increment running counts for all parent queues @@ -99,9 +99,9 @@ public class MaxRunningAppsEnforcer { * Tracks the given new non runnable app so that it can be made runnable when * it would not violate max running app limits. */ - public void trackNonRunnableApp(FSSchedulerApp app) { + public void trackNonRunnableApp(FSAppAttempt app) { String user = app.getUser(); - usersNonRunnableApps.put(user, app.getAppSchedulable()); + usersNonRunnableApps.put(user, app); } /** @@ -111,7 +111,7 @@ public class MaxRunningAppsEnforcer { * Runs in O(n log(n)) where n is the number of queues that are under the * highest queue that went from having no slack to having slack. */ - public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) { + public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); // childqueueX might have no pending apps itself, but if a queue higher up @@ -133,8 +133,8 @@ public class MaxRunningAppsEnforcer { parent = parent.getParent(); } - List<List<AppSchedulable>> appsNowMaybeRunnable = - new ArrayList<List<AppSchedulable>>(); + List<List<FSAppAttempt>> appsNowMaybeRunnable = + new ArrayList<List<FSAppAttempt>>(); // Compile lists of apps which may now be runnable // We gather lists instead of building a set of all non-runnable apps so @@ -150,26 +150,26 @@ public class MaxRunningAppsEnforcer { userNumRunning = 0; } if (userNumRunning == allocConf.getUserMaxApps(user) - 1) { - List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user); + List<FSAppAttempt> userWaitingApps = usersNonRunnableApps.get(user); if (userWaitingApps != null) { appsNowMaybeRunnable.add(userWaitingApps); } } // Scan through and check whether this means that any apps are now runnable - Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator( + Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); - FSSchedulerApp prev = null; - List<AppSchedulable> noLongerPendingApps = new ArrayList<AppSchedulable>(); + FSAppAttempt prev = null; + List<FSAppAttempt> noLongerPendingApps = new ArrayList<FSAppAttempt>(); while (iter.hasNext()) { - FSSchedulerApp next = iter.next(); + FSAppAttempt next = iter.next(); if (next == prev) { continue; } if (canAppBeRunnable(next.getQueue(), next.getUser())) { trackRunnableApp(next); - AppSchedulable appSched = next.getAppSchedulable(); + FSAppAttempt appSched = next; next.getQueue().getRunnableAppSchedulables().add(appSched); noLongerPendingApps.add(appSched); @@ -186,14 +186,14 @@ public class MaxRunningAppsEnforcer { // We remove the apps from their pending lists afterwards so that we don't // pull them out from under the iterator. If they are not in these lists // in the first place, there is a bug. - for (AppSchedulable appSched : noLongerPendingApps) { - if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables() + for (FSAppAttempt appSched : noLongerPendingApps) { + if (!appSched.getQueue().getNonRunnableAppSchedulables() .remove(appSched)) { LOG.error("Can't make app runnable that does not already exist in queue" + " as non-runnable: " + appSched + ". This should never happen."); } - if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) { + if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) { LOG.error("Waiting app " + appSched + " expected to be in " + "usersNonRunnableApps, but was not. This should never happen."); } @@ -204,7 +204,7 @@ public class MaxRunningAppsEnforcer { * Updates the relevant tracking variables after a runnable app with the given * queue and user has been removed. */ - public void untrackRunnableApp(FSSchedulerApp app) { + public void untrackRunnableApp(FSAppAttempt app) { // Update usersRunnableApps String user = app.getUser(); int newUserNumRunning = usersNumRunnableApps.get(user) - 1; @@ -226,8 +226,8 @@ public class MaxRunningAppsEnforcer { /** * Stops tracking the given non-runnable app */ - public void untrackNonRunnableApp(FSSchedulerApp app) { - usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable()); + public void untrackNonRunnableApp(FSAppAttempt app) { + usersNonRunnableApps.remove(app.getUser(), app); } /** @@ -235,7 +235,7 @@ public class MaxRunningAppsEnforcer { * of non-runnable applications. */ private void gatherPossiblyRunnableAppLists(FSQueue queue, - List<List<AppSchedulable>> appLists) { + List<List<FSAppAttempt>> appLists) { if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration() .getQueueMaxApps(queue.getName())) { if (queue instanceof FSLeafQueue) { @@ -259,14 +259,14 @@ public class MaxRunningAppsEnforcer { * of O(num lists) time. */ static class MultiListStartTimeIterator implements - Iterator<FSSchedulerApp> { + Iterator<FSAppAttempt> { - private List<AppSchedulable>[] appLists; + private List<FSAppAttempt>[] appLists; private int[] curPositionsInAppLists; private PriorityQueue<IndexAndTime> appListsByCurStartTime; @SuppressWarnings("unchecked") - public MultiListStartTimeIterator(List<List<AppSchedulable>> appListList) { + public MultiListStartTimeIterator(List<List<FSAppAttempt>> appListList) { appLists = appListList.toArray(new List[appListList.size()]); curPositionsInAppLists = new int[appLists.length]; appListsByCurStartTime = new PriorityQueue<IndexAndTime>(); @@ -284,10 +284,10 @@ public class MaxRunningAppsEnforcer { } @Override - public FSSchedulerApp next() { + public FSAppAttempt next() { IndexAndTime indexAndTime = appListsByCurStartTime.remove(); int nextListIndex = indexAndTime.index; - AppSchedulable next = appLists[nextListIndex] + FSAppAttempt next = appLists[nextListIndex] .get(curPositionsInAppLists[nextListIndex]); curPositionsInAppLists[nextListIndex]++; @@ -299,7 +299,7 @@ public class MaxRunningAppsEnforcer { } appListsByCurStartTime.add(indexAndTime); - return next.getApp(); + return next; } @Override Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java Wed Aug 20 01:34:29 2014 @@ -48,7 +48,7 @@ public class NewAppWeightBooster extends super.setConf(conf); } - public double adjustWeight(AppSchedulable app, double curWeight) { + public double adjustWeight(FSAppAttempt app, double curWeight) { long start = app.getStartTime(); long now = System.currentTimeMillis(); if (now - start < duration) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Wed Aug 20 01:34:29 2014 @@ -27,20 +27,14 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.util.resource.Resources; /** - * A Schedulable represents an entity that can launch tasks, such as a job - * or a queue. It provides a common interface so that algorithms such as fair - * sharing can be applied both within a queue and across queues. There are - * currently two types of Schedulables: JobSchedulables, which represent a - * single job, and QueueSchedulables, which allocate among jobs in their queue. - * - * Separate sets of Schedulables are used for maps and reduces. Each queue has - * both a mapSchedulable and a reduceSchedulable, and so does each job. + * A Schedulable represents an entity that can be scheduled such as an + * application or a queue. It provides a common interface so that algorithms + * such as fair sharing can be applied both within a queue and across queues. * * A Schedulable is responsible for three roles: - * 1) It can launch tasks through assignTask(). - * 2) It provides information about the job/queue to the scheduler, including: + * 1) Assign resources through {@link #assignContainer}. + * 2) It provides information about the app/queue to the scheduler, including: * - Demand (maximum number of tasks required) - * - Number of currently running tasks * - Minimum share (for queues) * - Job/queue weight (for fair sharing) * - Start time and priority (for FIFO) @@ -57,69 +51,61 @@ import org.apache.hadoop.yarn.util.resou */ @Private @Unstable -public abstract class Schedulable { - /** Fair share assigned to this Schedulable */ - private Resource fairShare = Resources.createResource(0); - +public interface Schedulable { /** * Name of job/queue, used for debugging as well as for breaking ties in * scheduling order deterministically. */ - public abstract String getName(); + public String getName(); /** * Maximum number of resources required by this Schedulable. This is defined as * number of currently utilized resources + number of unlaunched resources (that * are either not yet launched or need to be speculated). */ - public abstract Resource getDemand(); + public Resource getDemand(); /** Get the aggregate amount of resources consumed by the schedulable. */ - public abstract Resource getResourceUsage(); + public Resource getResourceUsage(); /** Minimum Resource share assigned to the schedulable. */ - public abstract Resource getMinShare(); + public Resource getMinShare(); /** Maximum Resource share assigned to the schedulable. */ - public abstract Resource getMaxShare(); + public Resource getMaxShare(); /** Job/queue weight in fair sharing. */ - public abstract ResourceWeights getWeights(); + public ResourceWeights getWeights(); /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/ - public abstract long getStartTime(); + public long getStartTime(); /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */ - public abstract Priority getPriority(); + public Priority getPriority(); /** Refresh the Schedulable's demand and those of its children if any. */ - public abstract void updateDemand(); + public void updateDemand(); /** * Assign a container on this node if possible, and return the amount of * resources assigned. */ - public abstract Resource assignContainer(FSSchedulerNode node); + public Resource assignContainer(FSSchedulerNode node); /** * Preempt a container from this Schedulable if possible. */ - public abstract RMContainer preemptContainer(); + public RMContainer preemptContainer(); + + /** Get the fair share assigned to this Schedulable. */ + public Resource getFairShare(); /** Assign a fair share to this Schedulable. */ - public void setFairShare(Resource fairShare) { - this.fairShare = fairShare; - } + public void setFairShare(Resource fairShare); - /** Get the fair share assigned to this Schedulable. */ - public Resource getFairShare() { - return fairShare; - } - - /** Convenient toString implementation for debugging. */ - @Override - public String toString() { - return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", - getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); - } + /** + * Returns true if queue has atleast one app running. Always returns true for + * AppSchedulables. + */ + public boolean isActive(); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Wed Aug 20 01:34:29 2014 @@ -32,5 +32,5 @@ import org.apache.hadoop.conf.Configurab @Private @Unstable public interface WeightAdjuster { - public double adjustWeight(AppSchedulable app, double curWeight); + public double adjustWeight(FSAppAttempt app, double curWeight); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java Wed Aug 20 01:34:29 2014 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; +import java.util.ArrayList; import java.util.Collection; import org.apache.hadoop.yarn.api.records.Resource; @@ -33,7 +34,31 @@ import org.apache.hadoop.yarn.server.res public class ComputeFairShares { private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25; - + + /** + * Compute fair share of the given schedulables.Fair share is an allocation of + * shares considering only active schedulables ie schedulables which have + * running apps. + * + * @param schedulables + * @param totalResources + * @param type + */ + public static void computeShares( + Collection<? extends Schedulable> schedulables, Resource totalResources, + ResourceType type) { + Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>(); + for (Schedulable sched : schedulables) { + if (sched.isActive()) { + activeSchedulables.add(sched); + } else { + setResourceValue(0, sched.getFairShare(), type); + } + } + + computeSharesInternal(activeSchedulables, totalResources, type); + } + /** * Given a set of Schedulables and a number of slots, compute their weighted * fair shares. The min and max shares and of the Schedulables are assumed to @@ -75,7 +100,7 @@ public class ComputeFairShares { * because resourceUsedWithWeightToResourceRatio is linear-time and the number of * iterations of binary search is a constant (dependent on desired precision). */ - public static void computeShares( + private static void computeSharesInternal( Collection<? extends Schedulable> schedulables, Resource totalResources, ResourceType type) { if (schedulables.isEmpty()) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug 20 01:34:29 2014 @@ -52,8 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -89,7 +87,6 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; - import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -295,21 +292,7 @@ public class FifoScheduler extends clusterResource, minimumAllocation, maximumAllocation); // Release containers - for (ContainerId releasedContainer : release) { - RMContainer rmContainer = getRMContainer(releasedContainer); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FifoScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainer); - } - containerCompleted(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainer, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { @@ -443,7 +426,7 @@ public class FifoScheduler extends LOG.info("Skip killing " + container.getContainerId()); continue; } - containerCompleted(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL); @@ -717,7 +700,7 @@ public class FifoScheduler extends for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - containerCompleted(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -818,7 +801,7 @@ public class FifoScheduler extends ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerid = containerExpiredEvent.getContainerId(); - containerCompleted(getRMContainer(containerid), + completedContainer(getRMContainer(containerid), SchedulerUtils.createAbnormalContainerStatus( containerid, SchedulerUtils.EXPIRED_CONTAINER), @@ -831,7 +814,8 @@ public class FifoScheduler extends } @Lock(FifoScheduler.class) - private synchronized void containerCompleted(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -881,7 +865,7 @@ public class FifoScheduler extends } // Kill running containers for(RMContainer container : node.getRunningContainers()) { - containerCompleted(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java Wed Aug 20 01:34:29 2014 @@ -38,6 +38,10 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.security.MasterKeyData; import com.google.common.annotations.VisibleForTesting; @@ -66,6 +70,7 @@ public class AMRMTokenSecretManager exte private final Timer timer; private final long rollingInterval; private final long activationDelay; + private RMContext rmContext; private final Set<ApplicationAttemptId> appAttemptSet = new HashSet<ApplicationAttemptId>(); @@ -73,7 +78,8 @@ public class AMRMTokenSecretManager exte /** * Create an {@link AMRMTokenSecretManager} */ - public AMRMTokenSecretManager(Configuration conf) { + public AMRMTokenSecretManager(Configuration conf, RMContext rmContext) { + this.rmContext = rmContext; this.timer = new Timer(); this.rollingInterval = conf @@ -98,6 +104,11 @@ public class AMRMTokenSecretManager exte public void start() { if (this.currentMasterKey == null) { this.currentMasterKey = createNewMasterKey(); + AMRMTokenSecretManagerState state = + AMRMTokenSecretManagerState.newInstance( + this.currentMasterKey.getMasterKey(), null); + rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state, + false); } this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval, rollingInterval); @@ -130,6 +141,12 @@ public class AMRMTokenSecretManager exte try { LOG.info("Rolling master-key for amrm-tokens"); this.nextMasterKey = createNewMasterKey(); + AMRMTokenSecretManagerState state = + AMRMTokenSecretManagerState.newInstance( + this.currentMasterKey.getMasterKey(), + this.nextMasterKey.getMasterKey()); + rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state, + true); this.timer.schedule(new NextKeyActivator(), this.activationDelay); } finally { this.writeLock.unlock(); @@ -150,6 +167,11 @@ public class AMRMTokenSecretManager exte + this.nextMasterKey.getMasterKey().getKeyId()); this.currentMasterKey = this.nextMasterKey; this.nextMasterKey = null; + AMRMTokenSecretManagerState state = + AMRMTokenSecretManagerState.newInstance( + this.currentMasterKey.getMasterKey(), null); + rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state, + true); } finally { this.writeLock.unlock(); } @@ -225,8 +247,8 @@ public class AMRMTokenSecretManager exte LOG.debug("Trying to retrieve password for " + applicationAttemptId); } if (!appAttemptSet.contains(applicationAttemptId)) { - throw new InvalidToken("Password not found for ApplicationAttempt " - + applicationAttemptId); + throw new InvalidToken(applicationAttemptId + + " not found in AMRMTokenSecretManager."); } if (identifier.getKeyId() == this.currentMasterKey.getMasterKey() .getKeyId()) { @@ -238,9 +260,7 @@ public class AMRMTokenSecretManager exte return createPassword(identifier.getBytes(), this.nextMasterKey.getSecretKey()); } - throw new InvalidToken("Given AMRMToken for application : " - + applicationAttemptId.toString() - + " seems to have been generated illegally."); + throw new InvalidToken("Invalid AMRMToken from " + applicationAttemptId); } finally { this.readLock.unlock(); } @@ -291,4 +311,25 @@ public class AMRMTokenSecretManager exte this.readLock.unlock(); } } + + public void recover(RMState state) { + if (state.getAMRMTokenSecretManagerState() != null) { + // recover the current master key + MasterKey currentKey = + state.getAMRMTokenSecretManagerState().getCurrentMasterKey(); + this.currentMasterKey = + new MasterKeyData(currentKey, createSecretKey(currentKey.getBytes() + .array())); + + // recover the next master key if not null + MasterKey nextKey = + state.getAMRMTokenSecretManagerState().getNextMasterKey(); + if (nextKey != null) { + this.nextMasterKey = + new MasterKeyData(nextKey, createSecretKey(nextKey.getBytes() + .array())); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } + } + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java Wed Aug 20 01:34:29 2014 @@ -55,6 +55,7 @@ import org.apache.commons.codec.binary.B import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -680,6 +681,11 @@ public class RMWebServices { throw new AuthorizationException(msg); } + if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) { + String msg = "The default static user cannot carry out this operation."; + return Response.status(Status.FORBIDDEN).entity(msg).build(); + } + String userName = callerUGI.getUserName(); RMApp app = null; try { @@ -800,6 +806,13 @@ public class RMWebServices { return callerUGI; } + private boolean isStaticUser(UserGroupInformation callerUGI) { + String staticUser = + conf.get(CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER, + CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER); + return staticUser.equals(callerUGI.getUserName()); + } + /** * Generates a new ApplicationId which is then sent to the client * @@ -822,6 +835,10 @@ public class RMWebServices { throw new AuthorizationException("Unable to obtain user name, " + "user not authenticated"); } + if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) { + String msg = "The default static user cannot carry out this operation."; + return Response.status(Status.FORBIDDEN).entity(msg).build(); + } NewApplication appId = createNewApplication(); return Response.status(Status.OK).entity(appId).build(); @@ -859,6 +876,11 @@ public class RMWebServices { + "user not authenticated"); } + if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) { + String msg = "The default static user cannot carry out this operation."; + return Response.status(Status.FORBIDDEN).entity(msg).build(); + } + ApplicationSubmissionContext appContext = createAppSubmissionContext(newApp); final SubmitApplicationRequest req = @@ -975,7 +997,7 @@ public class RMWebServices { * * @param newApp * the information provided by the user - * @return + * @return created context * @throws BadRequestException * @throws IOException */ Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java Wed Aug 20 01:34:29 2014 @@ -46,8 +46,7 @@ public class FairSchedulerInfo extends S } public int getAppFairShare(ApplicationAttemptId appAttemptId) { - return scheduler.getSchedulerApp(appAttemptId). - getAppSchedulable().getFairShare().getMemory(); + return scheduler.getSchedulerApp(appAttemptId).getFairShare().getMemory(); } public FairSchedulerQueueInfo getRootQueueInfo() { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java Wed Aug 20 01:34:29 2014 @@ -24,7 +24,8 @@ import javax.xml.bind.annotation.XmlAcce import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair + .FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; @@ -39,9 +40,9 @@ public class FairSchedulerLeafQueueInfo public FairSchedulerLeafQueueInfo(FSLeafQueue queue, FairScheduler scheduler) { super(queue, scheduler); - Collection<AppSchedulable> apps = queue.getRunnableAppSchedulables(); - for (AppSchedulable app : apps) { - if (app.getApp().isPending()) { + Collection<FSAppAttempt> apps = queue.getRunnableAppSchedulables(); + for (FSAppAttempt app : apps) { + if (app.isPending()) { numPendingApps++; } else { numActiveApps++; Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Wed Aug 20 01:34:29 2014 @@ -49,7 +49,7 @@ public class MockAM { private volatile int responseId = 0; private final ApplicationAttemptId attemptId; - private final RMContext context; + private RMContext context; private ApplicationMasterProtocol amRMProtocol; private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>(); @@ -61,8 +61,10 @@ public class MockAM { this.amRMProtocol = amRMProtocol; this.attemptId = attemptId; } - - void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol) { + + public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol, + RMContext context) { + this.context = context; this.amRMProtocol = amRMProtocol; } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Aug 20 01:34:29 2014 @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; @@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -412,6 +415,13 @@ public class MockRM extends ResourceMana throws Exception { MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); am.waitForState(RMAppAttemptState.ALLOCATED); + //create and set AMRMToken + Token<AMRMTokenIdentifier> amrmToken = + this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttemptId); + ((RMAppAttemptImpl) this.rmContext.getRMApps() + .get(appAttemptId.getApplicationId()).getRMAppAttempt(appAttemptId)) + .setAMRMToken(amrmToken); getRMContext() .getDispatcher() .getEventHandler() Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java Wed Aug 20 01:34:29 2014 @@ -59,8 +59,9 @@ public class MockRMWithCustomAMLauncher return containerManager; } @Override - protected Token<AMRMTokenIdentifier> getAMRMToken() { - Token<AMRMTokenIdentifier> amRmToken = super.getAMRMToken(); + protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() { + Token<AMRMTokenIdentifier> amRmToken = + super.createAndSetAMRMToken(); InetSocketAddress serviceAddr = getConfig().getSocketAddr( YarnConfiguration.RM_SCHEDULER_ADDRESS, Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Wed Aug 20 01:34:29 2014 @@ -449,6 +449,35 @@ public class TestApplicationCleanup { rm2.stop(); } + @Test (timeout = 60000) + public void testAppCleanupWhenNMReconnects() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // wait for application cleanup message received + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + // reconnect NM with application still active + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + rm1.stop(); + } + public static void main(String[] args) throws Exception { TestApplicationCleanup t = new TestApplicationCleanup(); t.testAppCleanup();
