[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Hao updated MAPREDUCE-6176:
--------------------------------
       Fix Version/s: 2.4.1
    Target Version/s: 2.5.2, 2.5.1, 2.4.1, 2.5.0  (was: 2.5.0, 2.4.1, 2.5.1, 
2.5.2)
        Release Note: 
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 7c18f06..6f69168 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -111,16 +111,18 @@
   completed -> request corresponding to which container has completed
   
   Lifecycle of map
-  scheduled->assigned->completed
-  
+  pending->scheduled->assigned->completed
+
   Lifecycle of reduce
   pending->scheduled->assigned->completed
-  
-  Maps are scheduled as soon as their requests are received. Reduces are 
-  added to the pending and are ramped up (added to scheduled) based 
-  on completed maps and current availability in the cluster.
+
+  Maps are added to the pending list and are scheduled while the assigned maps 
plus scheduled maps are fewer than the max number of maps. Reduces are
+  added to the pending and are ramped up (added to scheduled) based
+  on completed maps and current availability in the cluster, as well as the max 
number of reduces.
   */
-  
+  //maps which are not yet scheduled
+  private final LinkedList<ContainerRequestEvent> pendingMaps =
+    new LinkedList<ContainerRequestEvent>();
   //reduces which are not yet scheduled
   private final LinkedList<ContainerRequest> pendingReduces = 
     new LinkedList<ContainerRequest>();
@@ -176,6 +178,14 @@ protected void serviceInit(Configuration conf) throws 
Exception {
     // Init startTime to current time. If all goes well, it will be reset after
     // first attempt to contact RM.
     retrystartTime = System.currentTimeMillis();
+    scheduleStats.numMaxMaps = conf.getInt(MRJobConfig.MR_MAP_NUM_MAX, 
Integer.MAX_VALUE);
+    if (scheduleStats.numMaxMaps <= 0) {
+      scheduleStats.numMaxMaps = Integer.MAX_VALUE;
+    }
+    scheduleStats.numMaxReduces = conf.getInt(MRJobConfig.MR_REDUCE_NUM_MAX, 
Integer.MAX_VALUE);
+    if (scheduleStats.numMaxReduces <= 0) {
+      scheduleStats.numMaxReduces = Integer.MAX_VALUE;
+    }
   }
 
   @Override
@@ -216,6 +226,7 @@ public void run() {
 
   @Override
   protected synchronized void heartbeat() throws Exception {
+    scheduleMaps();
     scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
     List<Container> allocatedContainers = getResources();
     if (allocatedContainers.size() > 0) {
@@ -233,12 +244,13 @@ protected synchronized void heartbeat() throws Exception {
     if (recalculateReduceSchedule) {
       preemptReducesIfNeeded();
       scheduleReduces(
-          getJob().getTotalMaps(), completedMaps,
-          scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
-          assignedRequests.maps.size(), assignedRequests.reduces.size(),
-          mapResourceReqt, reduceResourceReqt,
-          pendingReduces.size(), 
-          maxReduceRampupLimit, reduceSlowStart);
+              getJob().getTotalMaps(), completedMaps,
+              scheduledRequests.maps.size() + pendingMaps.size(),
+              scheduledRequests.reduces.size(),
+              assignedRequests.maps.size(), assignedRequests.reduces.size(),
+              mapResourceReqt, reduceResourceReqt,
+              pendingReduces.size(),
+              maxReduceRampupLimit, reduceSlowStart);
       recalculateReduceSchedule = false;
     }
 
@@ -313,7 +325,14 @@ protected synchronized void 
handleEvent(ContainerAllocatorEvent event) {
         //set the rounded off memory
         reqEvent.getCapability().setMemory(mapResourceReqt.getMemory());
         
reqEvent.getCapability().setVirtualCores(mapResourceReqt.getVirtualCores());
-        scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
+        //scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
+        //add the map request to the pending queue, then schedule it
+        if (reqEvent.getEarlierAttemptFailed()) {
+          pendingMaps.addFirst(reqEvent);
+        } else {
+          pendingMaps.add(reqEvent);
+        }
+        scheduleMaps();
       } else {
         if (reduceResourceReqt.equals(Resources.none())) {
           reduceResourceReqt = reqEvent.getCapability();
@@ -375,6 +394,16 @@ protected synchronized void 
handleEvent(ContainerAllocatorEvent event) {
     }
   }
 
+  private void scheduleMaps() {
+    LOG.info("scheduling maps from pending queue");
+    //more map to be scheduled
+    int num = scheduleStats.numMaxMaps - scheduledRequests.maps.size() - 
assignedRequests.maps.size();
+    num = Math.min(num, pendingMaps.size());
+    for (int i = 0; i < num; i++) {
+      ContainerRequestEvent request = pendingMaps.removeFirst();
+      scheduledRequests.addMap(request);
+    }
+  }
   private static String getHost(String contMgrAddress) {
     String host = contMgrAddress;
     String[] hostport = host.split(":");
@@ -433,12 +462,12 @@ private void preemptReducesIfNeeded() {
   @Private
   public void scheduleReduces(
       int totalMaps, int completedMaps,
-      int scheduledMaps, int scheduledReduces,
+      int scheduledAndPendingMaps, int scheduledReduces,
       int assignedMaps, int assignedReduces,
       Resource mapResourceReqt, Resource reduceResourceReqt,
       int numPendingReduces,
       float maxReduceRampupLimit, float reduceSlowStart) {
-    
+
     if (numPendingReduces == 0) {
       return;
     }
@@ -465,7 +494,7 @@ public void scheduleReduces(
     
     //if all maps are assigned, then ramp up all reduces irrespective of the
     //headroom
-    if (scheduledMaps == 0 && numPendingReduces > 0) {
+    if (scheduledAndPendingMaps == 0 && numPendingReduces > 0) {
       LOG.info("All maps assigned. " +
           "Ramping up all remaining reduces:" + numPendingReduces);
       scheduleAllReduces();
@@ -478,9 +507,9 @@ public void scheduleReduces(
     } else {
       completedMapPercent = 1;
     }
-    
-    Resource netScheduledMapResource = 
-        Resources.multiply(mapResourceReqt, (scheduledMaps + assignedMaps));
+
+    Resource netScheduledMapResource =
+            Resources.multiply(mapResourceReqt, (scheduledAndPendingMaps + 
assignedMaps));
 
     Resource netScheduledReduceResource = 
         Resources.multiply(reduceResourceReqt, (scheduledReduces + 
assignedReduces));
@@ -499,8 +528,8 @@ public void scheduleReduces(
     // check if there aren't enough maps scheduled, give the free map capacity
     // to reduce. 
     // Even when container number equals, there may be unused resources in one 
dimension
-    if (Resources.computeAvailableContainers(ideaMapResourceLimit, 
mapResourceReqt) 
-        >= (scheduledMaps + assignedMaps)) {
+    if (Resources.computeAvailableContainers(ideaMapResourceLimit, 
mapResourceReqt)
+            >= (scheduledAndPendingMaps + assignedMaps)) {
       // enough resource given to maps, given the remaining to reduces
       Resource unusedMapResourceLimit = 
Resources.subtract(ideaMapResourceLimit, netScheduledMapResource);
       finalReduceResourceLimit = Resources.add(idealReduceResourceLimit, 
unusedMapResourceLimit);
@@ -524,7 +553,7 @@ public void scheduleReduces(
     
     if (rampUp > 0) {
       rampUp = Math.min(rampUp, numPendingReduces);
-      LOG.info("Ramping up " + rampUp);
+
       rampUpReduces(rampUp);
     } else if (rampUp < 0){
       int rampDown = -1 * rampUp;
@@ -536,15 +565,18 @@ public void scheduleReduces(
 
   @Private
   public void scheduleAllReduces() {
-    for (ContainerRequest req : pendingReduces) {
-      scheduledRequests.addReduce(req);
-    }
-    pendingReduces.clear();
+//    for (ContainerRequest req : pendingReduces) {
+//      scheduledRequests.addReduce(req);
+//    }
+//    pendingReduces.clear();
+    rampUpReduces(Integer.MAX_VALUE);
   }
   
   @Private
   public void rampUpReduces(int rampUp) {
     //more reduce to be scheduled
+    rampUp = Math.min(rampUp, scheduleStats.numMaxReduces - 
scheduledRequests.reduces.size()-assignedRequests.reduces.size());
+    LOG.info("Ramping up " + rampUp);
     for (int i = 0; i < rampUp; i++) {
       ContainerRequest request = pendingReduces.removeFirst();
       scheduledRequests.addReduce(request);
@@ -1202,6 +1234,9 @@ ContainerId get(TaskAttemptId tId) {
   }
 
   private class ScheduleStats {
+    int numMaxMaps;
+    int numMaxReduces;
+    int numPendingMaps;
     int numPendingReduces;
     int numScheduledMaps;
     int numScheduledReduces;
@@ -1217,6 +1252,8 @@ public void updateAndLogIfChanged(String msgPrefix) {
 
       // synchronized to fix findbug warnings
       synchronized (RMContainerAllocator.this) {
+        changed |= (numPendingMaps != pendingMaps.size());
+        numPendingMaps = pendingMaps.size();
         changed |= (numPendingReduces != pendingReduces.size());
         numPendingReduces = pendingReduces.size();
         changed |= (numScheduledMaps != scheduledRequests.maps.size());
@@ -1243,7 +1280,8 @@ public void updateAndLogIfChanged(String msgPrefix) {
     }
 
     public void log(String msgPrefix) {
-        LOG.info(msgPrefix + "PendingReds:" + numPendingReduces +
+        LOG.info(msgPrefix + " PendingMaps: " + numPendingMaps +
+        " PendingReds:" + numPendingReduces +
         " ScheduledMaps:" + numScheduledMaps +
         " ScheduledReds:" + numScheduledReduces +
         " AssignedMaps:" + numAssignedMaps +
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index fe96c52..66aa3dd 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -738,5 +738,11 @@
   public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 2;
   
   public static final String MR_APPLICATION_TYPE = "MAPREDUCE";
-  
+
+  /**
+   * limit the number of maps and reduces
+   */
+  public static final String MR_MAP_NUM_MAX = "mapreduce.map.num.max";
+
+  public static final String MR_REDUCE_NUM_MAX = "mapreduce.reduce.num.max";
 }

              Status: Patch Available  (was: Open)

> Users should limit the number of an application
> -----------------------------------------------
>
>                 Key: MAPREDUCE-6176
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6176
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: mr-am, mrv2
>    Affects Versions: 2.5.2, 2.5.1, 2.4.1, 2.5.0
>            Reporter: Yang Hao
>            Assignee: Yang Hao
>              Labels: patch
>             Fix For: 2.4.1
>
>
> MapReduce is a batch computation framework, so people may want to run 
> application A alongside application B. A good way to do so is to limit the 
> number of an application's map tasks or reduce tasks. If we set 
> mapreduce.map.num.max to M, then the number of map tasks will not exceed M. 
> Likewise, if we set mapreduce.reduce.num.max to R, then the number of reduce 
> tasks will not exceed R.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to