I am working on a large application where I need to pass a big array of
IntWritables from the mapper to the reducer. Naturally I want to use a
combiner, and I am extending ArrayWritable. Here are the signatures of my
mapper, reducer, and IntArrayWritable:


*Mapper:*
public class AppMapper extends Mapper<LongWritable, Text, Text, IntArrayWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(outkey, new IntArrayWritable(array_of_intwritables));
    }
}


*Reducer:*
public class AppReducer extends Reducer<Text, IntArrayWritable, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<IntArrayWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, new Text(some_stuff));
    }
}
*IntArrayWritable:*
public class IntArrayWritable extends ArrayWritable
{
    public IntArrayWritable() {
        super(IntWritable.class);
    }
    public IntArrayWritable(IntWritable[] values) {
        super(IntWritable.class, values);
    }
}


*Here is what my Job looks like:*
    private Job AppRunner(Path inputPath, Path outputPath, Configuration conf)
            throws IOException {
        Job job = new Job(conf);

        job.setJobName(String.format("App Job: %s => %s", inputPath, outputPath));
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntArrayWritable.class);

        job.setMapperClass(AppMapper.class);
        job.setCombinerClass(AppReducer.class);
        job.setReducerClass(AppReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setNumReduceTasks(28);
        return job;
    }

*Error Trace:*

java.io.IOException: Spill failed
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1070)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1051)
    at java.io.DataOutputStream.writeInt(DataOutputStream.java:183)
    at org.apache.hadoop.io.IntWritable.write(IntWritable.java:42)
    at org.apache.hadoop.io.ArrayWritable.write(ArrayWritable.java:98)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:90)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:77)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:926)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:574)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at com.test.app.AppMapper.map(AppMapper.java:67)
    at com.test.app.neighbors.AppMapper.map(AppMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
Caused by: java.io.IOException: wrong value class: class org.apache.hadoop.io.Text is not class com.test.app.mr.writables.IntArrayWritable
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:175)
    at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1042)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1363)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at com.test.app.neighbors.AppReducer.reduce(AppReducer.java:54)
    at com.test.app.neighbors.AppReducer.reduce(AppReducer.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1384)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1291)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:712)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1199)

I think the error is caused by the combiner: the combiner outputs Text,
while the reducer expects IntArrayWritable as its input value type. If I
remove the combiner, everything works. What am I doing wrong, and how can I
get the combiner to work? Any help is greatly appreciated.
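That diagnosis matches the trace: a combiner's output key/value types must equal the map output types, because its output is fed back into the shuffle as if the mapper had written it. One fix is a separate combiner class whose output value type stays IntArrayWritable. Below is a minimal sketch under that assumption; the class name AppCombiner is hypothetical, and the merge logic (plain concatenation of the arrays) is a placeholder for whatever aggregation your application actually needs:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch only: input and output value types are both IntArrayWritable,
// matching the map output, so the spill-time serialization check passes.
public class AppCombiner
        extends Reducer<Text, IntArrayWritable, Text, IntArrayWritable> {

    @Override
    public void reduce(Text key, Iterable<IntArrayWritable> values, Context context)
            throws IOException, InterruptedException {
        // Placeholder merge: concatenate all arrays for this key.
        // Replace with the real (associative, commutative) aggregation.
        List<IntWritable> merged = new ArrayList<IntWritable>();
        for (IntArrayWritable value : values) {
            for (Writable w : value.get()) {
                merged.add((IntWritable) w);
            }
        }
        context.write(key, new IntArrayWritable(merged.toArray(new IntWritable[0])));
    }
}
```

Then register it in the driver with job.setCombinerClass(AppCombiner.class). The reducer can keep emitting Text, since only the combiner is constrained to match the map output types.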
--
Vipul Sharma
sharmavipul AT gmail DOT com
