I am facing an issue with a large number of small files (51 MB each). A
typical M/R job in my working environment takes at least 2000 such files,
which results in 2000 map tasks. Hence I thought of using
CombineFileInputFormat to mitigate the problem. I have written a custom
implementation of CombineFileInputFormat along
with a CombineFileRecordReader. Below is a skeleton of the code.


import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class CustomInputFormat extends CombineFileInputFormat<IntWritable, RecordHolder> {

    @Override
    public RecordReader<IntWritable, RecordHolder> createRecordReader(final InputSplit split,
            final TaskAttemptContext taskContext) throws IOException {
        final CombineFileSplit fSplit = (CombineFileSplit) split;
        // CombineFileRecordReader creates one CustomRecordReader per file in the combined split
        return new CombineFileRecordReader<IntWritable, RecordHolder>(fSplit, taskContext,
                (Class) CustomRecordReader.class);
    }

    public static class CustomRecordReader extends RecordReader<IntWritable, RecordHolder> {

        private static final Log LOGGER = LogFactory.getLog(CustomRecordReader.class);

        private IntWritable key = null;
        private RecordHolder value = null;
        private Iterator<?> currentIterator;   // iterator over the records of the current file
        private int recordPosition = 0;
        private Path path;

        public CustomRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException {
            // idx identifies which file of the combined split this reader handles
            FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx),
                    split.getLength(idx), split.getLocations());
            path = fileSplit.getPath();
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public IntWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public RecordHolder getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            boolean dataPresent = false;
            if (currentIterator.hasNext()) {
                Object nextRecord = currentIterator.next();
                key = new IntWritable(recordPosition++);
                value = new RecordHolder(nextRecord);
                dataPresent = true;
            } else {
                LOGGER.info("Reached end of File:" + dataPresent);
            }
            return dataPresent;
        }
    }
}

Within setLocation() of the Loader I specified the following configuration:
job.getConfiguration().set("mapred.max.split.size", "205847916");
job.getConfiguration().set("mapreduce.job.max.split.locations", "5");

I used the above input format in my custom Pig Loader. I ran a Pig script that
loads data using the custom loader and dumps it.
Input size: 1256 files * 51 MB each.

I was expecting at least 3 splits, which would result in 3 map tasks. However,
only one map task was started, and it was responsible for reading all of the
data via the record reader.

Could anyone please show me what I am missing?

I am using Pig 0.8, not Pig 0.11 as mentioned earlier. I do not have
the liberty to upgrade the Pig version.

I have come across these conversations:
http://grokbase.com/t/pig/dev/107trhy9wd/jira-created-pig-1518-multi-file-input-format-for-loaders
http://www.mail-archive.com/user@pig.apache.org/msg03938.html

They did not seem to solve my issue.

Thanks for any suggestions.

Regards,
Deepak
