Yeah, thanks. I did observe that Pig combined the inputs.
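For anyone who lands on this thread later, here is a minimal sketch of what "just let Pig do its thing" can look like on the loader side. It assumes Pig 0.8's LoadFunc API; the class name SimpleLineLoader and its internals are illustrative, not from this thread. The loader exposes a plain FileInputFormat-based InputFormat and leaves the grouping of small files to Pig's own split combination (pig.splitCombination, capped by pig.maxCombinedSplitSize):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Illustrative loader: it carries no combine logic of its own.
public class SimpleLineLoader extends LoadFunc {

    private RecordReader<LongWritable, Text> reader;
    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    @Override
    public InputFormat getInputFormat() throws IOException {
        // A plain FileInputFormat subclass; Pig's split combination
        // operates above it and merges the small-file splits itself.
        return new TextInputFormat();
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // Only set the input paths; split sizing is left to
        // pig.maxCombinedSplitSize on the Pig side.
        FileInputFormat.setInputPaths(job, location);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        // Pig hands the loader its readers one underlying split at a time,
        // even when several small files end up in a single map task.
        this.reader = (RecordReader<LongWritable, Text>) reader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null; // end of the (possibly combined) input
            }
            return tupleFactory.newTuple(reader.getCurrentValue().toString());
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}

With the numbers from the thread, 7 files * 51 MB (360,235,092 bytes of input) under a 205847916-byte (~196 MB) cap works out to the two combined splits the poster expected; the single 7-path CombineFileSplit in the map task log suggests the custom input format was doing its own, uncapped combining below Pig.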
On Tue, Sep 24, 2013 at 11:37 AM, Dmitriy Ryaboy <dvrya...@gmail.com> wrote:

> Don't use CombineFileInputFormat / CombineFileRecordReader. Just let Pig
> do its thing.
>
> On Wed, Sep 18, 2013 at 9:08 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
> > I tried this:
> > http://pig.apache.org/docs/r0.8.1/cookbook.html#Combine+Small+Input+Files
> >
> > Test job details
> > Input: 7 files * 51 MB each
> >
> > HDFS counters of the job (FileSystemCounters group):
> > Counter              Map          Reduce    Total
> > FILE_BYTES_READ      92           23        115
> > HDFS_BYTES_READ      360,235,092  0         360,235,092
> > FILE_BYTES_WRITTEN   116,558      116,349   232,907
> > HDFS_BYTES_WRITTEN   0            9         9
> >
> > From the job conf:
> > pig.maxCombinedSplitSize   205847916
> > pig.splitCombination       true
> > set mapred.max.split.size  205847916 (tried with and without this one)
> >
> > The map task logs these lines from the CombineFileSplit:
> > Total Split Length: 360233853
> > Total Split Paths Length: 7
> >
> > With 360 MB of input data and the above conf, there should have been two
> > splits. However, there was only one split, and all 7 files were read by a
> > single map task.
> >
> > My loader's prepareToRead does not use the PigSplit:
> >
> > @Override
> > public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
> >     reader = (CombineFileRecordReader) arg0;
> > }
> >
> > Any suggestions?
> >
> > On Wed, Sep 18, 2013 at 8:49 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
> >
> > > I am facing an issue with a large number of small files (51 MB each). A
> > > typical M/R job in my working environment takes at least 2000 files,
> > > which results in 2000 map tasks. Hence I thought of using
> > > CombineFileInputFormat to reduce the problem. I wrote a custom
> > > implementation of CombineFileInputFormat along with a
> > > CombineFileRecordReader. Below is a skeleton of the code.
> > >
> > > public class CustomInputFormat
> > >         extends CombineFileInputFormat<IntWritable, RecordHolder> {
> > >
> > >     @Override
> > >     public RecordReader<IntWritable, RecordHolder> createRecordReader(
> > >             final InputSplit split, final TaskAttemptContext taskContext)
> > >             throws IOException {
> > >         final CombineFileSplit fSplit = (CombineFileSplit) split;
> > >         return new CombineFileRecordReader<IntWritable, RecordHolder>(
> > >                 fSplit, taskContext, (Class) CustomRecordReader.class);
> > >     }
> > >
> > >     public static class CustomRecordReader
> > >             extends RecordReader<IntWritable, RecordHolder> {
> > >
> > >         private static final Log LOGGER =
> > >                 LogFactory.getLog(CustomRecordReader.class);
> > >
> > >         // Iterates over the records of the file currently being read
> > >         private Iterator<Object> currentIterator;
> > >         private int recordPosition = 0;
> > >         private IntWritable key = null;
> > >         private RecordHolder value = null;
> > >
> > >         public CustomRecordReader(CombineFileSplit split,
> > >                 TaskAttemptContext context, Integer idx) throws IOException {
> > >             FileSplit fileSplit = new FileSplit(split.getPath(idx),
> > >                     split.getOffset(idx), split.getLength(idx),
> > >                     split.getLocations());
> > >             Path path = fileSplit.getPath();
> > >         }
> > >
> > >         @Override
> > >         public void close() throws IOException {
> > >         }
> > >
> > >         @Override
> > >         public IntWritable getCurrentKey() throws IOException,
> > >                 InterruptedException {
> > >             return key;
> > >         }
> > >
> > >         @Override
> > >         public RecordHolder getCurrentValue() throws IOException,
> > >                 InterruptedException {
> > >             return value;
> > >         }
> > >
> > >         @Override
> > >         public float getProgress() throws IOException, InterruptedException {
> > >             return 0;
> > >         }
> > >
> > >         @Override
> > >         public void initialize(InputSplit arg0, TaskAttemptContext arg1)
> > >                 throws IOException, InterruptedException {
> > >         }
> > >
> > >         @Override
> > >         public boolean nextKeyValue() throws IOException, InterruptedException {
> > >             boolean dataPresent = false;
> > >             if (currentIterator.hasNext()) {
> > >                 Object nextRecord = currentIterator.next();
> > >                 key = new IntWritable(recordPosition++);
> > >                 value = new RecordHolder(nextRecord);
> > >                 dataPresent = true;
> > >             } else {
> > >                 LOGGER.info("Reached end of file: " + dataPresent);
> > >             }
> > >             return dataPresent;
> > >         }
> > >     }
> > > }
> > >
> > > Within setLocation() of the loader, I set the following configuration:
> > >
> > > job.getConfiguration().set("mapred.max.split.size", "205847916");
> > > job.getConfiguration().set("mapreduce.job.max.split.locations", "5");
> > >
> > > I used the above input format in my custom Pig loader. I ran a Pig
> > > script to load data using the custom loader and dump it.
> > > Input size: 1256 files * 51 MB each.
> > >
> > > I was expecting at least 3 splits, which would result in 3 map tasks.
> > > However, only one map task was started, and it was responsible for
> > > reading all of the data via the record reader.
> > >
> > > Could anyone please show me what I am missing?
> > >
> > > I am using Pig 0.8, not Pig 0.11 as mentioned earlier. I do not have
> > > the liberty to upgrade the Pig version.
> > >
> > > I looked at these conversations:
> > >
> > > http://grokbase.com/t/pig/dev/107trhy9wd/jira-created-pig-1518-multi-file-input-format-for-loaders
> > > http://www.mail-archive.com/user@pig.apache.org/msg03938.html
> > >
> > > They did not seem to solve my issue.
> > >
> > > Thanks for any suggestions.
> > >
> > > Regards,
> > > Deepak

--
Deepak
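If the custom CombineFileInputFormat route has to be kept for plain MapReduce jobs outside Pig, one plausible cause of the one-map-task symptom above is that no per-split size cap took effect at the Hadoop level. Below is a hedged sketch of that fix; the class name CappedCombineInputFormat is illustrative, while RecordHolder and CustomInputFormat.CustomRecordReader come from the skeleton above. Calling the protected setMaxSplitSize in the constructor caps combined splits regardless of whether a value set in the loader's setLocation ever reaches split computation:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CappedCombineInputFormat
        extends CombineFileInputFormat<IntWritable, RecordHolder> {

    public CappedCombineInputFormat() {
        // Without an explicit cap, CombineFileInputFormat can pack all
        // co-located files into a single split -- consistent with the
        // single 7-path CombineFileSplit seen in the map task log above.
        // ~196 MB, matching pig.maxCombinedSplitSize from the thread.
        setMaxSplitSize(205847916L);
    }

    @Override
    public RecordReader<IntWritable, RecordHolder> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<IntWritable, RecordHolder>(
                (CombineFileSplit) split, context,
                CustomInputFormat.CustomRecordReader.class);
    }
}

For the Pig case itself, though, Dmitriy's advice stands: with pig.splitCombination enabled, none of this machinery belongs in the loader.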