Author: bikas Date: Fri Apr 19 20:39:30 2013 New Revision: 1470030 URL: http://svn.apache.org/r1470030 Log: MAPREDUCE-5161. Merge MAPREDUCE-1806 from branch-1 to branch-1-win. CombineFileInputFormat fix for paths not on default FS (Chris Nauroth via bikas)
Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1470030&r1=1470029&r2=1470030&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original) +++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Fri Apr 19 20:39:30 2013 @@ -368,3 +368,7 @@ Branch-hadoop-1-win (branched from branc HADOOP-9450. HADOOP_USER_CLASSPATH_FIRST is not honored; CLASSPATH is PREpended instead of APpended. (Chris Nauroth and harsh via harsh) + + MAPREDUCE-5161. Merge MAPREDUCE-1806 from branch-1 to branch-1-win. + CombineFileInputFormat fix for paths not on default FS (Chris Nauroth via + bikas) Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=1470030&r1=1470029&r2=1470030&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original) +++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Fri Apr 19 20:39:30 2013 @@ -194,7 +194,7 @@ public abstract class CombineFileInputFo continue; } FileSystem fs = paths[i].getFileSystem(job); - Path p = new Path(paths[i].toUri().getPath()); + Path p = fs.makeQualified(paths[i]); if (onepool.accept(p)) { myPaths.add(paths[i]); // add it to my output set paths[i] = null; // already processed Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1470030&r1=1470029&r2=1470030&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original) +++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Fri Apr 19 20:39:30 2013 @@ -211,7 +211,8 @@ public abstract class CombineFileInputFo // times, one time each for each pool in the next loop. List<Path> newpaths = new LinkedList<Path>(); for (int i = 0; i < paths.length; i++) { - Path p = new Path(paths[i].toUri().getPath()); + FileSystem fs = paths[i].getFileSystem(conf); + Path p = fs.makeQualified(paths[i]); newpaths.add(p); } paths = null; Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1470030&r1=1470029&r2=1470030&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original) +++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Fri Apr 19 20:39:30 2013 @@ -23,6 +23,7 @@ import junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,6 +33,7 @@ import org.apache.hadoop.hdfs.DFSTestUti import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; @@ -67,6 +69,7 @@ public class TestCombineFileInputFormat static final int BLOCKSIZE = 1024; static final byte[] databuf = new byte[BLOCKSIZE]; + private static final String DUMMY_FS_URI = "dummyfs:///"; private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class); @@ -450,6 +453,39 @@ public class TestCombineFileInputFormat stm.close(); DFSTestUtil.waitReplication(fileSys, name, replication); } + + /** + * Test when input files are from non-default file systems + */ + public void testForNonDefaultFileSystem() throws Throwable { + Configuration conf = new Configuration(); + + // use a fake file system scheme as default + conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI); + + // default fs path + assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString()); + // add a local file + Path localPath = new Path("testFile1"); + FileSystem lfs = FileSystem.getLocal(conf); + FSDataOutputStream dos = lfs.create(localPath); + dos.writeChars("Local file for CFIF"); + dos.close(); + + conf.set("mapred.working.dir", "/"); + JobConf job = new JobConf(conf); + + FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath)); + DummyInputFormat inFormat = new DummyInputFormat(); + InputSplit[] splits = inFormat.getSplits(job, 1); + assertTrue(splits.length > 0); + for (InputSplit s : splits) { + CombineFileSplit cfs = (CombineFileSplit)s; + for (Path p : cfs.getPaths()) { + assertEquals(p.toUri().getScheme(), "file"); + } + } + } static class TestFilter implements PathFilter { private Path p; @@ -462,7 +498,7 @@ public class TestCombineFileInputFormat // returns true if the specified path matches the prefix stored // in this TestFilter. public boolean accept(Path path) { - if (path.toString().indexOf(p.toString()) == 0) { + if (path.toUri().getPath().indexOf(p.toString()) == 0) { return true; } return false; Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1470030&r1=1470029&r2=1470030&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original) +++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Fri Apr 19 20:39:30 2013 @@ -72,7 +72,8 @@ public class TestCombineFileInputFormat static final int BLOCKSIZE = 1024; static final byte[] databuf = new byte[BLOCKSIZE]; - + private static final String DUMMY_FS_URI = "dummyfs:///"; + /** Dummy class to extend CombineFileInputFormat*/ private class DummyInputFormat extends CombineFileInputFormat<Text, Text> { @Override @@ -1110,6 +1111,37 @@ public class TestCombineFileInputFormat } } } + + /** + * Test when input files are from non-default file systems + */ + public void testForNonDefaultFileSystem() throws Throwable { + Configuration conf = new Configuration(); + + // use a fake file system scheme as default + conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI); + + // default fs path + assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString()); + // add a local file + Path localPath = new Path("testFile1"); + FileSystem lfs = FileSystem.getLocal(conf); + FSDataOutputStream dos = lfs.create(localPath); + dos.writeChars("Local file for CFIF"); + dos.close(); + + Job job = new Job(conf); + FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath)); + DummyInputFormat inFormat = new DummyInputFormat(); + List<InputSplit> splits = inFormat.getSplits(job); + assertTrue(splits.size() > 0); + for (InputSplit s : splits) { + CombineFileSplit cfs = (CombineFileSplit)s; + for (Path p : cfs.getPaths()) { + assertEquals(p.toUri().getScheme(), "file"); + } + } + } static class TestFilter implements PathFilter { private Path p; @@ -1122,7 +1154,7 @@ public class TestCombineFileInputFormat // returns true if the specified path matches the prefix stored // in this TestFilter. public boolean accept(Path path) { - if (path.toString().indexOf(p.toString()) == 0) { + if (path.toUri().getPath().indexOf(p.toString()) == 0) { return true; } return false;