Hi
I am pretty new to Avro, and try to write an map-reduce job to read
text format and write to Avro. it is a simple word count I write based
on the example, but the reducer seems not running as I wrote. all the output
is directly from map actually. basically when I check the log, the system.out
in the reducer did not show up even in the log.
here the code, can someone give a help?
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.Pair;
public class AvroTest extends Configured implements Tool{
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, AvroKey<Utf8>, AvroValue<Long>> {
@Override
public void map(LongWritable key, Text value,
OutputCollector<AvroKey<Utf8>, AvroValue<Long>> output,
Reporter arg3) throws IOException {
// TODO Auto-generated method stub
String valueLine = value.toString();
String[] values = valueLine.split("=");
System.out.println("key is "+values[0].toString()+" value is 1");
output.collect(new AvroKey(new Utf8(values[0])), new AvroValue(
new Long(1)));
}
}
public static class Reduce extends AvroReducer {
public void reduce(Utf8 word, Iterable<Long> counts,
AvroCollector<Pair<Utf8,Long>> output, Reporter reporter) throws IOException {
long sum = 0l;
for (long count:counts) {
sum++;
}
System.out.println("key is "+word.toString()+" value is " +sum);
Pair<Utf8,Long> outputValue = new Pair<Utf8,Long>
(word.toString(),sum);
output.collect(outputValue);
}
}
public int run(String[] args) throws Exception {
Configuration passed_conf = getConf();
JobConf conf = new JobConf(passed_conf,AvroTest.class);
conf.setJobName("Avro Test");
conf.setInputFormat(TextInputFormat.class);
conf.setMapperClass(Map.class);
AvroJob.setOutputSchema(conf, new Pair(new Utf8(""), 0L).getSchema());
AvroJob.setReducerClass(conf, Reduce.class);
FileInputFormat.setInputPaths(conf, new Path(new
String("/user/sam/test_input*")));
FileOutputFormat.setOutputPath(conf, new Path(new
String("/user/sam/avro_output")));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
GenericOptionsParser optParser = new GenericOptionsParser(args);
int exitCode = ToolRunner.run(optParser.getConfiguration(), new
AvroTest(), optParser.getRemainingArgs());
System.exit(exitCode);
}
}
Thank
Sam