This took me a full read-through to figure out. The problem is that you're using your reducer as a combiner, and when the combiner runs, the output of the map stage becomes the wrong type.

In pseudo-visual-speak:

    (Object, Int) -> Map() -> (MyText, Int) -> Combine() -> (YourText, Int) -> EXCEPTION!

When you use your reducer as a combiner, the combiner's outputs *must* match the map outputs, because whatever the combiner emits is fed into the shuffle as if the map itself had produced it. In other words, your combiner - which is *optional* in the chain, run at Hadoop's pleasure - is changing the key space from MyText to YourText. That's a no-no. In your case, you can't reuse your reducer as a combiner. (The hint is in the exception trace: it's thrown from Hadoop's combiner classes.)
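Two ways out: drop the combiner entirely (a job is correct without one, just potentially slower), or give the job its own combiner whose output key type stays MyText. A rough sketch of the latter - untested, and the class name IntSumCombiner is my own invention - which would slot into the same file alongside IntSumReducer:

    public static class IntSumCombiner
            extends Reducer<MyText, IntWritable, MyText, IntWritable> {
        private final IntWritable result = new IntWritable();

        public void reduce(MyText key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            // The key stays a MyText, so the map output contract is preserved.
            context.write(key, result);
        }
    }

and in main():

    job.setCombinerClass(IntSumCombiner.class);   // instead of IntSumReducer.class

Either way the reducer itself can keep emitting YourText; only the combiner has to leave the map output types alone.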
Hope that helps.

On Fri, Jun 18, 2010 at 2:09 PM, Steve Lewis <lordjoe2...@gmail.com> wrote:
>
> This class is a copy of a standard WordCount class with one critical exception:
> instead of the Mapper emitting a key of type Text it emits a key of type
> MyText - a simple subclass of Text.
> The reducer emits a different subclass of Text - YourText.
> I say
>
>     job.setMapOutputKeyClass(MyText.class);
>     job.setMapOutputValueClass(IntWritable.class);
>     job.setOutputKeyClass(YourText.class);
>     job.setOutputValueClass(IntWritable.class);
>
> which should declare these classes directly, and yet I get the following
> exception using hadoop 0.2 on a local box.
> What am I doing wrong?
>
> java.io.IOException: wrong key class: class
> org.systemsbiology.hadoop.CapitalWordCount$YourText is not class
> org.systemsbiology.hadoop.CapitalWordCount$MyText
>         at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
>         at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:880)
>         at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1201)
>         at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>         at org.systemsbiology.hadoop.CapitalWordCount$IntSumReducer.reduce(CapitalWordCount.java:89)
>
> package org.systemsbiology.hadoop;
>
> import com.lordjoe.utilities.*;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapreduce.*;
> import org.apache.hadoop.mapreduce.lib.input.*;
> import org.apache.hadoop.mapreduce.lib.output.*;
> import org.apache.hadoop.util.*;
>
> import java.io.*;
> import java.util.*;
>
> /**
>  * org.systemsbiology.hadoop.CapitalWordCount
>  */
> public class CapitalWordCount {
>
>     public static class YourText extends Text {
>         public YourText() {
>         }
>
>         /**
>          * Construct from a string.
>          */
>         public YourText(final String string) {
>             super(string);
>         }
>     }
>
>     public static class MyText extends Text {
>         public MyText() {
>         }
>
>         /**
>          * Construct from a string.
>          */
>         public MyText(final String string) {
>             super(string);
>         }
>     }
>
>     public static class TokenizerMapper
>             extends Mapper<Object, Text, MyText, IntWritable> {
>         private final static IntWritable one = new IntWritable(1);
>         private MyText word = new MyText();
>
>         public void map(Object key, Text value, Context context
>         ) throws IOException, InterruptedException {
>             StringTokenizer itr = new StringTokenizer(value.toString());
>             while (itr.hasMoreTokens()) {
>                 String s = itr.nextToken().toUpperCase();
>                 s = dropNonLetters(s);
>                 if (s.length() > 0) {
>                     word.set(s);
>                     context.write(word, one);
>                 }
>             }
>         }
>     }
>
>     public static String dropNonLetters(String s) {
>         StringBuilder sb = new StringBuilder();
>         for (int i = 0; i < s.length(); i++) {
>             char c = s.charAt(i);
>             if (Character.isLetter(c))
>                 sb.append(c);
>         }
>         return sb.toString();
>     }
>
>     public static class IntSumReducer
>             extends Reducer<MyText, IntWritable, YourText, IntWritable> {
>         private IntWritable result = new IntWritable();
>
>         public void reduce(MyText key, Iterable<IntWritable> values,
>                            Context context
>         ) throws IOException, InterruptedException {
>             int sum = 0;
>             for (IntWritable val : values) {
>                 sum += val.get();
>             }
>             result.set(sum);
>             context.write(new YourText(key.toString()), result);
>         }
>     }
>
>     public static class MyPartitioner extends Partitioner<Text, IntWritable> {
>         /**
>          * Get the partition number for a given key (hence record) given the total
>          * number of partitions i.e. number of reduce-tasks for the job.
>          * <p/>
>          * <p>Typically a hash function on all or a subset of the key.</p>
>          *
>          * @param key           the key to be partitioned.
>          * @param value         the entry value.
>          * @param numPartitions the total number of partitions.
>          * @return the partition number for the <code>key</code>.
>          */
>         @Override
>         public int getPartition(Text key, IntWritable value, int numPartitions) {
>             String s = key.toString();
>             if (s.length() == 0)
>                 return 0;
>             char c = s.charAt(0);
>             int letter = Character.toUpperCase(c) - 'A';
>             if (letter < 0 || letter > 26)
>                 return 0;
>             return letter % numPartitions;
>         }
>     }
>
>     /**
>      * Force loading of needed classes to make
>      */
>     public static final Class[] NEEDED =
>             {
>                     org.apache.commons.logging.LogFactory.class,
>                     org.apache.commons.cli.ParseException.class
>             };
>
>     public static final int DEFAULT_REDUCE_TASKS = 14;
>
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
>         //    if (otherArgs.length != 2) {
>         //        System.err.println("Usage: wordcount <in> <out>");
>         //        System.exit(2);
>         //    }
>         Job job = new Job(conf, "word count");
>         job.setJarByClass(CapitalWordCount.class);
>         job.setMapperClass(TokenizerMapper.class);
>         job.setCombinerClass(IntSumReducer.class);
>         job.setReducerClass(IntSumReducer.class);
>
>         job.setMapOutputKeyClass(MyText.class);
>         job.setMapOutputValueClass(IntWritable.class);
>         job.setOutputKeyClass(YourText.class);
>         job.setOutputValueClass(IntWritable.class);
>
>         // added Slewis
>         job.setNumReduceTasks(DEFAULT_REDUCE_TASKS);
>         job.setPartitionerClass(MyPartitioner.class);
>
>         if (otherArgs.length > 1) {
>             FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
>         }
>         String athString = otherArgs[otherArgs.length - 1];
>         File out = new File(athString);
>         if (out.exists()) {
>             FileUtilities.expungeDirectory(out);
>             out.delete();
>         }
>         Path outputDir = new Path(athString);
>
>         FileOutputFormat.setOutputPath(job, outputDir);
>
>         boolean ans = job.waitForCompletion(true);
>         int ret = ans ? 0 : 1;
>         System.exit(ret);
>     }
> }
>
> --
> Steven M. Lewis PhD
> Institute for Systems Biology
> Seattle WA
>

--
Eric Sammer
twitter: esammer
data: www.cloudera.com