I have set up a Hadoop job like so:

```java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Legion");
    job.setJarByClass(Legion.class);

    job.setMapperClass(CallQualityMap.class);
    job.setReducerClass(CallQualityReduce.class);

    // Explicitly configure map and reduce outputs, since they're different classes
    job.setMapOutputKeyClass(CallSampleKey.class);
    job.setMapOutputValueClass(CallSample.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(CombineRepublicInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    CombineRepublicInputFormat.setMaxInputSplitSize(job, 128000000); // ~128 MB
    CombineRepublicInputFormat.setInputDirRecursive(job, true);
    CombineRepublicInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}
```

This job completes, but something strange happens: I get one output line per input line. Each output line consists of the output of `CallSampleKey.toString()`, then a tab, then something like `CallSample@` followed by a hex hash code, i.e. the default `Object.toString()` of a `CallSample`. This means that the reduce phase is never running and that the `CallSampleKey`/`CallSample` pairs are getting passed directly to the `TextOutputFormat`. But I don't understand why that would be the case. I've very clearly specified `job.setReducerClass(CallQualityReduce.class);`, so I have no idea why it would skip the reducer!

Here's the code for the mapper and reducer. First, the mapper:

```java
public static class CallQualityMap extends Mapper<NullWritable, RepublicRecord, CallSampleKey, CallSample> {
    private CallSampleKey outKey = new CallSampleKey();
    private CallSample outValue = new CallSample();

    public void setup(Context context) {
        //Configuration config = context.getConfiguration();
    }

    public void map(NullWritable inKey, RepublicRecord inValue, Context context)
            throws IOException, InterruptedException {
        // Key each sample by (call_id, file_name_uuid); the value carries the sample fields
        outKey.setData(inValue.getData("call_id"), inValue.getData("file_name_uuid"));
        outValue.setData(
                Long.valueOf(inValue.getData("timestamp")),
                Float.valueOf(inValue.getData("current_expand_rate")),
                Float.valueOf(inValue.getData("current_buffer_size")),
                inValue.getData("wifi_bssid")
        );
        context.write(outKey, outValue);
    }
}
```
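In case it's relevant: `CallSampleKey` is a custom `WritableComparable`. I won't paste the whole class, but it's shaped roughly like this (a simplified sketch; the real field names and serialization details may differ):

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class CallSampleKey implements WritableComparable<CallSampleKey> {
    // Composite key: call_id plus the UUID of the file the sample came from
    private Text id = new Text();
    private Text uuid = new Text();

    public void setData(String id, String uuid) {
        this.id.set(id);
        this.uuid.set(uuid);
    }

    public Text getId() { return id; }
    public Text getUuid() { return uuid; }

    @Override
    public void write(DataOutput out) throws IOException {
        id.write(out);
        uuid.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id.readFields(in);
        uuid.readFields(in);
    }

    @Override
    public int compareTo(CallSampleKey other) {
        int cmp = id.compareTo(other.id);
        return cmp != 0 ? cmp : uuid.compareTo(other.uuid);
    }

    // HashPartitioner uses hashCode(), so keep it consistent with compareTo()
    @Override
    public int hashCode() {
        return id.hashCode() * 163 + uuid.hashCode();
    }

    @Override
    public String toString() {
        return id + "\t" + uuid;
    }
}
```

`CallSample` is a plain `Writable` along the same lines, holding the timestamp, expand rate, buffer size, and BSSID.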
And here's the reducer:

```java
public static class CallQualityReduce extends Reducer<CallSampleKey, CallSample, NullWritable, Text> {
    public void reduce(CallSampleKey inKey, Iterator<CallSample> inValues, Context context)
            throws IOException, InterruptedException {
        // Collect all samples for one call into a Call object, then emit its stats
        Call call = new Call(inKey.getId().toString(), inKey.getUuid().toString());
        while (inValues.hasNext()) {
            call.addSample(inValues.next());
        }
        context.write(NullWritable.get(), new Text(call.getStats()));
    }
}
```

Here's the controller and syslog output:
https://s3.amazonaws.com/rw-hadoop-test/logs/j-39S8C9S340U8I/steps/s-ORYZ0393MEBG/controller.gz
https://s3.amazonaws.com/rw-hadoop-test/logs/j-39S8C9S340U8I/steps/s-ORYZ0393MEBG/syslog.gz
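One more data point: the output I'm getting looks exactly like what the default identity reducer would produce. As I understand it, the base `Reducer` class implements `reduce()` as a simple pass-through, roughly like this (paraphrased from the Hadoop source, not my code):

```java
// Default Reducer.reduce(): writes every (key, value) pair straight
// through, which would produce exactly the "key <tab> CallSample@..."
// lines I'm seeing.
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
    for (VALUEIN value : values) {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}
```

So everything behaves as if my reducer class is never picked up at all, even though the `setReducerClass` call is right there in the driver.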
Any advice on how to proceed here?