Yeah. Thanks.
I did observe that Pig combined inputs.

On Tue, Sep 24, 2013 at 11:37 AM, Dmitriy Ryaboy <dvrya...@gmail.com> wrote:

> Don't use CombinedFile InputFormat / Record Reader. Just let Pig do its
> thing.
>
>
> On Wed, Sep 18, 2013 at 9:08 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
> > I tried this
> >
> http://pig.apache.org/docs/r0.8.1/cookbook.html#Combine+Small+Input+Files
> >
> > Test Job  Details
> > Input 7 Files * 51MB each
> >
> > HDFS Counters of Job
> > Counter Map Reduce Total
> > FileSystemCounters FILE_BYTES_READ 92 23 115
> > HDFS_BYTES_READ 360,235,092 0 360,235,092
> > FILE_BYTES_WRITTEN 116,558 116,349 232,907
> > HDFS_BYTES_WRITTEN 0 9 9
> >
> > From Job Conf
> > pig.maxCombinedSplitSize 205847916
> > pig.splitCombination true
> > set mapred.max.split.size 205847916 (With/Without)
> >
> > Map Task displays these logs from CombileSlpit
> > Total Split Length:360233853
> > Total Split Paths Length:7
> >
> > With 360MB of input data, and above conf there should have been two
> splits.
> > However there was only one split and all 7 files were read from single
> map
> > task
> >
> >
> > From my loader prepareRecodReader does not use the PigSplit
> >
> >        @Override
> > public void prepareToRead(RecordReader arg0, PigSplit arg1) throws
> > IOException {
> >  reader = (CombineFileRecordReader) (arg0);
> > }
> >
> > Any suggestions ?
> >
> >
> > On Wed, Sep 18, 2013 at 8:49 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> > wrote:
> >
> > > I am facing a issue of large number of small sized files (51MB each). A
> > > typical M/R job in my working environment would take ateast 2000 files,
> > > which is resulting in 2000 map tasks. Hence i thought of using
> > > CombineFileInputFormat to reduce the problem. I had written a custom
> > > implementation of CombineFileInputFormat along
> > > with CombineFileRecordReader. Below is skeleton of the code.
> > >
> > >
> > > public class CustomInputFormat extends
> > CombineFileInputFormat<IntWritable,
> > > RecordHolder> {
> > > @Override
> > > public RecordReader<IntWritable, RecordHolder> createRecordReader(final
> > > InputSplit split,
> > >  final TaskAttemptContext taskContext) throws IOException {
> > > final CombineFileSplit fSplit = (CombineFileSplit) split;
> > >  return new CombineFileRecordReader<IntWritable, RecordHolder>(fSplit,
> > > taskContext,
> > > (Class) CustomRecordReader.class);
> > >  }
> > >
> > > public static class CustomRecordReader extends
> RecordReader<IntWritable,
> > > RecordHolder> {
> > >                 private IntWritable key = null;
> > > private RecordHolder value = null;
> > > public CustomRecordReader (CombineFileSplit split, TaskAttemptContext
> > > context, Integer idx)
> > >  throws IOException {
> > > FileSplit fileSplit = new FileSplit(split.getPath(idx),
> > > split.getOffset(idx), split.getLength(idx),
> > >  split.getLocations());
> > > Path path = fileSplit.getPath();
> > > }
> > >
> > > @Override
> > > public void close() throws IOException {
> > > }
> > >
> > > @Override
> > > public IntWritable getCurrentKey() throws IOException,
> > > InterruptedException {
> > >  return key;
> > > }
> > >
> > > @Override
> > > public RecordHolder getCurrentValue() throws IOException,
> > > InterruptedException {
> > >  return value;
> > > }
> > >
> > > @Override
> > >  public float getProgress() throws IOException, InterruptedException {
> > > return 0;
> > > }
> > >
> > > @Override
> > > public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws
> > > IOException, InterruptedException {
> > >
> > > }
> > >
> > > @Override
> > > public boolean nextKeyValue() throws IOException, InterruptedException
> {
> > >  boolean dataPresent = false;
> > > if (currentIterator.hasNext()) {
> > > Object nextRecord = currentIterator.next();
> > >  key = new IntWritable(recordPosition++);
> > > value = new RecordHolder(nextObject);
> > > dataPresent = true;
> > >  } else {
> > > LOGGER.info("Reached end of File:" + dataPresent);
> > > }
> > >  return dataPresent;
> > > }
> > > }
> > > }
> > >
> > > Within setLocation() of Loader i specified the following configurations
> > > job.getConfiguration().set("mapred.max.split.size", "205847916");
> > > job.getConfiguration().set("mapreduce.job.max.split.locations", "5");
> > >
> > > I used above inputform in my custom Pig Loader. I ran a pig script to
> > load
> > > data using custom loader and dump it.
> > > Input Size: 1256 Files * 51MB each.
> > >
> > > I was expecting atleast 3 splits that would result in 3 map tasks.
> > However
> > > only one map task was started and it was responsible for reading all of
> > > data via the record reader.
> > >
> > > Could anyone please show me what am i missing ?
> > >
> > > I am using Pig 0.8 v and not Pig 0.11 as mentioned earlier. I do not
> have
> > > the liberty to upgrade Pig version.
> > >
> > > I observed these conversations
> > >
> > >
> >
> http://grokbase.com/t/pig/dev/107trhy9wd/jira-created-pig-1518-multi-file-input-format-for-loaders
> > > http://www.mail-archive.com/user@pig.apache.org/msg03938.html
> > >
> > > They did not seem to solve my issue.
> > >
> > > Thanks for suggestions.
> > >
> > > Regards,
> > > Deepak
> > >
> > >
> > >
> > > --
> > > Deepak
> > >
> > >
> >
> >
> > --
> > Deepak
> >
>



-- 
Deepak

Reply via email to