[
https://issues.apache.org/jira/browse/MAPREDUCE-6176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yang Hao updated MAPREDUCE-6176:
--------------------------------
Release Note: (was: diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 7c18f06..6f69168 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -111,16 +111,18 @@
completed -> request corresponding to which container has completed
Lifecycle of map
- scheduled->assigned->completed
-
+ pending->scheduled->assigned->completed
+
Lifecycle of reduce
pending->scheduled->assigned->completed
-
- Maps are scheduled as soon as their requests are received. Reduces are
- added to the pending and are ramped up (added to scheduled) based
- on completed maps and current availability in the cluster.
+
+ Maps are added to the pending queue and are scheduled when the number of assigned
plus scheduled maps is less than the max number of maps. Reduces are
+ added to the pending queue and are ramped up (added to scheduled) based
+ on completed maps and current availability in the cluster as well as the max
number of reduces.
*/
-
+ //maps which are not yet scheduled
+ private final LinkedList<ContainerRequestEvent> pendingMaps =
+ new LinkedList<ContainerRequestEvent>();
//reduces which are not yet scheduled
private final LinkedList<ContainerRequest> pendingReduces =
new LinkedList<ContainerRequest>();
@@ -176,6 +178,14 @@ protected void serviceInit(Configuration conf) throws
Exception {
// Init startTime to current time. If all goes well, it will be reset after
// first attempt to contact RM.
retrystartTime = System.currentTimeMillis();
+ scheduleStats.numMaxMaps = conf.getInt(MRJobConfig.MR_MAP_NUM_MAX,
Integer.MAX_VALUE);
+ if (scheduleStats.numMaxMaps <= 0) {
+ scheduleStats.numMaxMaps = Integer.MAX_VALUE;
+ }
+ scheduleStats.numMaxReduces = conf.getInt(MRJobConfig.MR_REDUCE_NUM_MAX,
Integer.MAX_VALUE);
+ if (scheduleStats.numMaxReduces <= 0) {
+ scheduleStats.numMaxReduces = Integer.MAX_VALUE;
+ }
}
@Override
@@ -216,6 +226,7 @@ public void run() {
@Override
protected synchronized void heartbeat() throws Exception {
+ scheduleMaps();
scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
List<Container> allocatedContainers = getResources();
if (allocatedContainers.size() > 0) {
@@ -233,12 +244,13 @@ protected synchronized void heartbeat() throws Exception {
if (recalculateReduceSchedule) {
preemptReducesIfNeeded();
scheduleReduces(
- getJob().getTotalMaps(), completedMaps,
- scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
- assignedRequests.maps.size(), assignedRequests.reduces.size(),
- mapResourceReqt, reduceResourceReqt,
- pendingReduces.size(),
- maxReduceRampupLimit, reduceSlowStart);
+ getJob().getTotalMaps(), completedMaps,
+ scheduledRequests.maps.size() + pendingMaps.size(),
+ scheduledRequests.reduces.size(),
+ assignedRequests.maps.size(), assignedRequests.reduces.size(),
+ mapResourceReqt, reduceResourceReqt,
+ pendingReduces.size(),
+ maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false;
}
@@ -313,7 +325,14 @@ protected synchronized void
handleEvent(ContainerAllocatorEvent event) {
//set the rounded off memory
reqEvent.getCapability().setMemory(mapResourceReqt.getMemory());
reqEvent.getCapability().setVirtualCores(mapResourceReqt.getVirtualCores());
- scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
+ //scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
+ //add the map request to the pending queue, then schedule maps from that queue
+ if (reqEvent.getEarlierAttemptFailed()) {
+ pendingMaps.addFirst(reqEvent);
+ } else {
+ pendingMaps.add(reqEvent);
+ }
+ scheduleMaps();
} else {
if (reduceResourceReqt.equals(Resources.none())) {
reduceResourceReqt = reqEvent.getCapability();
@@ -375,6 +394,16 @@ protected synchronized void
handleEvent(ContainerAllocatorEvent event) {
}
}
+ private void scheduleMaps() {
+ LOG.info("scheduling maps from pending queue");
+ //more map to be scheduled
+ int num = scheduleStats.numMaxMaps - scheduledRequests.maps.size() -
assignedRequests.maps.size();
+ num = Math.min(num, pendingMaps.size());
+ for (int i = 0; i < num; i++) {
+ ContainerRequestEvent request = pendingMaps.removeFirst();
+ scheduledRequests.addMap(request);
+ }
+ }
private static String getHost(String contMgrAddress) {
String host = contMgrAddress;
String[] hostport = host.split(":");
@@ -433,12 +462,12 @@ private void preemptReducesIfNeeded() {
@Private
public void scheduleReduces(
int totalMaps, int completedMaps,
- int scheduledMaps, int scheduledReduces,
+ int scheduledAndPendingMaps, int scheduledReduces,
int assignedMaps, int assignedReduces,
Resource mapResourceReqt, Resource reduceResourceReqt,
int numPendingReduces,
float maxReduceRampupLimit, float reduceSlowStart) {
-
+
if (numPendingReduces == 0) {
return;
}
@@ -465,7 +494,7 @@ public void scheduleReduces(
//if all maps are assigned, then ramp up all reduces irrespective of the
//headroom
- if (scheduledMaps == 0 && numPendingReduces > 0) {
+ if (scheduledAndPendingMaps == 0 && numPendingReduces > 0) {
LOG.info("All maps assigned. " +
"Ramping up all remaining reduces:" + numPendingReduces);
scheduleAllReduces();
@@ -478,9 +507,9 @@ public void scheduleReduces(
} else {
completedMapPercent = 1;
}
-
- Resource netScheduledMapResource =
- Resources.multiply(mapResourceReqt, (scheduledMaps + assignedMaps));
+
+ Resource netScheduledMapResource =
+ Resources.multiply(mapResourceReqt, (scheduledAndPendingMaps +
assignedMaps));
Resource netScheduledReduceResource =
Resources.multiply(reduceResourceReqt, (scheduledReduces +
assignedReduces));
@@ -499,8 +528,8 @@ public void scheduleReduces(
// check if there aren't enough maps scheduled, give the free map capacity
// to reduce.
// Even when container number equals, there may be unused resources in one
dimension
- if (Resources.computeAvailableContainers(ideaMapResourceLimit,
mapResourceReqt)
- >= (scheduledMaps + assignedMaps)) {
+ if (Resources.computeAvailableContainers(ideaMapResourceLimit,
mapResourceReqt)
+ >= (scheduledAndPendingMaps + assignedMaps)) {
// enough resource given to maps, given the remaining to reduces
Resource unusedMapResourceLimit =
Resources.subtract(ideaMapResourceLimit, netScheduledMapResource);
finalReduceResourceLimit = Resources.add(idealReduceResourceLimit,
unusedMapResourceLimit);
@@ -524,7 +553,7 @@ public void scheduleReduces(
if (rampUp > 0) {
rampUp = Math.min(rampUp, numPendingReduces);
- LOG.info("Ramping up " + rampUp);
+
rampUpReduces(rampUp);
} else if (rampUp < 0){
int rampDown = -1 * rampUp;
@@ -536,15 +565,18 @@ public void scheduleReduces(
@Private
public void scheduleAllReduces() {
- for (ContainerRequest req : pendingReduces) {
- scheduledRequests.addReduce(req);
- }
- pendingReduces.clear();
+// for (ContainerRequest req : pendingReduces) {
+// scheduledRequests.addReduce(req);
+// }
+// pendingReduces.clear();
+ rampUpReduces(Integer.MAX_VALUE);
}
@Private
public void rampUpReduces(int rampUp) {
//more reduce to be scheduled
+ rampUp = Math.min(rampUp, scheduleStats.numMaxReduces -
scheduledRequests.reduces.size()-assignedRequests.reduces.size());
+ LOG.info("Ramping up " + rampUp);
for (int i = 0; i < rampUp; i++) {
ContainerRequest request = pendingReduces.removeFirst();
scheduledRequests.addReduce(request);
@@ -1202,6 +1234,9 @@ ContainerId get(TaskAttemptId tId) {
}
private class ScheduleStats {
+ int numMaxMaps;
+ int numMaxReduces;
+ int numPendingMaps;
int numPendingReduces;
int numScheduledMaps;
int numScheduledReduces;
@@ -1217,6 +1252,8 @@ public void updateAndLogIfChanged(String msgPrefix) {
// synchronized to fix findbug warnings
synchronized (RMContainerAllocator.this) {
+ changed |= (numPendingMaps != pendingMaps.size());
+ numPendingMaps = pendingMaps.size();
changed |= (numPendingReduces != pendingReduces.size());
numPendingReduces = pendingReduces.size();
changed |= (numScheduledMaps != scheduledRequests.maps.size());
@@ -1243,7 +1280,8 @@ public void updateAndLogIfChanged(String msgPrefix) {
}
public void log(String msgPrefix) {
- LOG.info(msgPrefix + "PendingReds:" + numPendingReduces +
+ LOG.info(msgPrefix + " PendingMaps: " + numPendingMaps +
+ " PendingReds:" + numPendingReduces +
" ScheduledMaps:" + numScheduledMaps +
" ScheduledReds:" + numScheduledReduces +
" AssignedMaps:" + numAssignedMaps +
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index fe96c52..66aa3dd 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -738,5 +738,11 @@
public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 2;
public static final String MR_APPLICATION_TYPE = "MAPREDUCE";
-
+
+ /**
+ * Max number of maps / reduces scheduled or assigned at once (a value <= 0 means no limit)
+ */
+ public static final String MR_MAP_NUM_MAX = "mapreduce.map.num.max";
+
+ public static final String MR_REDUCE_NUM_MAX = "mapreduce.reduce.num.max";
}
)
> Users should limit the number of an application
> -----------------------------------------------
>
> Key: MAPREDUCE-6176
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-6176
> Project: Hadoop Map/Reduce
> Issue Type: New Feature
> Components: mr-am, mrv2
> Affects Versions: 2.5.0, 2.4.1, 2.5.1, 2.5.2
> Reporter: Yang Hao
> Assignee: Yang Hao
> Labels: patch
> Fix For: 2.4.1
>
>
> As MapReduce is a batch computation framework, people may want to run
> application A alongside application B. A good way to do so is to limit
> the number of map tasks or reduce tasks an application may run at once. If we set
> mapreduce.map.num.max to M, then the number of scheduled or running map tasks will
> not exceed M. Likewise, if we set mapreduce.reduce.num.max to R, then the number
> of scheduled or running reduce tasks will not exceed R.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)