Author: vinodkv
Date: Thu Mar 20 02:49:31 2014
New Revision: 1579517

URL: http://svn.apache.org/r1579517
Log:
MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block location calls in parallel. Contributed by Siddharth Seth.
svn merge --ignore-ancestry -c 1579515 ../../trunk/
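For reference, the change is opt-in: listing stays single-threaded unless the new
mapreduce.input.fileinputformat.list-status.num-threads key (default 1, see the
mapred-default.xml hunk below) is raised. A minimal driver sketch follows; only the
configuration key and its default come from this change, while the job name, input
path argument, and thread count of 4 are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class ParallelListingDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 1 (the default) keeps the old single-threaded listing; anything
        // higher lists input paths and fetches block locations in parallel.
        conf.setInt("mapreduce.input.fileinputformat.list-status.num-threads", 4);
        Job job = Job.getInstance(conf, "parallel-listing-example");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // ... mapper/reducer/output setup as usual, then job.waitForCompletion ...
      }
    }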
Added:
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
      - copied unchanged from r1579515, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
Modified:
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt?rev=1579517&r1=1579516&r2=1579517&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt Thu Mar 20 02:49:31 2014
@@ -32,6 +32,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-4052. Improved MapReduce clients to use NodeManagers' ability to
     handle cross platform application submissions. (Jian He via vinodkv)
 
+    MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block
+    location calls in parallel. (Siddharth Seth via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1579517&r1=1579516&r2=1579517&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Thu Mar 20 02:49:31 2014
@@ -47,6 +47,9 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
+
 /**
  * A base class for file-based {@link InputFormat}.
  * 
@@ -203,10 +206,7 @@ public abstract class FileInputFormat<K,
     // Whether we need to recursive look into the directory structure
     boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
-
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    List<IOException> errors = new ArrayList<IOException>();
-    
+    
     // creates a MultiPathFilter with the hiddenFileFilter and the
     // user provided one (if any).
     List<PathFilter> filters = new ArrayList<PathFilter>();
@@ -217,6 +217,41 @@ public abstract class FileInputFormat<K,
     }
     PathFilter inputFilter = new MultiPathFilter(filters);
 
+    FileStatus[] result;
+    int numThreads = job
+        .getInt(
+            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
+            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
+
+    Stopwatch sw = new Stopwatch().start();
+    if (numThreads == 1) {
+      List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
+      result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
+    } else {
+      Iterable<FileStatus> locatedFiles = null;
+      try {
+        
+        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
+            job, dirs, recursive, inputFilter, false);
+        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while getting file statuses");
+      }
+      result = Iterables.toArray(locatedFiles, FileStatus.class);
+    }
+
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
+    }
+    LOG.info("Total input paths to process : " + result.length);
+    return result;
+  }
+  
+  private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
+      PathFilter inputFilter, boolean recursive) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    List<IOException> errors = new ArrayList<IOException>();
     for (Path p: dirs) {
       FileSystem fs = p.getFileSystem(job); 
       FileStatus[] matches = fs.globStatus(p, inputFilter);
@@ -246,12 +281,10 @@ public abstract class FileInputFormat<K,
         }
       }
     }
-
     if (!errors.isEmpty()) {
       throw new InvalidInputException(errors);
     }
-    LOG.info("Total input paths to process : " + result.size()); 
-    return result.toArray(new FileStatus[result.size()]);
+    return result;
   }
 
   /** 
@@ -267,6 +300,7 @@ public abstract class FileInputFormat<K,
    * they're too big.*/ 
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
+    Stopwatch sw = new Stopwatch().start();
     FileStatus[] files = listStatus(job);
     
     // Save the number of input files for metrics/loadgen
@@ -325,7 +359,11 @@ public abstract class FileInputFormat<K,
         splits.add(makeSplit(path, 0, length, new String[0]));
       }
     }
-    LOG.debug("Total # of splits: " + splits.size());
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+          + ", TimeTaken: " + sw.elapsedMillis());
+    }
    return splits.toArray(new FileSplit[splits.size()]);
  }
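LocatedFileStatusFetcher itself is copied unchanged from trunk (r1579515), so its body
does not appear in this diff. As a rough illustration only, not the actual
implementation, the parallel path amounts to fanning the per-path glob calls out over
a fixed-size thread pool; a self-contained sketch, with all class and method names
hypothetical:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    class ParallelListingSketch {
      static List<FileStatus> listInParallel(final Configuration conf,
          Path[] dirs, final PathFilter filter, int numThreads)
          throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
          // Submit one glob per input path; the real fetcher additionally
          // expands matched directories and fetches block locations in parallel.
          List<Future<FileStatus[]>> futures = new ArrayList<Future<FileStatus[]>>();
          for (final Path p : dirs) {
            futures.add(pool.submit(new Callable<FileStatus[]>() {
              @Override
              public FileStatus[] call() throws IOException {
                FileSystem fs = p.getFileSystem(conf);
                return fs.globStatus(p, filter);
              }
            }));
          }
          List<FileStatus> result = new ArrayList<FileStatus>();
          for (Future<FileStatus[]> future : futures) {
            try {
              FileStatus[] matches = future.get(); // blocks; rethrows task errors
              if (matches != null) {
                for (FileStatus m : matches) {
                  result.add(m);
                }
              }
            } catch (ExecutionException e) {
              throw new IOException(e.getCause());
            }
          }
          return result;
        } finally {
          pool.shutdown();
        }
      }
    }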
Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1579517&r1=1579516&r2=1579517&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Thu Mar 20 02:49:31 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -43,6 +44,9 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
 /**
  * A base class for file-based {@link InputFormat}s.
  * 
@@ -68,6 +72,9 @@ public abstract class FileInputFormat<K,
     "mapreduce.input.fileinputformat.numinputfiles";
   public static final String INPUT_DIR_RECURSIVE =
     "mapreduce.input.fileinputformat.input.dir.recursive";
+  public static final String LIST_STATUS_NUM_THREADS =
+      "mapreduce.input.fileinputformat.list-status.num-threads";
+  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
 
   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 
@@ -225,7 +232,6 @@ public abstract class FileInputFormat<K,
    */
   protected List<FileStatus> listStatus(JobContext job
                                         ) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
     Path[] dirs = getInputPaths(job);
     if (dirs.length == 0) {
       throw new IOException("No input paths specified in job");
     }
@@ -237,9 +243,7 @@ public abstract class FileInputFormat<K,
 
     // Whether we need to recursive look into the directory structure
     boolean recursive = getInputDirRecursive(job);
-    
-    List<IOException> errors = new ArrayList<IOException>();
-    
+    
     // creates a MultiPathFilter with the hiddenFileFilter and the
     // user provided one (if any).
     List<PathFilter> filters = new ArrayList<PathFilter>();
@@ -250,6 +254,37 @@ public abstract class FileInputFormat<K,
     }
     PathFilter inputFilter = new MultiPathFilter(filters);
     
+    List<FileStatus> result = null;
+
+    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
+        DEFAULT_LIST_STATUS_NUM_THREADS);
+    Stopwatch sw = new Stopwatch().start();
+    if (numThreads == 1) {
+      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
+    } else {
+      Iterable<FileStatus> locatedFiles = null;
+      try {
+        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
+            job.getConfiguration(), dirs, recursive, inputFilter, true);
+        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while getting file statuses");
+      }
+      result = Lists.newArrayList(locatedFiles);
+    }
+    
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
+    }
+    LOG.info("Total input paths to process : " + result.size()); 
+    return result;
+  }
+
+  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
+      PathFilter inputFilter, boolean recursive) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    List<IOException> errors = new ArrayList<IOException>();
     for (int i=0; i < dirs.length; ++i) {
       Path p = dirs[i];
       FileSystem fs = p.getFileSystem(job.getConfiguration()); 
@@ -284,7 +319,6 @@ public abstract class FileInputFormat<K,
     if (!errors.isEmpty()) {
       throw new InvalidInputException(errors);
     }
-    LOG.info("Total input paths to process : " + result.size()); 
     return result;
   }
 
@@ -332,6 +366,7 @@ public abstract class FileInputFormat<K,
    * @throws IOException
    */
   public List<InputSplit> getSplits(JobContext job) throws IOException {
+    Stopwatch sw = new Stopwatch().start();
     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     long maxSize = getMaxSplitSize(job);
 
@@ -376,7 +411,11 @@ public abstract class FileInputFormat<K,
     }
     // Save the number of input files for metrics/loadgen
     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
-    LOG.debug("Total # of splits: " + splits.size());
+    sw.stop();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+          + ", TimeTaken: " + sw.elapsedMillis());
+    }
    return splits;
  }
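Both APIs consult the same configuration key: the old (org.apache.hadoop.mapred)
FileInputFormat above reads the constants defined on the new-API class, so either
kind of driver can opt in. A brief sketch against the old API; the input path
argument and thread count are illustrative:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class OldApiListingExample {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // The mapred code path reads the same constant defined on the
        // new-API class, so one key controls both.
        job.setInt(
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
            4);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
      }
    }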
Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1579517&r1=1579516&r2=1579517&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Mar 20 02:49:31 2014
@@ -695,6 +695,15 @@
 </property>
 
 <property>
+  <name>mapreduce.input.fileinputformat.list-status.num-threads</name>
+  <value>1</value>
+  <description>The number of threads to use to list and fetch block locations
+  for the specified input paths. Note: multiple threads should not be used
+  if a custom non thread-safe path filter is used.
+  </description>
+</property>
+
+<property>
  <name>mapreduce.jobtracker.maxtasks.perjob</name>
  <value>-1</value>
  <description>The maximum number of tasks for a single job.
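The description's caveat matters because with num-threads greater than 1 a
user-supplied PathFilter can be invoked concurrently from several listing threads.
A filter registered through FileInputFormat.setInputPathFilter is safe as long as it
keeps no mutable state; a minimal illustrative example (the class name is
hypothetical):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class NoTmpFilesFilter implements PathFilter {
      // Stateless: concurrent accept() calls from multiple listing threads
      // cannot interfere with each other.
      @Override
      public boolean accept(Path path) {
        return !path.getName().endsWith(".tmp");
      }
    }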
Assert.assertEquals("listLocatedStatuss calls", 1, mockFs.numListLocatedStatusCalls); + FileSystem.closeAll(); + } + + @Test + public void testListStatusSimple() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .configureTestSimple(conf, localFs); + + JobConf jobConf = new JobConf(conf); + TextInputFormat fif = new TextInputFormat(); + fif.configure(jobConf); + FileStatus[] statuses = fif.listStatus(jobConf); + + org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses), + localFs); + } + + @Test + public void testListStatusNestedRecursive() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .configureTestNestedRecursive(conf, localFs); + JobConf jobConf = new JobConf(conf); + TextInputFormat fif = new TextInputFormat(); + fif.configure(jobConf); + FileStatus[] statuses = fif.listStatus(jobConf); + + org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses), + localFs); + } + + @Test + public void testListStatusNestedNonRecursive() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .configureTestNestedNonRecursive(conf, localFs); + JobConf jobConf = new JobConf(conf); + TextInputFormat fif = new TextInputFormat(); + fif.configure(jobConf); + FileStatus[] statuses = fif.listStatus(jobConf); + + org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses), + localFs); + } + + @Test + public void testListStatusErrorOnNonExistantDir() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat + .configureTestErrorOnNonExistantDir(conf, localFs); + JobConf jobConf = new JobConf(conf); + TextInputFormat fif = new TextInputFormat(); + fif.configure(jobConf); + try { + fif.listStatus(jobConf); + Assert.fail("Expecting an IOException for a missing Input path"); + } catch (IOException e) { + Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2"); + expectedExceptionPath = localFs.makeQualified(expectedExceptionPath); + Assert.assertTrue(e instanceof InvalidInputException); + Assert.assertEquals( + "Input path does not exist: " + expectedExceptionPath.toString(), + e.getMessage()); + } } private Configuration getConfiguration() { Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1579517&r1=1579516&r2=1579517&view=diff ============================================================================== --- 
hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Thu Mar 20 02:49:31 2014 @@ -19,10 +19,17 @@ package org.apache.hadoop.mapreduce.lib. import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.Set; + +import javax.annotation.Nullable; import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -34,55 +41,90 @@ import org.apache.hadoop.fs.RawLocalFile import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +@RunWith(value = Parameterized.class) public class TestFileInputFormat { + + private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class); + + private static String testTmpDir = System.getProperty("test.build.data", "/tmp"); + private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF"); + + private static FileSystem localFs; + + private int numThreads; + + public TestFileInputFormat(int numThreads) { + this.numThreads = numThreads; + LOG.info("Running with numThreads: " + numThreads); + } + + @Parameters + public static Collection<Object[]> data() { + Object[][] data = new Object[][] { { 1 }, { 5 }}; + return Arrays.asList(data); + } + + @Before + public void setup() throws IOException { + LOG.info("Using Test Dir: " + TEST_ROOT_DIR); + localFs = FileSystem.getLocal(new Configuration()); + localFs.delete(TEST_ROOT_DIR, true); + localFs.mkdirs(TEST_ROOT_DIR); + } + + @After + public void cleanup() throws IOException { + localFs.delete(TEST_ROOT_DIR, true); + } @Test public void testNumInputFilesRecursively() throws Exception { Configuration conf = getConfiguration(); conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true"); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); Job job = Job.getInstance(conf); FileInputFormat<?, ?> fileInputFormat = new TextInputFormat(); List<InputSplit> splits = fileInputFormat.getSplits(job); Assert.assertEquals("Input splits are not correct", 3, splits.size()); - Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0)) - .getPath().toString()); - Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1)) - .getPath().toString()); - Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath() - .toString()); - + verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3", + "test:/a1/file1"), splits); + // Using the deprecated configuration conf = getConfiguration(); conf.set("mapred.input.dir.recursive", "true"); job = Job.getInstance(conf); splits = fileInputFormat.getSplits(job); - 
Assert.assertEquals("Input splits are not correct", 3, splits.size()); - Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0)) - .getPath().toString()); - Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1)) - .getPath().toString()); - Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath() - .toString()); + verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3", + "test:/a1/file1"), splits); } @Test public void testNumInputFilesWithoutRecursively() throws Exception { Configuration conf = getConfiguration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); Job job = Job.getInstance(conf); FileInputFormat<?, ?> fileInputFormat = new TextInputFormat(); List<InputSplit> splits = fileInputFormat.getSplits(job); Assert.assertEquals("Input splits are not correct", 2, splits.size()); - Assert.assertEquals("test:/a1/a2", ((FileSplit) splits.get(0)).getPath() - .toString()); - Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(1)).getPath() - .toString()); + verifySplits(Lists.newArrayList("test:/a1/a2", "test:/a1/file1"), splits); } @Test public void testListLocatedStatus() throws Exception { Configuration conf = getConfiguration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); conf.setBoolean("fs.test.impl.disable.cache", false); conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2"); MockFileSystem mockFs = @@ -95,8 +137,226 @@ public class TestFileInputFormat { Assert.assertEquals("Input splits are not correct", 2, splits.size()); Assert.assertEquals("listLocatedStatuss calls", 1, mockFs.numListLocatedStatusCalls); + FileSystem.closeAll(); + } + + @Test + public void testListStatusSimple() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = configureTestSimple(conf, localFs); + + Job job = Job.getInstance(conf); + FileInputFormat<?, ?> fif = new TextInputFormat(); + List<FileStatus> statuses = fif.listStatus(job); + + verifyFileStatuses(expectedPaths, statuses, localFs); + } + + @Test + public void testListStatusNestedRecursive() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = configureTestNestedRecursive(conf, localFs); + Job job = Job.getInstance(conf); + FileInputFormat<?, ?> fif = new TextInputFormat(); + List<FileStatus> statuses = fif.listStatus(job); + + verifyFileStatuses(expectedPaths, statuses, localFs); + } + + + @Test + public void testListStatusNestedNonRecursive() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + List<Path> expectedPaths = configureTestNestedNonRecursive(conf, localFs); + Job job = Job.getInstance(conf); + FileInputFormat<?, ?> fif = new TextInputFormat(); + List<FileStatus> statuses = fif.listStatus(job); + + verifyFileStatuses(expectedPaths, statuses, localFs); + } + + @Test + public void testListStatusErrorOnNonExistantDir() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); + + configureTestErrorOnNonExistantDir(conf, localFs); + Job job = Job.getInstance(conf); + FileInputFormat<?, ?> fif = new TextInputFormat(); + try { + fif.listStatus(job); + Assert.fail("Expecting an IOException for a missing Input path"); + } catch (IOException e) { + Path 
expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2"); + expectedExceptionPath = localFs.makeQualified(expectedExceptionPath); + Assert.assertTrue(e instanceof InvalidInputException); + Assert.assertEquals( + "Input path does not exist: " + expectedExceptionPath.toString(), + e.getMessage()); + } + } + + public static List<Path> configureTestSimple(Configuration conf, FileSystem localFs) + throws IOException { + Path base1 = new Path(TEST_ROOT_DIR, "input1"); + Path base2 = new Path(TEST_ROOT_DIR, "input2"); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + localFs.makeQualified(base1) + "," + localFs.makeQualified(base2)); + localFs.mkdirs(base1); + localFs.mkdirs(base2); + + Path in1File1 = new Path(base1, "file1"); + Path in1File2 = new Path(base1, "file2"); + localFs.createNewFile(in1File1); + localFs.createNewFile(in1File2); + + Path in2File1 = new Path(base2, "file1"); + Path in2File2 = new Path(base2, "file2"); + localFs.createNewFile(in2File1); + localFs.createNewFile(in2File2); + List<Path> expectedPaths = Lists.newArrayList(in1File1, in1File2, in2File1, + in2File2); + return expectedPaths; + } + + public static List<Path> configureTestNestedRecursive(Configuration conf, + FileSystem localFs) throws IOException { + Path base1 = new Path(TEST_ROOT_DIR, "input1"); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + localFs.makeQualified(base1).toString()); + conf.setBoolean( + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE, + true); + localFs.mkdirs(base1); + + Path inDir1 = new Path(base1, "dir1"); + Path inDir2 = new Path(base1, "dir2"); + Path inFile1 = new Path(base1, "file1"); + + Path dir1File1 = new Path(inDir1, "file1"); + Path dir1File2 = new Path(inDir1, "file2"); + + Path dir2File1 = new Path(inDir2, "file1"); + Path dir2File2 = new Path(inDir2, "file2"); + + localFs.mkdirs(inDir1); + localFs.mkdirs(inDir2); + + localFs.createNewFile(inFile1); + localFs.createNewFile(dir1File1); + localFs.createNewFile(dir1File2); + localFs.createNewFile(dir2File1); + localFs.createNewFile(dir2File2); + + List<Path> expectedPaths = Lists.newArrayList(inFile1, dir1File1, + dir1File2, dir2File1, dir2File2); + return expectedPaths; + } + + public static List<Path> configureTestNestedNonRecursive(Configuration conf, + FileSystem localFs) throws IOException { + Path base1 = new Path(TEST_ROOT_DIR, "input1"); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + localFs.makeQualified(base1).toString()); + conf.setBoolean( + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE, + false); + localFs.mkdirs(base1); + + Path inDir1 = new Path(base1, "dir1"); + Path inDir2 = new Path(base1, "dir2"); + Path inFile1 = new Path(base1, "file1"); + + Path dir1File1 = new Path(inDir1, "file1"); + Path dir1File2 = new Path(inDir1, "file2"); + + Path dir2File1 = new Path(inDir2, "file1"); + Path dir2File2 = new Path(inDir2, "file2"); + + localFs.mkdirs(inDir1); + localFs.mkdirs(inDir2); + + localFs.createNewFile(inFile1); + localFs.createNewFile(dir1File1); + localFs.createNewFile(dir1File2); + localFs.createNewFile(dir2File1); + localFs.createNewFile(dir2File2); + + List<Path> expectedPaths = Lists.newArrayList(inFile1, inDir1, inDir2); + return expectedPaths; } + public static List<Path> configureTestErrorOnNonExistantDir(Configuration conf, + FileSystem localFs) throws IOException { + Path base1 = new Path(TEST_ROOT_DIR, "input1"); + Path base2 = new Path(TEST_ROOT_DIR, 
"input2"); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + localFs.makeQualified(base1) + "," + localFs.makeQualified(base2)); + conf.setBoolean( + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE, + true); + localFs.mkdirs(base1); + + Path inFile1 = new Path(base1, "file1"); + Path inFile2 = new Path(base1, "file2"); + + localFs.createNewFile(inFile1); + localFs.createNewFile(inFile2); + + List<Path> expectedPaths = Lists.newArrayList(); + return expectedPaths; + } + + public static void verifyFileStatuses(List<Path> expectedPaths, + List<FileStatus> fetchedStatuses, final FileSystem localFs) { + Assert.assertEquals(expectedPaths.size(), fetchedStatuses.size()); + + Iterable<Path> fqExpectedPaths = Iterables.transform(expectedPaths, + new Function<Path, Path>() { + @Override + public Path apply(Path input) { + return localFs.makeQualified(input); + } + }); + + Set<Path> expectedPathSet = Sets.newHashSet(fqExpectedPaths); + for (FileStatus fileStatus : fetchedStatuses) { + if (!expectedPathSet.remove(localFs.makeQualified(fileStatus.getPath()))) { + Assert.fail("Found extra fetched status: " + fileStatus.getPath()); + } + } + Assert.assertEquals( + "Not all expectedPaths matched: " + expectedPathSet.toString(), 0, + expectedPathSet.size()); + } + + + private void verifySplits(List<String> expected, List<InputSplit> splits) { + Iterable<String> pathsFromSplits = Iterables.transform(splits, + new Function<InputSplit, String>() { + @Override + public String apply(@Nullable InputSplit input) { + return ((FileSplit) input).getPath().toString(); + } + }); + + Set<String> expectedSet = Sets.newHashSet(expected); + for (String splitPathString : pathsFromSplits) { + if (!expectedSet.remove(splitPathString)) { + Assert.fail("Found extra split: " + splitPathString); + } + } + Assert.assertEquals( + "Not all expectedPaths matched: " + expectedSet.toString(), 0, + expectedSet.size()); + } + private Configuration getConfiguration() { Configuration conf = new Configuration(); conf.set("fs.test.impl.disable.cache", "true");