[6/7] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan

2016-11-07 Thread jianhe
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
--
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d759d47..7e98f10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -32,7 +32,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.StringUtils;
@@ -112,7 +114,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Alloca
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceAllocationCommitter;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -128,6 +134,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -142,11 +151,12 @@ import com.google.common.base.Preconditions;
 @SuppressWarnings("unchecked")
 public class CapacityScheduler extends
 AbstractYarnScheduler implements
-PreemptableResourceScheduler, CapacitySchedulerContext, Configurable {
+PreemptableResourceScheduler, CapacitySchedulerContext, Configurable,
+ResourceAllocationCommitter {
 
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
   private YarnAuthorizationProvider authorizer;
- 
+
   private CSQueue root;
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@@ -155,6 +165,8 @@ public class CapacityScheduler extends
 
   private volatile boolean isLazyPreemptionEnabled = false;
 
+  private int offswitchPerHeartbeatLimit;
+
   static final Comparator nonPartitionedQueueComparator =
   new Comparator() {
 @Override
@@ -176,7 +188,7 @@ public class CapacityScheduler extends
   public void setConf(Configuration conf) {
   yarnConf = conf;
   }
-  
+
   private void validateConf(Configuration conf) {
 // validate scheduler memory allocation setting
 int minMem = conf.getInt(
@@ -229,7 +241,8 @@ public class CapacityScheduler extends
   private boolean usePortForNodeName;
 
   private boolean scheduleAsynchronously;
-  private AsyncScheduleThread asyncSchedulerThread;
+  private List asyncSchedulerThreads;
+  private 

[6/7] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan

2016-11-07 Thread jianhe
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6cdcab90/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
--
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index fd31a2d..09b59ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -32,7 +32,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.StringUtils;
@@ -113,7 +115,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Alloca
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceAllocationCommitter;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -129,6 +135,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -143,11 +152,12 @@ import com.google.common.base.Preconditions;
 @SuppressWarnings("unchecked")
 public class CapacityScheduler extends
 AbstractYarnScheduler implements
-PreemptableResourceScheduler, CapacitySchedulerContext, Configurable {
+PreemptableResourceScheduler, CapacitySchedulerContext, Configurable,
+ResourceAllocationCommitter {
 
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
   private YarnAuthorizationProvider authorizer;
- 
+
   private CSQueue root;
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@@ -156,6 +166,8 @@ public class CapacityScheduler extends
 
   private volatile boolean isLazyPreemptionEnabled = false;
 
+  private int offswitchPerHeartbeatLimit;
+
   static final Comparator nonPartitionedQueueComparator =
   new Comparator() {
 @Override
@@ -177,7 +189,7 @@ public class CapacityScheduler extends
   public void setConf(Configuration conf) {
   yarnConf = conf;
   }
-  
+
   private void validateConf(Configuration conf) {
 // validate scheduler memory allocation setting
 int minMem = conf.getInt(
@@ -230,7 +242,8 @@ public class CapacityScheduler extends
   private boolean usePortForNodeName;
 
   private boolean scheduleAsynchronously;
-  private AsyncScheduleThread asyncSchedulerThread;
+  private List asyncSchedulerThreads;
+  private