I am facing an issue with a large number of small files (51 MB each). A typical M/R job in my environment takes at least 2000 such files as input, which results in 2000 map tasks. Hence I thought of using CombineFileInputFormat to reduce the number of map tasks. I have written a custom implementation of CombineFileInputFormat along with a CombineFileRecordReader. Below is a skeleton of the code.
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class CustomInputFormat extends CombineFileInputFormat<IntWritable, RecordHolder> {

    @Override
    public RecordReader<IntWritable, RecordHolder> createRecordReader(final InputSplit split,
            final TaskAttemptContext taskContext) throws IOException {
        final CombineFileSplit fSplit = (CombineFileSplit) split;
        // CombineFileRecordReader instantiates one CustomRecordReader per file in the combined split
        return new CombineFileRecordReader<IntWritable, RecordHolder>(fSplit, taskContext,
                (Class) CustomRecordReader.class);
    }

    public static class CustomRecordReader extends RecordReader<IntWritable, RecordHolder> {

        private static final Log LOGGER = LogFactory.getLog(CustomRecordReader.class);

        private IntWritable key = null;
        private RecordHolder value = null;
        private int recordPosition = 0;
        // iterator over the records of the current file; built in initialize() (elided in this skeleton)
        private Iterator<?> currentIterator;

        public CustomRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException {
            // idx identifies which file of the combined split this reader handles
            FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx),
                    split.getLength(idx), split.getLocations());
            Path path = fileSplit.getPath();
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public IntWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public RecordHolder getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        @Override
        public void initialize(InputSplit arg0, TaskAttemptContext arg1)
                throws IOException, InterruptedException {
            // file opening and iterator setup elided in this skeleton
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            boolean dataPresent = false;
            if (currentIterator.hasNext()) {
                Object nextRecord = currentIterator.next();
                key = new IntWritable(recordPosition++);
                value = new RecordHolder(nextRecord);
                dataPresent = true;
            } else {
                LOGGER.info("Reached end of file: " + dataPresent);
            }
            return dataPresent;
        }
    }
}

Within setLocation() of my Loader I set the following configuration properties:

job.getConfiguration().set("mapred.max.split.size", "205847916");
job.getConfiguration().set("mapreduce.job.max.split.locations", "5");

I used the above input format in my custom Pig loader and ran a Pig script that loads the data with this loader and dumps it.

Input size: 1256 files * 51 MB each.

I was expecting at least 3 splits, which would result in 3 map tasks. However, only one map task was started, and it was responsible for reading all of the data via the record reader. Could anyone please show me what I am missing?

I am using Pig 0.8 (not Pig 0.11 as mentioned earlier) and do not have the liberty to upgrade the Pig version.

I have looked through these conversations:
http://grokbase.com/t/pig/dev/107trhy9wd/jira-created-pig-1518-multi-file-input-format-for-loaders
http://www.mail-archive.com/user@pig.apache.org/msg03938.html
but they did not seem to solve my issue.

Thanks for any suggestions.

Regards,
Deepak
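
P.S. In case it helps with the diagnosis, here is roughly how the loader side is wired up. This is a minimal sketch rather than my real code: the class name SmallFileLoader, the tuple construction in getNext(), and the input path in the script below are illustrative; the parts that match what I actually do are getInputFormat() returning the CustomInputFormat above and setLocation() setting the two split-related properties.

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Illustrative loader; only the InputFormat and setLocation() wiring mirror my actual loader.
public class SmallFileLoader extends LoadFunc {

    private RecordReader<?, ?> reader;
    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
        // the two properties mentioned above
        job.getConfiguration().set("mapred.max.split.size", "205847916");
        job.getConfiguration().set("mapreduce.job.max.split.locations", "5");
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // hand Pig the CombineFileInputFormat implementation shown above
        return new CustomInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        this.reader = reader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            // simplified: the real code unpacks the RecordHolder into proper fields
            Tuple tuple = tupleFactory.newTuple(1);
            tuple.set(0, reader.getCurrentValue().toString());
            return tuple;
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}

The Pig script that triggers the job is just a load and dump (path is illustrative):

data = LOAD '/input/smallfiles' USING SmallFileLoader();
DUMP data;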