First-time poster here; I hope I'm not breaking any rules, and if I am, please tell me :)

I am trying to perform a "total order" sort on data previously stored on a 
pseudo-distributed HDFS (running with YARN) as a SequenceFile (keys: 
IntWritable, values: Text).  The SequenceFile output, spread over 
/cnts/part-r-00000 to part-r-00006, contains 44 records altogether.

bin/hdfs dfs -text /cnts* prints 44 lines like:
2       314     |       12      |       21
2       700     |       12      |       21
1       700     |       12      |       28
1       2       |       420     |       11120
2       2       |       11      |       3
2       700     |       11      |       3
1       700     |       12      |       19
(...)
1       314     |       12      |       30
3       314     |       420     |       6
3       700     |       420     |       6
3       2       |       420     |       6
1       2       |       421     |       36
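
For reference, the key/value classes recorded in the part files can be confirmed 
with a small standalone snippet along these lines (a sketch only; the part-file 
path is hard-coded for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class PeekTypes {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Open the first split and print the declared key/value classes
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(new Path("/cnts/part-r-00000")))) {
            System.out.println("key class:   " + reader.getKeyClassName());
            System.out.println("value class: " + reader.getValueClassName());
        }
    }
}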

I run the job as follows, within main:
int exitCode = -1;
if ("sort".equals(args[0])) {
    // Strip the "sort" subcommand and hand the remaining args to the tool
    exitCode = ToolRunner.run(new Configuration(),
                              new Sorter(),
                              Arrays.copyOfRange(args, 1, args.length));
}

Sorter extends Configured, and implements Tool as follows:
public int run(String[] args) throws Exception {
    (...)
    //Set job-specific params
    job.setJobName("sort");
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setPartitionerClass(TotalOrderPartitioner.class);

    // Set up and run input sampling to build the partition file
    double p = 0.1;
    int maxNbSamples = 10;
    int maxNbSplitsRead = 3;
    InputSampler.Sampler<IntWritable, Text> sampler =
          new InputSampler.RandomSampler<IntWritable,Text>(p,
                                                           maxNbSamples,
                                                           maxNbSplitsRead);
    InputSampler.writePartitionFile(job, sampler);

    // Submit the job & poll for progress until it completes
    return job.waitForCompletion(true) ? 0 : -1;
}
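
In case it helps with diagnosis, the cut points the sampler writes can be dumped 
with a snippet along these lines (just a sketch, meant to run inside run() right 
after writePartitionFile(); the path is whatever 
TotalOrderPartitioner.getPartitionFile() returns from the job configuration):

// Sketch only: dump the cut points right after InputSampler.writePartitionFile(),
// reading the partition file path back from the job configuration.
Path partitionFile = new Path(
        TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
try (SequenceFile.Reader reader = new SequenceFile.Reader(job.getConfiguration(),
        SequenceFile.Reader.file(partitionFile))) {
    IntWritable cutPoint = new IntWritable();
    while (reader.next(cutPoint, NullWritable.get())) {
        System.out.println("cut point: " + cutPoint.get());
    }
}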

I use the default "identity" Mapper and Reducer classes together with 
TotalOrderPartitioner and an InputSampler.RandomSampler to configure the 
partitioning.  This works fine as long as I use only a single reduce task, but 
becomes unstable when I run with -Dmapreduce.job.reduces=N for N > 1.
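
For reference, the invocation looks roughly like this (the jar name is a 
placeholder, and the trailing arguments depend on the argument handling I 
elided above):

bin/hadoop jar mytool.jar sort -Dmapreduce.job.reduces=4 <input> <output>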

By unstable, I mean that it apparently randomly
* terminates successfully;
* fails with an ArrayIndexOutOfBoundsException in InputSampler.writePartitionFile() 
and exits;
* or fails with "IllegalArgumentException: Can't read partitions file" in 
TotalOrderPartitioner.setConf() and crashes my machine.

The probability distribution over these three outcomes seems to vary slightly 
when I change the values passed to the InputSampler constructor and the number 
of reduce tasks, but the tool is never stable.

Can anyone shed light on this at all?
