Author: bikas
Date: Fri Apr 19 20:39:30 2013
New Revision: 1470030

URL: http://svn.apache.org/r1470030
Log:
MAPREDUCE-5161. Merge MAPREDUCE-1806 from branch-1 to branch-1-win. 
CombineFileInputFormat fix for paths not on default FS (Chris Nauroth via bikas)

Modified:
    hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
    hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java

Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1470030&r1=1470029&r2=1470030&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Fri Apr 19 20:39:30 2013
@@ -368,3 +368,7 @@ Branch-hadoop-1-win (branched from branc
 
     HADOOP-9450. HADOOP_USER_CLASSPATH_FIRST is not honored; CLASSPATH
     is PREpended instead of APpended. (Chris Nauroth and harsh via harsh)
+
+    MAPREDUCE-5161. Merge MAPREDUCE-1806 from branch-1 to branch-1-win.
+    CombineFileInputFormat fix for paths not on default FS (Chris Nauroth via
+    bikas)

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=1470030&r1=1470029&r2=1470030&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Fri Apr 19 20:39:30 2013
@@ -194,7 +194,7 @@ public abstract class CombineFileInputFo
           continue;
         }
         FileSystem fs = paths[i].getFileSystem(job);
-        Path p = new Path(paths[i].toUri().getPath());
+        Path p = fs.makeQualified(paths[i]);
         if (onepool.accept(p)) {
           myPaths.add(paths[i]); // add it to my output set
           paths[i] = null;       // already processed

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1470030&r1=1470029&r2=1470030&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Fri Apr 19 20:39:30 2013
@@ -211,7 +211,8 @@ public abstract class CombineFileInputFo
     // times, one time each for each pool in the next loop.
     List<Path> newpaths = new LinkedList<Path>();
     for (int i = 0; i < paths.length; i++) {
-      Path p = new Path(paths[i].toUri().getPath());
+      FileSystem fs = paths[i].getFileSystem(conf);
+      Path p = fs.makeQualified(paths[i]);
       newpaths.add(p);
     }
     paths = null;

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1470030&r1=1470029&r2=1470030&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Fri Apr 19 20:39:30 2013
@@ -23,6 +23,7 @@ import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -67,6 +69,7 @@ public class TestCombineFileInputFormat 
 
   static final int BLOCKSIZE = 1024;
   static final byte[] databuf = new byte[BLOCKSIZE];
+  private static final String DUMMY_FS_URI = "dummyfs:///";
 
   private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
   
@@ -450,6 +453,39 @@ public class TestCombineFileInputFormat 
     stm.close();
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
+  
+  /**
+   * Test when input files are from non-default file systems
+   */
+  public void testForNonDefaultFileSystem() throws Throwable {
+    Configuration conf = new Configuration();
+
+    // use a fake file system scheme as default
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
+
+    // default fs path
+    assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
+    // add a local file
+    Path localPath = new Path("testFile1");
+    FileSystem lfs = FileSystem.getLocal(conf);
+    FSDataOutputStream dos = lfs.create(localPath);
+    dos.writeChars("Local file for CFIF");
+    dos.close();
+
+    conf.set("mapred.working.dir", "/");
+    JobConf job = new JobConf(conf);
+
+    FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
+    DummyInputFormat inFormat = new DummyInputFormat();
+    InputSplit[] splits = inFormat.getSplits(job, 1);
+    assertTrue(splits.length > 0);
+    for (InputSplit s : splits) {
+      CombineFileSplit cfs = (CombineFileSplit)s;
+      for (Path p : cfs.getPaths()) {
+        assertEquals(p.toUri().getScheme(), "file");
+      }
+    }
+  }
 
   static class TestFilter implements PathFilter {
     private Path p;
@@ -462,7 +498,7 @@ public class TestCombineFileInputFormat 
     // returns true if the specified path matches the prefix stored
     // in this TestFilter.
     public boolean accept(Path path) {
-      if (path.toString().indexOf(p.toString()) == 0) {
+      if (path.toUri().getPath().indexOf(p.toString()) == 0) {
         return true;
       }
       return false;

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1470030&r1=1470029&r2=1470030&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Fri Apr 19 20:39:30 2013
@@ -72,7 +72,8 @@ public class TestCombineFileInputFormat 
 
   static final int BLOCKSIZE = 1024;
   static final byte[] databuf = new byte[BLOCKSIZE];
-
+  private static final String DUMMY_FS_URI = "dummyfs:///";
+  
   /** Dummy class to extend CombineFileInputFormat*/
   private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
     @Override
@@ -1110,6 +1111,37 @@ public class TestCombineFileInputFormat 
       }
     }
   }
+  
+  /**
+   * Test when input files are from non-default file systems
+   */
+  public void testForNonDefaultFileSystem() throws Throwable {
+    Configuration conf = new Configuration();
+
+    // use a fake file system scheme as default
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
+
+    // default fs path
+    assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
+    // add a local file
+    Path localPath = new Path("testFile1");
+    FileSystem lfs = FileSystem.getLocal(conf);
+    FSDataOutputStream dos = lfs.create(localPath);
+    dos.writeChars("Local file for CFIF");
+    dos.close();
+
+    Job job = new Job(conf);
+    FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
+    DummyInputFormat inFormat = new DummyInputFormat();
+    List<InputSplit> splits = inFormat.getSplits(job);
+    assertTrue(splits.size() > 0);
+    for (InputSplit s : splits) {
+      CombineFileSplit cfs = (CombineFileSplit)s;
+      for (Path p : cfs.getPaths()) {
+        assertEquals(p.toUri().getScheme(), "file");
+      }
+    }
+  }
 
   static class TestFilter implements PathFilter {
     private Path p;
@@ -1122,7 +1154,7 @@ public class TestCombineFileInputFormat 
     // returns true if the specified path matches the prefix stored
     // in this TestFilter.
     public boolean accept(Path path) {
-      if (path.toString().indexOf(p.toString()) == 0) {
+      if (path.toUri().getPath().indexOf(p.toString()) == 0) {
         return true;
       }
       return false;


Reply via email to