I'm attempting to get the PigStorageWithInputPath example (
http://wiki.apache.org/pig/PigStorageWithInputPath) working, but I must be
missing something. It works fine if I specify a single file, but if I use a
glob in my load statement, every record ends up tagged with the first
file's name only.
My loadfunc class:
package com.test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;

public class PigStorageWithInputPath extends PigStorage {
    static Logger logger = Logger.getLogger(PigStorageWithInputPath.class);
    Path path = null;

    @Override
    public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader,
            PigSplit split) {
        super.prepareToRead(reader, split);
        path = ((FileSplit) split.getWrappedSplit()).getPath();
        logger.info("path @prepareToRead=" + path);
    }

    @Override
    public Tuple getNext() throws IOException {
        Tuple myTuple = super.getNext();
        if (myTuple != null)
            myTuple.append(path.toString());
        return myTuple;
    }
}
My test data is three files. Each file has three lines in the format "file
n\tline n".
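For reference, the data can be regenerated with something like the following
(a sketch of my setup; the local filenames and the HDFS copy step are just
how I happened to do it, but the resulting sizes match the listing below):

```shell
# Generate file1..file3, each with three lines of the form
# "file N<TAB>line N" -- 14 bytes per line, 42 bytes per file.
for f in 1 2 3; do
  for l in 1 2 3; do
    printf 'file %d\tline %d\n' "$f" "$l"
  done > "file$f"
done
# Then copied into HDFS (pseudo-distributed, as noted at the end):
#   hadoop fs -mkdir /test
#   hadoop fs -put file1 file2 file3 /test/
```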
-bash-3.2$ hadoop fs -ls /test
Found 3 items
-rw-r--r-- 1 test supergroup 42 2011-07-15 15:00 /test/file1
-rw-r--r-- 1 test supergroup 42 2011-07-15 15:00 /test/file2
-rw-r--r-- 1 test supergroup 42 2011-07-15 15:00 /test/file3
My Pig script:
register /tmp/udf.jar;
a = load '/test/*' using com.test.PigStorageWithInputPath();
dump a;
The output:
(file 1,line 1,hdfs://localhost/test/file1)
(file 1,line 2,hdfs://localhost/test/file1)
(file 1,line 3,hdfs://localhost/test/file1)
(file 2,line 1,hdfs://localhost/test/file1)
(file 2,line 2,hdfs://localhost/test/file1)
(file 2,line 3,hdfs://localhost/test/file1)
(file 3,line 1,hdfs://localhost/test/file1)
(file 3,line 2,hdfs://localhost/test/file1)
(file 3,line 3,hdfs://localhost/test/file1)
The last field of every record is the path to file1, even though the
contents clearly came from the other files. What I'm expecting to see is
this:
(file 1,line 1,hdfs://localhost/test/file1)
(file 1,line 2,hdfs://localhost/test/file1)
(file 1,line 3,hdfs://localhost/test/file1)
(file 2,line 1,hdfs://localhost/test/file2)
(file 2,line 2,hdfs://localhost/test/file2)
(file 2,line 3,hdfs://localhost/test/file2)
(file 3,line 1,hdfs://localhost/test/file3)
(file 3,line 2,hdfs://localhost/test/file3)
(file 3,line 3,hdfs://localhost/test/file3)
And the task attempt log, where all three prepareToRead calls report file1:
2011-07-15 15:00:58,933 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2011-07-15 15:00:59,329 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
2011-07-15 15:00:59,720 INFO com.test.PigStorageWithInputPath: path @prepareToRead=hdfs://localhost/test/file1
2011-07-15 15:00:59,769 INFO com.test.PigStorageWithInputPath: path @prepareToRead=hdfs://localhost/test/file1
2011-07-15 15:00:59,771 INFO com.test.PigStorageWithInputPath: path @prepareToRead=hdfs://localhost/test/file1
2011-07-15 15:00:59,830 INFO org.apache.hadoop.mapred.Task: Task:attempt_201107111628_0027_m_000000_0 is done. And is in the process of commiting
2011-07-15 15:01:00,851 INFO org.apache.hadoop.mapred.Task: Task attempt_201107111628_0027_m_000000_0 is allowed to commit now
2011-07-15 15:01:00,861 INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_201107111628_0027_m_000000_0' to hdfs://localhost/tmp/temp-2143171191/tmp-286804436
2011-07-15 15:01:00,875 INFO org.apache.hadoop.mapred.Task: Task 'attempt_201107111628_0027_m_000000_0' done.
2011-07-15 15:01:00,897 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-
For the record, I'm doing this test against a CDH3U0 installation (pig-0.8 +
hadoop-0.20.2) in pseudo-distributed mode.
Am I doing something wrong?