This class is a copy of a standard WordCount class with one critical exception: instead of the Mapper emitting a key of type Text, it emits a key of type MyText, a simple subclass of Text. The reducer emits a different subclass of Text, YourText. I say

    job.setMapOutputKeyClass(MyText.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(YourText.class);
    job.setOutputValueClass(IntWritable.class);
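(As a sanity check, one could read those declarations back from the Job right before submitting; this is only a diagnostic sketch, relying on the getters that Job inherits from JobContext in the new org.apache.hadoop.mapreduce API, and is not part of the job itself:)

    // Diagnostic only: print what the Job actually resolved the declared classes to.
    System.out.println("map output key   = " + job.getMapOutputKeyClass());
    System.out.println("map output value = " + job.getMapOutputValueClass());
    System.out.println("output key       = " + job.getOutputKeyClass());
    System.out.println("output value     = " + job.getOutputValueClass());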
which should declare these classes directly, and yet I get the following exception using Hadoop 0.2 on a local box. What am I doing wrong?

java.io.IOException: wrong key class: class org.systemsbiology.hadoop.CapitalWordCount$YourText is not class org.systemsbiology.hadoop.CapitalWordCount$MyText
        at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
        at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:880)
        at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1201)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.systemsbiology.hadoop.CapitalWordCount$IntSumReducer.reduce(CapitalWordCount.java:89)

package org.systemsbiology.hadoop;

import com.lordjoe.utilities.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

import java.io.*;
import java.util.*;

/**
 * org.systemsbiology.hadoop.CapitalWordCount
 */
public class CapitalWordCount {

    public static class YourText extends Text {
        public YourText() {
        }

        /**
         * Construct from a string.
         */
        public YourText(final String string) {
            super(string);
        }
    }

    public static class MyText extends Text {
        public MyText() {
        }

        /**
         * Construct from a string.
         */
        public MyText(final String string) {
            super(string);
        }
    }

    public static class TokenizerMapper
            extends Mapper<Object, Text, MyText, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private MyText word = new MyText();

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String s = itr.nextToken().toUpperCase();
                s = dropNonLetters(s);
                if (s.length() > 0) {
                    word.set(s);
                    context.write(word, one);
                }
            }
        }
    }

    public static String dropNonLetters(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (Character.isLetter(c))
                sb.append(c);
        }
        return sb.toString();
    }

    public static class IntSumReducer
            extends Reducer<MyText, IntWritable, YourText, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(MyText key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(new YourText(key.toString()), result);
        }
    }

    public static class MyPartitioner extends Partitioner<Text, IntWritable> {
        /**
         * Get the partition number for a given key (hence record) given the total
         * number of partitions i.e. number of reduce-tasks for the job.
         * <p/>
         * <p>Typically a hash function on a all or a subset of the key.</p>
         *
         * @param key           the key to be partioned.
         * @param value         the entry value.
         * @param numPartitions the total number of partitions.
         * @return the partition number for the <code>key</code>.
         */
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            String s = key.toString();
            if (s.length() == 0)
                return 0;
            char c = s.charAt(0);
            int letter = Character.toUpperCase(c) - 'A';
            if (letter < 0 || letter > 26)
                return 0;
            return letter % numPartitions;
        }
    }

    /**
     * Force loading of needed classes to make
     */
    public static final Class[] NEEDED = {
            org.apache.commons.logging.LogFactory.class,
            org.apache.commons.cli.ParseException.class
    };

    public static final int DEFAULT_REDUCE_TASKS = 14;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        //    if (otherArgs.length != 2) {
        //      System.err.println("Usage: wordcount <in> <out>");
        //      System.exit(2);
        //    }
        Job job = new Job(conf, "word count");
        job.setJarByClass(CapitalWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setMapOutputKeyClass(MyText.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(YourText.class);
        job.setOutputValueClass(IntWritable.class);

        // added Slewis
        job.setNumReduceTasks(DEFAULT_REDUCE_TASKS);
        job.setPartitionerClass(MyPartitioner.class);

        if (otherArgs.length > 1) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        }

        String athString = otherArgs[otherArgs.length - 1];
        File out = new File(athString);
        if (out.exists()) {
            FileUtilities.expungeDirectory(out);
            out.delete();
        }

        Path outputDir = new Path(athString);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean ans = job.waitForCompletion(true);
        int ret = ans ? 0 : 1;
        System.exit(ret);
    }
}

--
Steven M. Lewis PhD
Institute for Systems Biology
Seattle WA