I am trying to create a MapReduce example that adds up the values of identical keys. E.g., from the input lines A 1, A 2, B 4 I want to
get the output A 3 B4 The problem is that I cannot make the program read 2 inputs. How I do that? Here is my example: package org.apache.hadoop.examples; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * This is an example Hadoop Map/Reduce application. * It takes in several outputs of the count lines and sum them together acordinc the line. * * To run: bin/hadoop jar build/countlinesaggregator.jar * [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dirs</i> <i>out-dir</i> * e.g. * bin/hadoop jar countlinesaggregator.jar /gutenberg-output1 /gutenberg-output2 /final-output */ public class CountLinesAggregator extends Configured implements Tool { /** * Aggregate keys and values. * For each line of input, break the line into words and emit them as * (<b>lines</b>, <b>val</b>). 
*/ public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line, "\n"); while (itr.hasMoreTokens()) { String token = itr.nextToken(); if(token.length() >0 ) { System.out.println("Token: " + token); String[] splits = token.split("\t"); if(splits[0] != null && splits[1] != null && splits[0].length() > 0 && splits[1].length() > 0) { System.out.println(Arrays.deepToString(splits)); String k = splits[0]; String v = splits[1]; word.set(k); IntWritable val = new IntWritable(Integer.valueOf(v)); output.collect(word, val); } } } } } /** * A reducer class that just emits the sum of the input values. */ public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } static int printUsage() { System.out.println("countlinesaggregator [-m <maps>] [-r <reduces>] <input1> <input2> <output>"); ToolRunner.printGenericCommandUsage(System.out); return -1; } /** * The main driver for word count map/reduce program. * Invoke this method to submit the map/reduce job. * @throws IOException When there is communication problems with the * job tracker. 
*/ public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), CountLinesAggregator.class); conf.setJobName("countlinesaggregator"); // the keys are words (strings) conf.setOutputKeyClass(Text.class); // the values are counts (ints) conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setNumReduceTasks(1); List<String> other_args = new ArrayList<String>(); for(int i=0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { conf.setNumMapTasks(Integer.parseInt(args[++i])); } else if ("-r".equals(args[i])) { conf.setNumReduceTasks(Integer.parseInt(args[++i])); } else { other_args.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i-1]); return printUsage(); } } // Make sure there are exactly 2 parameters left. if (other_args.size() < 3) { System.out.println("ERROR: Wrong number of parameters: " + other_args.size()); return printUsage(); } String inputpath = ""; for(int i=0; i<other_args.size()-1; i++) { if(i<other_args.size()-2) inputpath += other_args.get(i) + ","; else inputpath += other_args.get(i); } String outputpath=other_args.get(other_args.size()-1); System.out.println("Input path: " + inputpath); System.out.println("Output path: " + outputpath); FileInputFormat.setInputPaths(conf, inputpath); FileOutputFormat.setOutputPath(conf, new Path(outputpath)); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CountLinesAggregator(), args); System.exit(res); } } -- Best regards,