Author: wang
Date: Thu Jun 19 18:32:13 2014
New Revision: 1603993

URL: http://svn.apache.org/r1603993
Log: Merge from trunk to branch
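This merge picks up two trunk changes recorded in CHANGES.txt below: MAPREDUCE-5896 (InputSplits report which locations hold the block cached in memory) and MAPREDUCE-5844 (a configurable delay before reducers are preempted to make room for unsatisfied map requests). As a rough illustration only, not part of this patch, a job could opt into the new delay through the mapreduce.job.reducer.preempt.delay.sec key added in MRJobConfig and mapred-default.xml; the 30-second value here is arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ReducerPreemptDelayExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Let an unsatisfied map request wait 30 seconds before the AM
        // starts preempting reducers; the default of 0 preempts as soon
        // as there is no room for a newly requested map.
        conf.setInt("mapreduce.job.reducer.preempt.delay.sec", 30);
        Job job = Job.getInstance(conf, "reducer-preempt-delay-example");
        // ... configure input/output paths, mapper and reducer as usual ...
        job.waitForCompletion(true);
      }
    }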
Added:
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
      - copied from r1603978, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java
      - copied unchanged from r1603978, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java
      - copied unchanged from r1603978, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java
Removed:
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified:
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
    hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1603664-1603978

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt?rev=1603993&r1=1603992&r2=1603993&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt Thu Jun 19 18:32:13 2014
@@ -213,6 +213,12 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5834. Increased test-timeouts in TestGridMixClasses to avoid
     occassional failures. (Mit Desai via vinodkv)

+    MAPREDUCE-5896. InputSplits should indicate which locations have the block
+    cached in memory. (Sandy Ryza via kasha)
+
+    MAPREDUCE-5844. Add a configurable delay to reducer-preemption.
+    (Maysam Yabandeh via kasha)
+
   OPTIMIZATIONS

   BUG FIXES

Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1603664-1603978

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1603993&r1=1603992&r2=1603993&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Thu Jun 19 18:32:13 2014
@@ -475,8 +475,8 @@
   <Match>
     <Class name="org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator" />
     <Or>
-      <Field name="mapResourceReqt" />
-      <Field name="reduceResourceReqt" />
+      <Field name="mapResourceRequest" />
+      <Field name="reduceResourceRequest" />
       <Field name="maxReduceRampupLimit" />
       <Field name="reduceSlowStart" />
     </Or>

Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1603993&r1=1603992&r2=1603993&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jun 19 18:32:13 2014
@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.record
 import
org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; @@ -143,15 +144,21 @@ public class RMContainerAllocator extend private int lastCompletedTasks = 0; private boolean recalculateReduceSchedule = false; - private int mapResourceReqt;//memory - private int reduceResourceReqt;//memory + private int mapResourceRequest;//memory + private int reduceResourceRequest;//memory private boolean reduceStarted = false; private float maxReduceRampupLimit = 0; private float maxReducePreemptionLimit = 0; + /** + * after this threshold, if the container request is not allocated, it is + * considered delayed. + */ + private long allocationDelayThresholdMs = 0; private float reduceSlowStart = 0; private long retryInterval; private long retrystartTime; + private Clock clock; private final AMPreemptionPolicy preemptionPolicy; @@ -166,6 +173,7 @@ public class RMContainerAllocator extend super(clientService, context); this.preemptionPolicy = preemptionPolicy; this.stopped = new AtomicBoolean(false); + this.clock = context.getClock(); } @Override @@ -180,6 +188,9 @@ public class RMContainerAllocator extend maxReducePreemptionLimit = conf.getFloat( MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT, MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT); + allocationDelayThresholdMs = conf.getInt( + MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, + MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms RackResolver.init(conf); retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); @@ -246,7 +257,7 @@ public class RMContainerAllocator extend getJob().getTotalMaps(), completedMaps, scheduledRequests.maps.size(), scheduledRequests.reduces.size(), assignedRequests.maps.size(), assignedRequests.reduces.size(), - mapResourceReqt, reduceResourceReqt, + mapResourceRequest, reduceResourceRequest, pendingReduces.size(), maxReduceRampupLimit, reduceSlowStart); recalculateReduceSchedule = false; @@ -268,6 +279,18 @@ public class RMContainerAllocator extend scheduleStats.log("Final Stats: "); } + @Private + @VisibleForTesting + AssignedRequests getAssignedRequests() { + return assignedRequests; + } + + @Private + @VisibleForTesting + ScheduledRequests getScheduledRequests() { + return scheduledRequests; + } + public boolean getIsReduceStarted() { return reduceStarted; } @@ -303,16 +326,16 @@ public class RMContainerAllocator extend int supportedMaxContainerCapability = getMaxContainerCapability().getMemory(); if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) { - if (mapResourceReqt == 0) { - mapResourceReqt = reqEvent.getCapability().getMemory(); + if (mapResourceRequest == 0) { + mapResourceRequest = reqEvent.getCapability().getMemory(); eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, - mapResourceReqt))); - LOG.info("mapResourceReqt:"+mapResourceReqt); - if (mapResourceReqt > supportedMaxContainerCapability) { + mapResourceRequest))); + LOG.info("mapResourceRequest:"+ mapResourceRequest); + if (mapResourceRequest > supportedMaxContainerCapability) { String diagMsg = "MAP capability required is more than the supported " + - "max container capability in the cluster. 
Killing the Job. mapResourceReqt: " + - mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability; + "max container capability in the cluster. Killing the Job. mapResourceRequest: " + + mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability; LOG.info(diagMsg); eventHandler.handle(new JobDiagnosticsUpdateEvent( jobId, diagMsg)); @@ -320,20 +343,20 @@ public class RMContainerAllocator extend } } //set the rounded off memory - reqEvent.getCapability().setMemory(mapResourceReqt); + reqEvent.getCapability().setMemory(mapResourceRequest); scheduledRequests.addMap(reqEvent);//maps are immediately scheduled } else { - if (reduceResourceReqt == 0) { - reduceResourceReqt = reqEvent.getCapability().getMemory(); + if (reduceResourceRequest == 0) { + reduceResourceRequest = reqEvent.getCapability().getMemory(); eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent( org.apache.hadoop.mapreduce.TaskType.REDUCE, - reduceResourceReqt))); - LOG.info("reduceResourceReqt:"+reduceResourceReqt); - if (reduceResourceReqt > supportedMaxContainerCapability) { + reduceResourceRequest))); + LOG.info("reduceResourceRequest:"+ reduceResourceRequest); + if (reduceResourceRequest > supportedMaxContainerCapability) { String diagMsg = "REDUCE capability required is more than the " + "supported max container capability in the cluster. Killing the " + - "Job. reduceResourceReqt: " + reduceResourceReqt + + "Job. reduceResourceRequest: " + reduceResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability; LOG.info(diagMsg); eventHandler.handle(new JobDiagnosticsUpdateEvent( @@ -342,7 +365,7 @@ public class RMContainerAllocator extend } } //set the rounded off memory - reqEvent.getCapability().setMemory(reduceResourceReqt); + reqEvent.getCapability().setMemory(reduceResourceRequest); if (reqEvent.getEarlierAttemptFailed()) { //add to the front of queue for fail fast pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); @@ -394,8 +417,22 @@ public class RMContainerAllocator extend return host; } - private void preemptReducesIfNeeded() { - if (reduceResourceReqt == 0) { + @Private + @VisibleForTesting + synchronized void setReduceResourceRequest(int mem) { + this.reduceResourceRequest = mem; + } + + @Private + @VisibleForTesting + synchronized void setMapResourceRequest(int mem) { + this.mapResourceRequest = mem; + } + + @Private + @VisibleForTesting + void preemptReducesIfNeeded() { + if (reduceResourceRequest == 0) { return; //no reduces } //check if reduces have taken over the whole cluster and there are @@ -403,9 +440,9 @@ public class RMContainerAllocator extend if (scheduledRequests.maps.size() > 0) { int memLimit = getMemLimit(); int availableMemForMap = memLimit - ((assignedRequests.reduces.size() - - assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt); + assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest); //availableMemForMap must be sufficient to run atleast 1 map - if (availableMemForMap < mapResourceReqt) { + if (availableMemForMap < mapResourceRequest) { //to make sure new containers are given to maps and not reduces //ramp down all scheduled reduces if any //(since reduces are scheduled at higher priority than maps) @@ -414,22 +451,40 @@ public class RMContainerAllocator extend pendingReduces.add(req); } scheduledRequests.reduces.clear(); - - //preempt for making space for at least one map - int premeptionLimit = Math.max(mapResourceReqt, - (int) 
(maxReducePreemptionLimit * memLimit)); - - int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt, - premeptionLimit); - - int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt); - toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); - - LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); - assignedRequests.preemptReduce(toPreempt); + + //do further checking to find the number of map requests that were + //hanging around for a while + int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps); + if (hangingMapRequests > 0) { + //preempt for making space for at least one map + int premeptionLimit = Math.max(mapResourceRequest, + (int) (maxReducePreemptionLimit * memLimit)); + + int preemptMem = Math.min(hangingMapRequests * mapResourceRequest, + premeptionLimit); + + int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest); + toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); + + LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); + assignedRequests.preemptReduce(toPreempt); + } } } } + + private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) { + if (allocationDelayThresholdMs <= 0) + return requestMap.size(); + int hangingRequests = 0; + long currTime = clock.getTime(); + for (ContainerRequest request: requestMap.values()) { + long delay = currTime - request.requestTimeMs; + if (delay > allocationDelayThresholdMs) + hangingRequests++; + } + return hangingRequests; + } @Private public void scheduleReduces( @@ -715,11 +770,13 @@ public class RMContainerAllocator extend @Private public int getMemLimit() { int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; - return headRoom + assignedRequests.maps.size() * mapResourceReqt + - assignedRequests.reduces.size() * reduceResourceReqt; + return headRoom + assignedRequests.maps.size() * mapResourceRequest + + assignedRequests.reduces.size() * reduceResourceRequest; } - - private class ScheduledRequests { + + @Private + @VisibleForTesting + class ScheduledRequests { private final LinkedList<TaskAttemptId> earlierFailedMaps = new LinkedList<TaskAttemptId>(); @@ -729,7 +786,8 @@ public class RMContainerAllocator extend new HashMap<String, LinkedList<TaskAttemptId>>(); private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = new HashMap<String, LinkedList<TaskAttemptId>>(); - private final Map<TaskAttemptId, ContainerRequest> maps = + @VisibleForTesting + final Map<TaskAttemptId, ContainerRequest> maps = new LinkedHashMap<TaskAttemptId, ContainerRequest>(); private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces = @@ -825,22 +883,22 @@ public class RMContainerAllocator extend int allocatedMemory = allocated.getResource().getMemory(); if (PRIORITY_FAST_FAIL_MAP.equals(priority) || PRIORITY_MAP.equals(priority)) { - if (allocatedMemory < mapResourceReqt + if (allocatedMemory < mapResourceRequest || maps.isEmpty()) { LOG.info("Cannot assign container " + allocated + " for a map as either " - + " container memory less than required " + mapResourceReqt + + " container memory less than required " + mapResourceRequest + " or no pending map tasks - maps.isEmpty=" + maps.isEmpty()); isAssignable = false; } } else if (PRIORITY_REDUCE.equals(priority)) { - if (allocatedMemory < reduceResourceReqt + if (allocatedMemory < reduceResourceRequest || reduces.isEmpty()) { LOG.info("Cannot assign container " + allocated + " for a 
reduce as either " - + " container memory less than required " + reduceResourceReqt + + " container memory less than required " + reduceResourceRequest + " or no pending reduce tasks - reduces.isEmpty=" + reduces.isEmpty()); isAssignable = false; @@ -1119,14 +1177,18 @@ public class RMContainerAllocator extend } } - private class AssignedRequests { + @Private + @VisibleForTesting + class AssignedRequests { private final Map<ContainerId, TaskAttemptId> containerToAttemptMap = new HashMap<ContainerId, TaskAttemptId>(); private final LinkedHashMap<TaskAttemptId, Container> maps = new LinkedHashMap<TaskAttemptId, Container>(); - private final LinkedHashMap<TaskAttemptId, Container> reduces = + @VisibleForTesting + final LinkedHashMap<TaskAttemptId, Container> reduces = new LinkedHashMap<TaskAttemptId, Container>(); - private final Set<TaskAttemptId> preemptionWaitingReduces = + @VisibleForTesting + final Set<TaskAttemptId> preemptionWaitingReduces = new HashSet<TaskAttemptId>(); void add(Container container, TaskAttemptId tId) { Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1603993&r1=1603992&r2=1603993&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu Jun 19 18:32:13 2014 @@ -29,8 +29,10 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -96,6 +98,8 @@ public abstract class RMContainerRequest super(clientService, context); } + @Private + @VisibleForTesting static class ContainerRequest { final TaskAttemptId attemptID; final Resource capability; @@ -103,20 +107,39 @@ public abstract class RMContainerRequest final String[] racks; //final boolean earlierAttemptFailed; final Priority priority; - + /** + * the time when this request object was formed; can be used to avoid + * aggressive preemption for recently placed requests + */ + final long requestTimeMs; + public ContainerRequest(ContainerRequestEvent event, Priority priority) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), event.getRacks(), priority); } - + + public ContainerRequest(ContainerRequestEvent event, Priority priority, + long requestTimeMs) { + this(event.getAttemptID(), event.getCapability(), event.getHosts(), + event.getRacks(), priority, requestTimeMs); + } + + public ContainerRequest(TaskAttemptId attemptID, + Resource capability, String[] hosts, 
String[] racks, + Priority priority) { + this(attemptID, capability, hosts, racks, priority, + System.currentTimeMillis()); + } + public ContainerRequest(TaskAttemptId attemptID, - Resource capability, String[] hosts, String[] racks, - Priority priority) { + Resource capability, String[] hosts, String[] racks, + Priority priority, long requestTimeMs) { this.attemptID = attemptID; this.capability = capability; this.hosts = hosts; this.racks = racks; this.priority = priority; + this.requestTimeMs = requestTimeMs; } public String toString() { Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1603993&r1=1603992&r2=1603993&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Thu Jun 19 18:32:13 2014 @@ -295,6 +295,15 @@ public abstract class FileInputFormat<K, String[] hosts) { return new FileSplit(file, start, length, hosts); } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types + */ + protected FileSplit makeSplit(Path file, long start, long length, + String[] hosts, String[] inMemoryHosts) { + return new FileSplit(file, start, length, hosts, inMemoryHosts); + } /** Splits files returned by {@link #listStatus(JobConf)} when * they're too big.*/ @@ -337,22 +346,22 @@ public abstract class FileInputFormat<K, long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { - String[] splitHosts = getSplitHosts(blkLocations, + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, - splitHosts)); + splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { - String[] splitHosts = getSplitHosts(blkLocations, length + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, - splitHosts)); + splitHosts[0], splitHosts[1])); } } else { - String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); - splits.add(makeSplit(path, 0, length, splitHosts)); + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); + splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { //Create empty hosts array for zero length files @@ -538,10 +547,30 @@ public abstract class FileInputFormat<K, * @param blkLocations The list of block locations * @param offset * @param splitSize - * @return array of hosts that contribute most to this split + * @return an array of hosts that contribute most to this split * @throws IOException */ protected String[] getSplitHosts(BlockLocation[] blkLocations, + long offset, long splitSize, NetworkTopology clusterMap) throws 
IOException { + return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize, + clusterMap)[0]; + } + + /** + * This function identifies and returns the hosts that contribute + * most for a given split. For calculating the contribution, rack + * locality is treated on par with host locality, so hosts from racks + * that contribute the most are preferred over hosts on racks that + * contribute less + * @param blkLocations The list of block locations + * @param offset + * @param splitSize + * @return two arrays - one of hosts that contribute most to this split, and + * one of hosts that contribute most to this split that have the data + * cached on them + * @throws IOException + */ + private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, long offset, long splitSize, NetworkTopology clusterMap) throws IOException { @@ -552,7 +581,8 @@ public abstract class FileInputFormat<K, //If this is the only block, just return if (bytesInThisBlock >= splitSize) { - return blkLocations[startIndex].getHosts(); + return new String[][] { blkLocations[startIndex].getHosts(), + blkLocations[startIndex].getCachedHosts() }; } long bytesInFirstBlock = bytesInThisBlock; @@ -639,7 +669,9 @@ public abstract class FileInputFormat<K, } // for all indices - return identifyHosts(allTopos.length, racksMap); + // We don't yet support cached hosts when bytesInThisBlock > splitSize + return new String[][] { identifyHosts(allTopos.length, racksMap), + new String[0]}; } private String[] identifyHosts(int replicationFactor, Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java?rev=1603993&r1=1603992&r2=1603993&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java Thu Jun 19 18:32:13 2014 @@ -24,6 +24,7 @@ import java.io.DataOutput; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; /** A section of an input file. 
Returned by {@link @@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path; @InterfaceAudience.Public @InterfaceStability.Stable public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit - implements InputSplit { + implements InputSplitWithLocationInfo { org.apache.hadoop.mapreduce.lib.input.FileSplit fs; protected FileSplit() { fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(); @@ -62,6 +63,20 @@ public class FileSplit extends org.apach length, hosts); } + /** Constructs a split with host information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + * @param hosts the list of hosts containing the block, possibly null + * @param inMemoryHosts the list of hosts containing the block in memory + */ + public FileSplit(Path file, long start, long length, String[] hosts, + String[] inMemoryHosts) { + fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start, + length, hosts, inMemoryHosts); + } + public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) { this.fs = fs; } @@ -92,4 +107,9 @@ public class FileSplit extends org.apach return fs.getLocations(); } + @Override + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return fs.getLocationInfo(); + } } Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java?rev=1603993&r1=1603992&r2=1603993&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java Thu Jun 19 18:32:13 2014 @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; @@ -51,10 +53,25 @@ public abstract class InputSplit { /** * Get the list of nodes by name where the data for the split would be local. * The locations do not need to be serialized. + * * @return a new array of the node nodes. * @throws IOException * @throws InterruptedException */ public abstract String[] getLocations() throws IOException, InterruptedException; + + /** + * Gets info about which nodes the input split is stored on and how it is + * stored at each location. + * + * @return list of <code>SplitLocationInfo</code>s describing how the split + * data is stored at each location. A null value indicates that all the + * locations have the data stored on disk. 
+ * @throws IOException + */ + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return null; + } } \ No newline at end of file Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1603993&r1=1603992&r2=1603993&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Jun 19 18:32:13 2014 @@ -579,7 +579,17 @@ public interface MRJobConfig { MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold"; public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD = 50; - + + /** + * The threshold in terms of seconds after which an unsatisfied mapper request + * triggers reducer preemption to free space. Default 0 implies that the reduces + * should be preempted immediately after allocation if there is currently no + * room for newly allocated mappers. + */ + public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC = + "mapreduce.job.reducer.preempt.delay.sec"; + public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0; + public static final String MR_AM_ENV = MR_AM_PREFIX + "env"; Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1603993&r1=1603992&r2=1603993&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Thu Jun 19 18:32:13 2014 @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapred.LocatedFileStatusFetcher; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -359,6 +360,15 @@ public abstract class FileInputFormat<K, String[] hosts) { return new FileSplit(file, start, length, hosts); } + + /** + * A factory that makes the split for this class. 
It can be overridden + * by sub-classes to make sub-types + */ + protected FileSplit makeSplit(Path file, long start, long length, + String[] hosts, String[] inMemoryHosts) { + return new FileSplit(file, start, length, hosts, inMemoryHosts); + } /** * Generate the list of files and make them into FileSplits. @@ -392,17 +402,20 @@ public abstract class FileInputFormat<K, while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, - blkLocations[blkIndex].getHosts())); + blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, - blkLocations[blkIndex].getHosts())); + blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable - splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); + splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), + blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=1603993&r1=1603992&r2=1603993&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Thu Jun 19 18:32:13 2014 @@ -22,11 +22,13 @@ import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -41,6 +43,7 @@ public class FileSplit extends InputSpli private long start; private long length; private String[] hosts; + private SplitLocationInfo[] hostInfos; public FileSplit() {} @@ -57,6 +60,31 @@ public class FileSplit extends InputSpli this.length = length; this.hosts = hosts; } + + /** Constructs a split with host and cached-blocks information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + * @param hosts the list of hosts containing the block + * @param inMemoryHosts the list of hosts containing the block in memory + */ + public FileSplit(Path file, long start, long length, String[] hosts, + String[] inMemoryHosts) { + 
this(file, start, length, hosts); + hostInfos = new SplitLocationInfo[hosts.length]; + for (int i = 0; i < hosts.length; i++) { + // because N will be tiny, scanning is probably faster than a HashSet + boolean inMemory = false; + for (String inMemoryHost : inMemoryHosts) { + if (inMemoryHost.equals(hosts[i])) { + inMemory = true; + break; + } + } + hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory); + } + } /** The file containing this split's data. */ public Path getPath() { return file; } @@ -98,4 +126,10 @@ public class FileSplit extends InputSpli return this.hosts; } } + + @Override + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return hostInfos; + } } Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1603993&r1=1603992&r2=1603993&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Jun 19 18:32:13 2014 @@ -83,6 +83,16 @@ </property> <property> + <name>mapreduce.job.reducer.preempt.delay.sec</name> + <value>0</value> + <description>The threshold in terms of seconds after which an unsatisfied mapper + request triggers reducer preemption to free space. Default 0 implies that the + reduces should be preempted immediately after allocation if there is currently no + room for newly allocated mappers. 
+ </description> +</property> + +<property> <name>mapreduce.job.max.split.locations</name> <value>10</value> <description>The max number of block locations to store for each split for Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1602934-1603978 Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1603993&r1=1603992&r2=1603993&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Thu Jun 19 18:32:13 2014 @@ -103,6 +103,29 @@ public class TestFileInputFormat { } @Test + public void testSplitLocationInfo() throws Exception { + Configuration conf = getConfiguration(); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + JobConf job = new JobConf(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + fileInputFormat.configure(job); + FileSplit[] splits = (FileSplit[]) fileInputFormat.getSplits(job, 1); + String[] locations = splits[0].getLocations(); + Assert.assertEquals(2, locations.length); + SplitLocationInfo[] locationInfo = splits[0].getLocationInfo(); + Assert.assertEquals(2, locationInfo.length); + SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? + locationInfo[0] : locationInfo[1]; + SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? 
+ locationInfo[0] : locationInfo[1]; + Assert.assertTrue(localhostInfo.isOnDisk()); + Assert.assertTrue(localhostInfo.isInMemory()); + Assert.assertTrue(otherhostInfo.isOnDisk()); + Assert.assertFalse(otherhostInfo.isInMemory()); + } + + @Test public void testListStatusSimple() throws IOException { Configuration conf = new Configuration(); conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); @@ -223,8 +246,9 @@ public class TestFileInputFormat { public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException { return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:50010" }, - new String[] { "localhost" }, 0, len) }; + new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, + new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, + new String[0], 0, len, false) }; } @Override Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1603993&r1=1603992&r2=1603993&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Thu Jun 19 18:32:13 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.junit.After; @@ -139,6 +140,28 @@ public class TestFileInputFormat { 1, mockFs.numListLocatedStatusCalls); FileSystem.closeAll(); } + + @Test + public void testSplitLocationInfo() throws Exception { + Configuration conf = getConfiguration(); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + Job job = Job.getInstance(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + List<InputSplit> splits = fileInputFormat.getSplits(job); + String[] locations = splits.get(0).getLocations(); + Assert.assertEquals(2, locations.length); + SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo(); + Assert.assertEquals(2, locationInfo.length); + SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? + locationInfo[0] : locationInfo[1]; + SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? 
+ locationInfo[0] : locationInfo[1]; + Assert.assertTrue(localhostInfo.isOnDisk()); + Assert.assertTrue(localhostInfo.isInMemory()); + Assert.assertTrue(otherhostInfo.isOnDisk()); + Assert.assertFalse(otherhostInfo.isInMemory()); + } @Test public void testListStatusSimple() throws IOException { @@ -402,9 +425,9 @@ public class TestFileInputFormat { public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException { return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:50010" }, - new String[] { "localhost" }, 0, len) }; - } + new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, + new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, + new String[0], 0, len, false) }; } @Override protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
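For context beyond the patch itself, the sketch below shows one way client code might consume the new InputSplit.getLocationInfo() API from MAPREDUCE-5896. It is an illustration under two assumptions: args[0] names a real input directory, and SplitLocationInfo exposes a getLocation() accessor alongside the isOnDisk()/isInMemory() methods exercised in the tests above.

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.SplitLocationInfo;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class LocationInfoExample {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        TextInputFormat format = new TextInputFormat();
        List<InputSplit> splits = format.getSplits(job);
        for (InputSplit split : splits) {
          SplitLocationInfo[] infos = split.getLocationInfo();
          if (infos == null) {
            // A null return means every location has the data on disk only.
            continue;
          }
          for (SplitLocationInfo info : infos) {
            // Prefer hosts that have the block cached in memory.
            if (info.isInMemory()) {
              System.out.println("cached replica at " + info.getLocation());
            }
          }
        }
      }
    }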
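Similarly, the new protected makeSplit(Path, long, long, String[], String[]) overload added to both FileInputFormat classes is, per its javadoc, a factory hook that subclasses can override to produce FileSplit subtypes. A hypothetical sketch follows; TaggedFileSplit and TaggedTextInputFormat are invented names for illustration, and a production subtype would also need to serialize its extra field in write()/readFields().

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    // Invented subtype carrying one extra field; illustrative only.
    class TaggedFileSplit extends FileSplit {
      private String tag;

      // No-arg constructor so the framework can instantiate the split
      // before calling readFields() during deserialization.
      public TaggedFileSplit() {}

      public TaggedFileSplit(Path file, long start, long length,
          String[] hosts, String[] inMemoryHosts, String tag) {
        super(file, start, length, hosts, inMemoryHosts);
        this.tag = tag;
      }

      public String getTag() { return tag; }
    }

    class TaggedTextInputFormat extends TextInputFormat {
      @Override
      protected FileSplit makeSplit(Path file, long start, long length,
          String[] hosts, String[] inMemoryHosts) {
        // Split generation now flows through this overload, so the
        // cached-host (in-memory) information reaches the subtype too.
        return new TaggedFileSplit(file, start, length, hosts,
            inMemoryHosts, "example-tag");
      }
    }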