Author: jlowe Date: Fri Jul 26 18:22:26 2013 New Revision: 1507388 URL: http://svn.apache.org/r1507388 Log: MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus. Contributed by Hairong Kuang and Jason Lowe
Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (with props) Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1507388&r1=1507387&r2=1507388&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jul 26 18:22:26 2013 @@ -10,6 +10,9 @@ Release 0.23.10 - UNRELEASED OPTIMIZATIONS + MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus + (Hairong Kuang and Jason Lowe via jlowe) + BUG FIXES MAPREDUCE-3193. 
FileInputFormat doesn't read files recursively in the Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Fri Jul 26 18:22:26 2013 @@ -36,8 +36,10 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; @@ -164,13 +166,17 @@ public abstract class FileInputFormat<K, protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter) throws IOException { - for(FileStatus stat: fs.listStatus(path, inputFilter)) { - if (stat.isDirectory()) { - addInputPathRecursively(result, fs, stat.getPath(), inputFilter); - } else { - result.add(stat); + RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); + while (iter.hasNext()) { + LocatedFileStatus stat = iter.next(); + if (inputFilter.accept(stat.getPath())) { + if (stat.isDirectory()) { + addInputPathRecursively(result, fs, stat.getPath(), inputFilter); + } else { + result.add(stat); + } } - } + } } /** List input directories. 
@@ -216,14 +222,19 @@ public abstract class FileInputFormat<K, } else { for (FileStatus globStat: matches) { if (globStat.isDirectory()) { - for(FileStatus stat: fs.listStatus(globStat.getPath(), - inputFilter)) { - if (recursive && stat.isDirectory()) { - addInputPathRecursively(result, fs, stat.getPath(), inputFilter); - } else { - result.add(stat); + RemoteIterator<LocatedFileStatus> iter = + fs.listLocatedStatus(globStat.getPath()); + while (iter.hasNext()) { + LocatedFileStatus stat = iter.next(); + if (inputFilter.accept(stat.getPath())) { + if (recursive && stat.isDirectory()) { + addInputPathRecursively(result, fs, stat.getPath(), + inputFilter); + } else { + result.add(stat); + } } - } + } } else { result.add(globStat); } @@ -249,7 +260,6 @@ public abstract class FileInputFormat<K, /** Splits files returned by {@link #listStatus(JobConf)} when * they're too big.*/ - @SuppressWarnings("deprecation") public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { FileStatus[] files = listStatus(job); @@ -273,31 +283,38 @@ public abstract class FileInputFormat<K, NetworkTopology clusterMap = new NetworkTopology(); for (FileStatus file: files) { Path path = file.getPath(); - FileSystem fs = path.getFileSystem(job); long length = file.getLen(); - BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); - if ((length != 0) && isSplitable(fs, path)) { - long blockSize = file.getBlockSize(); - long splitSize = computeSplitSize(goalSize, minSize, blockSize); - - long bytesRemaining = length; - while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { - String[] splitHosts = getSplitHosts(blkLocations, - length-bytesRemaining, splitSize, clusterMap); - splits.add(makeSplit(path, length-bytesRemaining, splitSize, - splitHosts)); - bytesRemaining -= splitSize; + if (length != 0) { + FileSystem fs = path.getFileSystem(job); + BlockLocation[] blkLocations; + if (file instanceof LocatedFileStatus) { + blkLocations = ((LocatedFileStatus) 
file).getBlockLocations(); + } else { + blkLocations = fs.getFileBlockLocations(file, 0, length); + } + if (isSplitable(fs, path)) { + long blockSize = file.getBlockSize(); + long splitSize = computeSplitSize(goalSize, minSize, blockSize); + + long bytesRemaining = length; + while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { + String[] splitHosts = getSplitHosts(blkLocations, + length-bytesRemaining, splitSize, clusterMap); + splits.add(makeSplit(path, length-bytesRemaining, splitSize, + splitHosts)); + bytesRemaining -= splitSize; + } + + if (bytesRemaining != 0) { + String[] splitHosts = getSplitHosts(blkLocations, length + - bytesRemaining, bytesRemaining, clusterMap); + splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, + splitHosts)); + } + } else { + String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); + splits.add(makeSplit(path, 0, length, splitHosts)); } - - if (bytesRemaining != 0) { - String[] splitHosts = getSplitHosts(blkLocations, length - - bytesRemaining, bytesRemaining, clusterMap); - splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, - splitHosts)); - } - } else if (length != 0) { - String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); - splits.add(makeSplit(path, 0, length, splitHosts)); } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Fri Jul 26 18:22:26 2013 @@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.lib. import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import java.util.HashSet; import java.util.List; import java.util.HashMap; @@ -33,7 +32,7 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -203,47 +202,33 @@ public abstract class CombineFileInputFo } // all the files in input set - Path[] paths = FileUtil.stat2Paths( - listStatus(job).toArray(new FileStatus[0])); + List<FileStatus> stats = listStatus(job); List<InputSplit> splits = new ArrayList<InputSplit>(); - if (paths.length == 0) { + if (stats.size() == 0) { return splits; } - // Convert them to Paths first. This is a costly operation and - // we should do it first, otherwise we will incur doing it multiple - // times, one time each for each pool in the next loop. - List<Path> newpaths = new LinkedList<Path>(); - for (int i = 0; i < paths.length; i++) { - FileSystem fs = paths[i].getFileSystem(conf); - Path p = fs.makeQualified(paths[i]); - newpaths.add(p); - } - paths = null; - // In one single iteration, process all the paths in a single pool. 
// Processing one pool at a time ensures that a split contains paths // from a single pool only. for (MultiPathFilter onepool : pools) { - ArrayList<Path> myPaths = new ArrayList<Path>(); + ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>(); // pick one input path. If it matches all the filters in a pool, // add it to the output set - for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) { - Path p = iter.next(); - if (onepool.accept(p)) { + for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) { + FileStatus p = iter.next(); + if (onepool.accept(p.getPath())) { myPaths.add(p); // add it to my output set iter.remove(); } } // create splits for all files in this pool. - getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), - maxSize, minSizeNode, minSizeRack, splits); + getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits); } // create splits for all files that are not in any pool. - getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]), - maxSize, minSizeNode, minSizeRack, splits); + getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits); // free up rackToNodes map rackToNodes.clear(); @@ -253,7 +238,7 @@ public abstract class CombineFileInputFo /** * Return all the splits in the specified set of paths */ - private void getMoreSplits(JobContext job, Path[] paths, + private void getMoreSplits(JobContext job, List<FileStatus> stats, long maxSize, long minSizeNode, long minSizeRack, List<InputSplit> splits) throws IOException { @@ -274,18 +259,20 @@ public abstract class CombineFileInputFo HashMap<String, List<OneBlockInfo>> nodeToBlocks = new HashMap<String, List<OneBlockInfo>>(); - files = new OneFileInfo[paths.length]; - if (paths.length == 0) { + files = new OneFileInfo[stats.size()]; + if (stats.size() == 0) { return; } // populate all the blocks for all files long totLength = 0; - for (int i = 0; i < paths.length; i++) { - files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, 
paths[i]), + int fileIdx = 0; + for (FileStatus stat : stats) { + files[fileIdx] = new OneFileInfo(stat, conf, + isSplitable(job, stat.getPath()), rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes, maxSize); - totLength += files[i].getLength(); + totLength += files[fileIdx].getLength(); + fileIdx++; } ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>(); @@ -479,7 +466,7 @@ public abstract class CombineFileInputFo private long fileSize; // size of the file private OneBlockInfo[] blocks; // all blocks in this file - OneFileInfo(Path path, Configuration conf, + OneFileInfo(FileStatus stat, Configuration conf, boolean isSplitable, HashMap<String, List<OneBlockInfo>> rackToBlocks, HashMap<OneBlockInfo, String[]> blockToNodes, @@ -490,10 +477,13 @@ public abstract class CombineFileInputFo this.fileSize = 0; // get block locations from file system - FileSystem fs = path.getFileSystem(conf); - FileStatus stat = fs.getFileStatus(path); - BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, - stat.getLen()); + BlockLocation[] locations; + if (stat instanceof LocatedFileStatus) { + locations = ((LocatedFileStatus) stat).getBlockLocations(); + } else { + FileSystem fs = stat.getPath().getFileSystem(conf); + locations = fs.getFileBlockLocations(stat, 0, stat.getLen()); + } // create a list of all block and their locations if (locations == null) { blocks = new OneBlockInfo[0]; @@ -508,8 +498,8 @@ public abstract class CombineFileInputFo // full file length blocks = new OneBlockInfo[1]; fileSize = stat.getLen(); - blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0] - .getHosts(), locations[0].getTopologyPaths()); + blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize, + locations[0].getHosts(), locations[0].getTopologyPaths()); } else { ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>( locations.length); @@ -535,9 +525,9 @@ public abstract class CombineFileInputFo myLength = Math.min(maxSize, left); } } - OneBlockInfo oneblock = new
OneBlockInfo(path, myOffset, - myLength, locations[i].getHosts(), locations[i] - .getTopologyPaths()); + OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(), + myOffset, myLength, locations[i].getHosts(), + locations[i].getTopologyPaths()); left -= myLength; myOffset += myLength; @@ -638,6 +628,9 @@ public abstract class CombineFileInputFo protected BlockLocation[] getFileBlockLocations( FileSystem fs, FileStatus stat) throws IOException { + if (stat instanceof LocatedFileStatus) { + return ((LocatedFileStatus) stat).getBlockLocations(); + } return fs.getFileBlockLocations(stat, 0, stat.getLen()); } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Jul 26 18:22:26 2013 @@ -29,9 +29,11 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -254,14 +256,19 @@ public abstract class FileInputFormat<K, } else { for (FileStatus globStat: matches) { if (globStat.isDirectory()) { - for(FileStatus stat: fs.listStatus(globStat.getPath(), - inputFilter)) { - if (recursive && stat.isDirectory()) { - addInputPathRecursively(result, fs, stat.getPath(), inputFilter); - } else { - result.add(stat); + RemoteIterator<LocatedFileStatus> iter = + fs.listLocatedStatus(globStat.getPath()); + while (iter.hasNext()) { + LocatedFileStatus stat = iter.next(); + if (inputFilter.accept(stat.getPath())) { + if (recursive && stat.isDirectory()) { + addInputPathRecursively(result, fs, stat.getPath(), + inputFilter); + } else { + result.add(stat); + } } - } + } } else { result.add(globStat); } @@ -291,13 +298,17 @@ public abstract class FileInputFormat<K, protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter) throws IOException { - for(FileStatus stat: fs.listStatus(path, inputFilter)) { - if (stat.isDirectory()) { - addInputPathRecursively(result, fs, stat.getPath(), inputFilter); - } else { - result.add(stat); + RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); + while (iter.hasNext()) { + LocatedFileStatus stat = iter.next(); + if (inputFilter.accept(stat.getPath())) { + if (stat.isDirectory()) { + addInputPathRecursively(result, fs, stat.getPath(), inputFilter); + } else { + result.add(stat); + } } - } + } } @@ -326,8 +337,13 @@ public abstract class FileInputFormat<K, Path path = 
file.getPath(); long length = file.getLen(); if (length != 0) { - FileSystem fs = path.getFileSystem(job.getConfiguration()); - BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); + BlockLocation[] blkLocations; + if (file instanceof LocatedFileStatus) { + blkLocations = ((LocatedFileStatus) file).getBlockLocations(); + } else { + FileSystem fs = path.getFileSystem(job.getConfiguration()); + blkLocations = fs.getFileBlockLocations(file, 0, length); + } if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1507388&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Fri Jul 26 18:22:26 2013 @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.RemoteIterator; +import org.junit.Assert; +import org.junit.Test; + +public class TestFileInputFormat { + + @Test + public void testListLocatedStatus() throws Exception { + Configuration conf = getConfiguration(); + conf.setBoolean("fs.test.impl.disable.cache", false); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + MockFileSystem mockFs = + (MockFileSystem) new Path("test:///").getFileSystem(conf); + Assert.assertEquals("listLocatedStatus already called", + 0, mockFs.numListLocatedStatusCalls); + JobConf job = new JobConf(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + fileInputFormat.configure(job); + InputSplit[] splits = fileInputFormat.getSplits(job, 1); + Assert.assertEquals("Input splits are not correct", 2, splits.length); + Assert.assertEquals("listLocatedStatuss calls", + 1, mockFs.numListLocatedStatusCalls); + } + + private Configuration getConfiguration() { + Configuration conf = new Configuration(); + conf.set("fs.test.impl.disable.cache", "true"); + conf.setClass("fs.test.impl", 
MockFileSystem.class, FileSystem.class); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1"); + return conf; + } + + static class MockFileSystem extends RawLocalFileSystem { + int numListLocatedStatusCalls = 0; + + @Override + public FileStatus[] listStatus(Path f) throws FileNotFoundException, + IOException { + if (f.toString().equals("test:/a1")) { + return new FileStatus[] { + new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")), + new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) }; + } else if (f.toString().equals("test:/a1/a2")) { + return new FileStatus[] { + new FileStatus(10, false, 1, 150, 150, + new Path("test:/a1/a2/file2")), + new FileStatus(10, false, 1, 151, 150, + new Path("test:/a1/a2/file3")) }; + } + return new FileStatus[0]; + } + + @Override + public FileStatus[] globStatus(Path pathPattern, PathFilter filter) + throws IOException { + return new FileStatus[] { new FileStatus(10, true, 1, 150, 150, + pathPattern) }; + } + + @Override + public FileStatus[] listStatus(Path f, PathFilter filter) + throws FileNotFoundException, IOException { + return this.listStatus(f); + } + + @Override + public BlockLocation[] getFileBlockLocations(Path p, long start, long len) + throws IOException { + return new BlockLocation[] { + new BlockLocation(new String[] { "localhost:50010" }, + new String[] { "localhost" }, 0, len) }; + } + + @Override + protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f, + PathFilter filter) throws FileNotFoundException, IOException { + ++numListLocatedStatusCalls; + return super.listLocatedStatus(f, filter); + } + } +} Propchange: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Fri Jul 26 18:22:26 2013 @@ -24,11 +24,14 @@ import java.util.List; import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.junit.Test; @@ -77,6 +80,23 @@ public class TestFileInputFormat { .toString()); } + @Test + public void testListLocatedStatus() throws Exception { + Configuration conf = getConfiguration(); + conf.setBoolean("fs.test.impl.disable.cache", false); + conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2"); + MockFileSystem mockFs = + (MockFileSystem) new Path("test:///").getFileSystem(conf); + Assert.assertEquals("listLocatedStatus already called", + 0, 
mockFs.numListLocatedStatusCalls); + Job job = Job.getInstance(conf); + FileInputFormat<?, ?> fileInputFormat = new TextInputFormat(); + List<InputSplit> splits = fileInputFormat.getSplits(job); + Assert.assertEquals("Input splits are not correct", 2, splits.size()); + Assert.assertEquals("listLocatedStatuss calls", + 1, mockFs.numListLocatedStatusCalls); + } + private Configuration getConfiguration() { Configuration conf = new Configuration(); conf.set("fs.test.impl.disable.cache", "true"); @@ -86,13 +106,14 @@ public class TestFileInputFormat { } static class MockFileSystem extends RawLocalFileSystem { + int numListLocatedStatusCalls = 0; @Override public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { if (f.toString().equals("test:/a1")) { return new FileStatus[] { - new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")), + new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")), new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) }; } else if (f.toString().equals("test:/a1/a2")) { return new FileStatus[] { @@ -116,5 +137,20 @@ public class TestFileInputFormat { throws FileNotFoundException, IOException { return this.listStatus(f); } + + @Override + public BlockLocation[] getFileBlockLocations(Path p, long start, long len) + throws IOException { + return new BlockLocation[] { + new BlockLocation(new String[] { "localhost:50010" }, + new String[] { "localhost" }, 0, len) }; + } + + @Override + protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f, + PathFilter filter) throws FileNotFoundException, IOException { + ++numListLocatedStatusCalls; + return super.listLocatedStatus(f, filter); + } } }