Author: rding
Date: Sat Mar 20 00:44:13 2010
New Revision: 925504
URL: http://svn.apache.org/viewvc?rev=925504&view=rev
Log:
PIG-1298: Restore file traversal behavior to Pig loaders
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFileInputFormat.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSequenceFileInputFormat.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextInputFormat.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageInputFormat.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Sat Mar 20 00:44:13 2010
@@ -159,6 +159,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-1298: Restore file traversal behavior to Pig loaders (rding)
+
PIG-1289: PIG Join fails while doing a filter on joined data (daijy)
PIG-1266: Show spill count on the pig console at the end of the job (sriranjan
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFileInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFileInputFormat.java?rev=925504&view=auto
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFileInputFormat.java
(added)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFileInputFormat.java
Sat Mar 20 00:44:13 2010
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+
+public abstract class PigFileInputFormat <K, V> extends FileInputFormat<K, V> {
+
+ /*
+ * This is to support multi-level/recursive directory listing until
+ * MAPREDUCE-1577 is fixed.
+ */
+ @Override
+ protected List<FileStatus> listStatus(JobContext job) throws IOException {
+ return MapRedUtil.getAllFileRecursively(super.listStatus(job),
+ job.getConfiguration());
+ }
+
+}
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSequenceFileInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSequenceFileInputFormat.java?rev=925504&view=auto
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSequenceFileInputFormat.java
(added)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSequenceFileInputFormat.java
Sat Mar 20 00:44:13 2010
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+
+public class PigSequenceFileInputFormat<K, V> extends
SequenceFileInputFormat<K, V> {
+
+ /*
+ * This is to support multi-level/recursive directory listing until
+ * MAPREDUCE-1577 is fixed.
+ */
+ @Override
+ protected List<FileStatus> listStatus(JobContext job) throws IOException {
+ Path[] dirs = FileInputFormat.getInputPaths(job);
+ if (dirs.length == 0) {
+ throw new IOException("No input paths specified in job");
+ }
+ List<FileStatus> files = new ArrayList<FileStatus>();
+ for (int i=0; i<dirs.length; ++i) {
+ Path p = dirs[i];
+ FileSystem fs = p.getFileSystem(job.getConfiguration());
+ FileStatus[] matches = fs.globStatus(p, hiddenFileFilter);
+ if (matches == null) {
+ throw new IOException("Input path does not exist: " + p);
+ } else if (matches.length == 0) {
+ throw new IOException("Input Pattern " + p + " matches 0
files");
+ } else {
+ for (FileStatus globStat: matches) {
+ files.add(globStat);
+ }
+ }
+ }
+ return MapRedUtil.getAllFileRecursively(files,
job.getConfiguration());
+ }
+
+ private static final PathFilter hiddenFileFilter = new PathFilter(){
+ public boolean accept(Path p){
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+}
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextInputFormat.java?rev=925504&view=auto
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextInputFormat.java
(added)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextInputFormat.java
Sat Mar 20 00:44:13 2010
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+
+public class PigTextInputFormat extends TextInputFormat {
+
+ /*
+ * This is to support multi-level/recursive directory listing until
+ * MAPREDUCE-1577 is fixed.
+ */
+ @Override
+ protected List<FileStatus> listStatus(JobContext job) throws IOException {
+ return MapRedUtil.getAllFileRecursively(super.listStatus(job),
+ job.getConfiguration());
+ }
+
+}
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
Sat Mar 20 00:44:13 2010
@@ -19,13 +19,19 @@
package org.apache.pig.backend.hadoop.executionengine.util;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
@@ -116,4 +122,50 @@ public class MapRedUtil {
udfc.deserialize();
}
+ /**
+ * Get all files recursively from the given list of files
+ *
+ * @param files a list of FileStatus
+ * @param conf the configuration object
+ * @return the list of fileStatus that contains all the files in the given
+ * list and, recursively, all the files inside the directories in
+ * the given list
+ * @throws IOException
+ */
+ public static List<FileStatus> getAllFileRecursively(
+ List<FileStatus> files, Configuration conf) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ int len = files.size();
+ for (int i = 0; i < len; ++i) {
+ FileStatus file = files.get(i);
+ if (file.isDir()) {
+ Path p = file.getPath();
+ FileSystem fs = p.getFileSystem(conf);
+ addInputPathRecursively(result, fs, p, hiddenFileFilter);
+ } else {
+ result.add(file);
+ }
+ }
+ log.info("Total input paths to process : " + result.size());
+ return result;
+ }
+
+ private static void addInputPathRecursively(List<FileStatus> result,
+ FileSystem fs, Path path, PathFilter inputFilter)
+ throws IOException {
+ for (FileStatus stat: fs.listStatus(path, inputFilter)) {
+ if (stat.isDir()) {
+ addInputPathRecursively(result, fs, stat.getPath(),
inputFilter);
+ } else {
+ result.add(stat);
+ }
+ }
+ }
+
+ private static final PathFilter hiddenFileFilter = new PathFilter(){
+ public boolean accept(Path p){
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Sat Mar 20
00:44:13 2010
@@ -26,7 +26,6 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
@@ -39,7 +38,6 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.FileInputLoadFunc;
@@ -51,6 +49,7 @@ import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -231,7 +230,7 @@ LoadPushDown {
if(loadLocation.endsWith("bz2") || loadLocation.endsWith("bz")) {
return new Bzip2TextInputFormat();
} else {
- return new TextInputFormat();
+ return new PigTextInputFormat();
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageInputFormat.java?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageInputFormat.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageInputFormat.java Sat
Mar 20 00:44:13 2010
@@ -23,13 +23,13 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.pig.data.Tuple;
/**
*
*/
-public class BinStorageInputFormat extends FileInputFormat<Text, Tuple> {
+public class BinStorageInputFormat extends PigFileInputFormat<Text, Tuple> {
/* (non-Javadoc)
* @see
org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit,
org.apache.hadoop.mapreduce.TaskAttemptContext)
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Sat Mar 20
00:44:13 2010
@@ -92,9 +92,33 @@ public class TestMergeJoin {
Util.deleteFile(cluster, INPUT_FILE2);
}
- /**
- * Test method for {@link
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin#getNext(org.apache.pig.data.Tuple)}.
- */
+ @Test
+ public void testRecursiveFileListing() throws IOException{
+ Util.createInputFile(cluster, "foo/bar/test.dat", new String[]{"2"});
+ pigServer.registerQuery("A = LOAD 'foo';");
+ pigServer.registerQuery("B = LOAD 'foo';");
+ DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj
= BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0 using
'merge';");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbMergeJoin.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+ Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+ Util.deleteFile(cluster,"foo/bar/test.dat");
+ }
+
@Test
public void testMergeJoinSimplest() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=925504&r1=925503&r2=925504&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Sat Mar 20
00:44:13 2010
@@ -481,4 +481,34 @@ public class TestSkewedJoin extends Test
new File(RIGHT_INPUT_FILE).delete();
Util.deleteFile(cluster, RIGHT_INPUT_FILE);
}
+
+ public void testRecursiveFileListing() throws IOException {
+ String LOCAL_INPUT_FILE = "test.dat";
+ String INPUT_FILE = "foo/bar/test.dat";
+
+ PrintWriter w = new PrintWriter(new FileWriter(LOCAL_INPUT_FILE));
+ w.println("1");
+ w.println("2");
+ w.println("3");
+ w.println("5");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, LOCAL_INPUT_FILE, INPUT_FILE);
+
+ pigServer.registerQuery("a = load 'foo' as (nums:chararray);");
+ pigServer.registerQuery("b = load 'foo' as (nums:chararray);");
+ pigServer.registerQuery("d = join a by nums, b by nums USING
'skewed';");
+
+ Iterator<Tuple> iter = pigServer.openIterator("d");
+ int count = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ count++;
+ }
+ Assert.assertEquals(4, count);
+
+ new File(LOCAL_INPUT_FILE).delete();
+ Util.deleteFile(cluster, INPUT_FILE);
+
+ }
}