Author: rding
Date: Sat Mar 20 00:44:13 2010
New Revision: 925504

URL: http://svn.apache.org/viewvc?rev=925504&view=rev
Log:
PIG-1298: Restore file traversal behavior to Pig loaders

Added:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFileInputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSequenceFileInputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextInputFormat.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageInputFormat.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Sat Mar 20 00:44:13 2010
@@ -159,6 +159,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-1298: Restore file traversal behavior to Pig loaders (rding)
+
 PIG-1289: PIG Join fails while doing a filter on joined data (daijy)
 
 PIG-1266: Show spill count on the pig console at the end of the job (sriranjan)

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFileInputFormat.java?rev=925504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFileInputFormat.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFileInputFormat.java Sat Mar 20 00:44:13 2010
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+
+public abstract class PigFileInputFormat <K, V> extends FileInputFormat<K, V> {
+    
+    /*
+     * This is to support multi-level/recursive directory listing until 
+     * MAPREDUCE-1577 is fixed.
+     */
+    @Override
+    protected List<FileStatus> listStatus(JobContext job) throws IOException {
+        return MapRedUtil.getAllFileRecursively(super.listStatus(job), 
+                job.getConfiguration());        
+    }
+    
+}

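For illustration only (not part of this commit), a minimal sketch of how a
loader-specific InputFormat picks up the recursive listing simply by extending
PigFileInputFormat instead of FileInputFormat; MyLineInputFormat is an invented
name, and the reader shown is Hadoop's stock LineRecordReader:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;

    // Hypothetical line-oriented format: the recursive listStatus() is
    // inherited from PigFileInputFormat, only the reader is format-specific.
    public class MyLineInputFormat extends PigFileInputFormat<LongWritable, Text> {
        @Override
        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) {
            return new LineRecordReader();
        }
    }

The BinStorageInputFormat change further down in this commit follows the same
pattern.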
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSequenceFileInputFormat.java?rev=925504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSequenceFileInputFormat.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSequenceFileInputFormat.java Sat Mar 20 00:44:13 2010
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+
+public class PigSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {
+
+    /*
+     * This is to support multi-level/recursive directory listing until 
+     * MAPREDUCE-1577 is fixed.
+     */
+    @Override
+    protected List<FileStatus> listStatus(JobContext job) throws IOException {
+        Path[] dirs = FileInputFormat.getInputPaths(job);
+        if (dirs.length == 0) {
+            throw new IOException("No input paths specified in job");
+        }
+        List<FileStatus> files = new ArrayList<FileStatus>();
+        for (int i=0; i<dirs.length; ++i) {
+            Path p = dirs[i];
+            FileSystem fs = p.getFileSystem(job.getConfiguration()); 
+            FileStatus[] matches = fs.globStatus(p, hiddenFileFilter);
+            if (matches == null) {
+                throw new IOException("Input path does not exist: " + p);
+            } else if (matches.length == 0) {
+                throw new IOException("Input Pattern " + p + " matches 0 
files");
+            } else {
+                for (FileStatus globStat: matches) {
+                    files.add(globStat);
+                }
+            }
+        }
+        return MapRedUtil.getAllFileRecursively(files, job.getConfiguration());
+    }
+
+    private static final PathFilter hiddenFileFilter = new PathFilter(){
+        public boolean accept(Path p){
+            String name = p.getName(); 
+            return !name.startsWith("_") && !name.startsWith("."); 
+        }
+    };   
+}

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextInputFormat.java?rev=925504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextInputFormat.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextInputFormat.java Sat Mar 20 00:44:13 2010
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+
+public class PigTextInputFormat extends TextInputFormat {
+
+    /*
+     * This is to support multi-level/recursive directory listing until 
+     * MAPREDUCE-1577 is fixed.
+     */
+    @Override
+    protected List<FileStatus> listStatus(JobContext job) throws IOException {
+        return MapRedUtil.getAllFileRecursively(super.listStatus(job), 
+                job.getConfiguration());             
+    }
+    
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Sat Mar 20 00:44:13 2010
@@ -19,13 +19,19 @@
 package org.apache.pig.backend.hadoop.executionengine.util;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
@@ -116,4 +122,50 @@ public class MapRedUtil {
         udfc.deserialize();
     }
 
+    /**
+     * Get all files recursively from the given list of files
+     * 
+     * @param files a list of FileStatus
+     * @param conf the configuration object
+     * @return the list of fileStatus that contains all the files in the given
+     *         list and, recursively, all the files inside the directories in 
+     *         the given list
+     * @throws IOException
+     */
+    public static List<FileStatus> getAllFileRecursively(
+            List<FileStatus> files, Configuration conf) throws IOException {
+        List<FileStatus> result = new ArrayList<FileStatus>();
+        int len = files.size();
+        for (int i = 0; i < len; ++i) {
+            FileStatus file = files.get(i);
+            if (file.isDir()) {
+                Path p = file.getPath();
+                FileSystem fs = p.getFileSystem(conf);
+                addInputPathRecursively(result, fs, p, hiddenFileFilter);
+            } else {
+                result.add(file);
+            }
+        }
+        log.info("Total input paths to process : " + result.size()); 
+        return result;
+    }
+    
+    private static void addInputPathRecursively(List<FileStatus> result,
+            FileSystem fs, Path path, PathFilter inputFilter) 
+            throws IOException {
+        for (FileStatus stat: fs.listStatus(path, inputFilter)) {
+            if (stat.isDir()) {
+                addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+            } else {
+                result.add(stat);
+            }
+        }
+    }          
+
+    private static final PathFilter hiddenFileFilter = new PathFilter(){
+        public boolean accept(Path p){
+            String name = p.getName(); 
+            return !name.startsWith("_") && !name.startsWith("."); 
+        }
+    };    
 }

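A minimal sketch (not from this commit) of what the new utility method does,
assuming a Configuration pointing at a reachable FileSystem and an example
directory /data/logs that may contain nested subdirectories:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;

    public class ListLeaves {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path dir = new Path("/data/logs");          // assumed example path
            FileSystem fs = dir.getFileSystem(conf);
            // The top-level listing may still contain directories ...
            List<FileStatus> top = Arrays.asList(fs.listStatus(dir));
            // ... which getAllFileRecursively expands into plain files,
            // skipping "_"- and "."-prefixed names while descending.
            List<FileStatus> leaves = MapRedUtil.getAllFileRecursively(top, conf);
            for (FileStatus f : leaves) {
                System.out.println(f.getPath());
            }
        }
    }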
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Sat Mar 20 00:44:13 2010
@@ -26,7 +26,6 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
@@ -39,7 +38,6 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.pig.FileInputLoadFunc;
@@ -51,6 +49,7 @@ import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -231,7 +230,7 @@ LoadPushDown {
         if(loadLocation.endsWith("bz2") || loadLocation.endsWith("bz")) {
             return new Bzip2TextInputFormat();
         } else {
-            return new TextInputFormat();
+            return new PigTextInputFormat();
         }
     }
 

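For illustration only (not part of the patch), the user-visible effect of the
PigStorage change, assuming a running cluster and a nested layout such as
data/2010/03/part-00000; the PigServer calls follow the same pattern as the
tests added in this commit:

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class RecursiveLoadExample {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            // 'data' is a directory; files in its subdirectories are now
            // picked up because PigStorage hands back PigTextInputFormat.
            pig.registerQuery("A = LOAD 'data' USING PigStorage();");
            Iterator<Tuple> it = pig.openIterator("A");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }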
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageInputFormat.java?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageInputFormat.java Sat Mar 20 00:44:13 2010
@@ -23,13 +23,13 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
 import org.apache.pig.data.Tuple;
 
 /**
  *
  */
-public class BinStorageInputFormat extends FileInputFormat<Text, Tuple> {
+public class BinStorageInputFormat extends PigFileInputFormat<Text, Tuple> {
 
     /* (non-Javadoc)
     * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Sat Mar 20 00:44:13 2010
@@ -92,9 +92,33 @@ public class TestMergeJoin {
         Util.deleteFile(cluster, INPUT_FILE2);
     }
 
-    /**
-     * Test method for {@link org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin#getNext(org.apache.pig.data.Tuple)}.
-     */
+    @Test
+    public void testRecursiveFileListing() throws IOException{
+        Util.createInputFile(cluster, "foo/bar/test.dat", new String[]{"2"});
+        pigServer.registerQuery("A = LOAD 'foo';");
+        pigServer.registerQuery("B = LOAD 'foo';");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0 using 
'merge';");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0;");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+        Util.deleteFile(cluster,"foo/bar/test.dat");
+    }
+    
     @Test
     public void testMergeJoinSimplest() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Sat Mar 20 00:44:13 2010
@@ -481,4 +481,34 @@ public class TestSkewedJoin extends Test
         new File(RIGHT_INPUT_FILE).delete();
         Util.deleteFile(cluster, RIGHT_INPUT_FILE);            
     }
+    
+    public void testRecursiveFileListing() throws IOException {
+        String LOCAL_INPUT_FILE = "test.dat";
+        String INPUT_FILE = "foo/bar/test.dat";
+        
+        PrintWriter w = new PrintWriter(new FileWriter(LOCAL_INPUT_FILE));
+        w.println("1");
+        w.println("2");
+        w.println("3");
+        w.println("5");
+        w.close();
+        
+        Util.copyFromLocalToCluster(cluster, LOCAL_INPUT_FILE, INPUT_FILE);
+        
+        pigServer.registerQuery("a = load 'foo' as (nums:chararray);");
+        pigServer.registerQuery("b = load 'foo' as (nums:chararray);");
+        pigServer.registerQuery("d = join a by nums, b by nums USING 
'skewed';");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("d");
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        Assert.assertEquals(4, count);
+        
+        new File(LOCAL_INPUT_FILE).delete();
+        Util.deleteFile(cluster, INPUT_FILE);
+          
+    }
 }

