[6/7] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java -- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d759d47..7e98f10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -32,7 +32,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; @@ -112,7 +114,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Alloca import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceAllocationCommitter; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -128,6 +134,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourc import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -142,11 +151,12 @@ import com.google.common.base.Preconditions; @SuppressWarnings("unchecked") public class CapacityScheduler extends AbstractYarnSchedulerimplements -PreemptableResourceScheduler, CapacitySchedulerContext, Configurable { +PreemptableResourceScheduler, CapacitySchedulerContext, Configurable, +ResourceAllocationCommitter { private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); private YarnAuthorizationProvider authorizer; - + private CSQueue root; // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -155,6 +165,8 @@ public class CapacityScheduler extends private volatile boolean isLazyPreemptionEnabled = false; + private int offswitchPerHeartbeatLimit; + static final Comparator nonPartitionedQueueComparator = new Comparator() { @Override @@ -176,7 +188,7 @@ public class CapacityScheduler extends public void setConf(Configuration conf) { yarnConf = conf; } - + private void validateConf(Configuration conf) { // validate scheduler memory allocation setting int minMem = conf.getInt( @@ -229,7 +241,8 @@ public class CapacityScheduler extends private boolean usePortForNodeName; private boolean scheduleAsynchronously; - private AsyncScheduleThread asyncSchedulerThread; + private List asyncSchedulerThreads; + private
[6/7] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6cdcab90/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java -- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index fd31a2d..09b59ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -32,7 +32,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; @@ -113,7 +115,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Alloca import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceAllocationCommitter; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -129,6 +135,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourc import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -143,11 +152,12 @@ import com.google.common.base.Preconditions; @SuppressWarnings("unchecked") public class CapacityScheduler extends AbstractYarnSchedulerimplements -PreemptableResourceScheduler, CapacitySchedulerContext, Configurable { +PreemptableResourceScheduler, CapacitySchedulerContext, Configurable, +ResourceAllocationCommitter { private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); private YarnAuthorizationProvider authorizer; - + private CSQueue root; // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -156,6 +166,8 @@ public class CapacityScheduler extends private volatile boolean isLazyPreemptionEnabled = false; + private int offswitchPerHeartbeatLimit; + static final Comparator nonPartitionedQueueComparator = new Comparator() { @Override @@ -177,7 +189,7 @@ public class CapacityScheduler extends public void setConf(Configuration conf) { yarnConf = conf; } - + private void validateConf(Configuration conf) { // validate scheduler memory allocation setting int minMem = conf.getInt( @@ -230,7 +242,8 @@ public class CapacityScheduler extends private boolean usePortForNodeName; private boolean scheduleAsynchronously; - private AsyncScheduleThread asyncSchedulerThread; + private List asyncSchedulerThreads; + private