This took me a full read-through to figure out. The problem is that
you're using your reducer as a combiner, so when the combiner runs,
the map-side output ends up with the wrong key type.

In pseudo-visual-speak:

Object, Text -> Map() -> MyText, Int -> Combine() -> YourText, Int -> EXCEPTION!

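When you use your reducer as a combiner, the reducer's output types
*must* match the map output types, because the combiner's output goes
back into the same map-side stream that the real reducer reads later.
In other words, your combiner - which is *optional* in the chain and
may run zero or more times at Hadoop's pleasure - is changing the key
type from MyText to YourText. That's a no-no. In your case, you can't
reuse your reducer as a combiner.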

(The hint is in the exception: the stack trace goes through Hadoop's
combiner classes, Task$CombineOutputCollector and
Task$NewCombinerRunner.)
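
To make that concrete, here is a small stand-alone sketch of the check
that is failing. It does not use Hadoop at all: Text, MyText, YourText,
and MapOutputWriter below are hypothetical stand-ins made up for
illustration, and the exact-class comparison is an assumption about
what the map-side writer does, inferred from the "wrong key class ...
is not ..." message in your stack trace.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the failure; none of these classes are Hadoop's.
class CombinerKeyCheck {

    // Stand-in for org.apache.hadoop.io.Text.
    static class Text {
        final String value;
        Text(String value) { this.value = value; }
        @Override public String toString() { return value; }
    }

    // Stand-ins for the two subclasses in the quoted job.
    static class MyText extends Text {
        MyText(String value) { super(value); }
    }

    static class YourText extends Text {
        YourText(String value) { super(value); }
    }

    // Hypothetical map-side writer: it accepts a key only if the key's
    // class is exactly the declared map output key class.
    static class MapOutputWriter {
        private final Class<?> keyClass;
        final List<String> records = new ArrayList<>();

        MapOutputWriter(Class<?> keyClass) { this.keyClass = keyClass; }

        void append(Text key, int count) {
            if (key.getClass() != keyClass) {
                throw new IllegalStateException("wrong key class: "
                        + key.getClass().getSimpleName()
                        + " is not " + keyClass.getSimpleName());
            }
            records.add(key + "=" + count);
        }
    }

    // Sums three counts for one key, as a combiner would, then writes the
    // result with either the map key type or the reducer's key type.
    static String combineAndWrite(boolean reuseReducer) {
        MapOutputWriter writer = new MapOutputWriter(MyText.class);
        MyText mapKey = new MyText("HADOOP");
        int sum = 1 + 1 + 1;
        Text outKey = reuseReducer ? new YourText(mapKey.toString()) : mapKey;
        try {
            writer.append(outKey, sum);
            return writer.records.get(0);
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Combiner keeps the map key type: prints "HADOOP=3".
        System.out.println(combineAndWrite(false));
        // Reducer reused as combiner: prints
        // "wrong key class: YourText is not MyText".
        System.out.println(combineAndWrite(true));
    }
}
```

In your actual job, the equivalent fix would be to drop the
job.setCombinerClass(IntSumReducer.class) line, or to point it at a
separate class whose output key is MyText (something along the lines
of Reducer<MyText, IntWritable, MyText, IntWritable>).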

Hope that helps.

On Fri, Jun 18, 2010 at 2:09 PM, Steve Lewis <lordjoe2...@gmail.com> wrote:
>
> This class is a copy of a standard WordCount class with one critical
> exception
> Instead of the Mapper Emitting a Key of Type Text it emits a key of type
> MyText - a simple subclass of Text
> The reducer emits a different subclass of Text - YourText
> I say
>         job.setMapOutputKeyClass(MyText.class);
>         job.setMapOutputValueClass(IntWritable.class);
>         job.setOutputKeyClass(YourText.class);
>         job.setOutputValueClass(IntWritable.class);
> which should declare these classes directly  and yet I get the following
> exception using hadoop 0.2 on a local box
> What am I doing wrong
>
> java.io.IOException: wrong key class: class
> org.systemsbiology.hadoop.CapitalWordCount$YourText is not class
> org.systemsbiology.hadoop.CapitalWordCount$MyText
> at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
> at
> org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:880)
> at
> org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1201)
> at
> org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
> at
> org.systemsbiology.hadoop.CapitalWordCount$IntSumReducer.reduce(CapitalWordCount.java:89)
>
> package org.systemsbiology.hadoop;
> import com.lordjoe.utilities.*;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapreduce.*;
> import org.apache.hadoop.mapreduce.lib.input.*;
> import org.apache.hadoop.mapreduce.lib.output.*;
> import org.apache.hadoop.util.*;
> import java.io.*;
> import java.util.*;
> /**
>  *  org.systemsbiology.hadoop.CapitalWordCount
>  */
> public class CapitalWordCount {
>     public static class YourText extends Text
>       {
>           public YourText() {
>           }
>           /**
>            * Construct from a string.
>            */
>           public YourText(final String string) {
>               super(string);
>           }
>       }
>     public static class MyText extends Text
>     {
>         public MyText() {
>         }
>         /**
>          * Construct from a string.
>          */
>         public MyText(final String string) {
>             super(string);
>         }
>
>     }
>     public static class TokenizerMapper
>             extends Mapper<Object, Text, MyText, IntWritable> {
>         private final static IntWritable one = new IntWritable(1);
>         private MyText word = new MyText();
>         public void map(Object key, Text value, Context context
>         ) throws IOException, InterruptedException {
>             StringTokenizer itr = new StringTokenizer(value.toString());
>             while (itr.hasMoreTokens()) {
>                 String s = itr.nextToken().toUpperCase();
>                 s = dropNonLetters(s);
>                 if (s.length() > 0) {
>                     word.set(s);
>                     context.write(word, one);
>                 }
>             }
>         }
>     }
>     public static String dropNonLetters(String s) {
>         StringBuilder sb = new StringBuilder();
>         for (int i = 0; i < s.length(); i++) {
>             char c = s.charAt(i);
>             if (Character.isLetter(c))
>                 sb.append(c);
>         }
>         return sb.toString();
>     }
>     public static class IntSumReducer
>             extends Reducer<MyText, IntWritable, YourText, IntWritable> {
>         private IntWritable result = new IntWritable();
>         public void reduce(MyText key, Iterable<IntWritable> values,
>                            Context context
>         ) throws IOException, InterruptedException {
>             int sum = 0;
>             for (IntWritable val : values) {
>                 sum += val.get();
>             }
>             result.set(sum);
>             context.write(new YourText(key.toString()), result);
>         }
>     }
>     public static class MyPartitioner extends Partitioner<Text, IntWritable>
> {
>         /**
>          * Get the partition number for a given key (hence record) given the
> total
>          * number of partitions i.e. number of reduce-tasks for the job.
>          * <p/>
>          * <p>Typically a hash function on a all or a subset of the key.</p>
>          *
>          * @param key           the key to be partioned.
>          * @param value         the entry value.
>          * @param numPartitions the total number of partitions.
>          * @return the partition number for the <code>key</code>.
>          */
>         @Override
>         public int getPartition(Text key, IntWritable value, int
> numPartitions) {
>             String s = key.toString();
>             if (s.length() == 0)
>                 return 0;
>             char c = s.charAt(0);
>             int letter = Character.toUpperCase(c) - 'A';
>             if (letter < 0 || letter > 26)
>                 return 0;
>             return letter % numPartitions;
>         }
>     }
>
>     /**
>      * Force loading of needed classes to make
>      */
>     public static final Class[] NEEDED =
>             {
>                     org.apache.commons.logging.LogFactory.class,
>                     org.apache.commons.cli.ParseException.class
>             };
>
>     public static final int DEFAULT_REDUCE_TASKS = 14;
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         String[] otherArgs = new GenericOptionsParser(conf,
> args).getRemainingArgs();
> //        if (otherArgs.length != 2) {
> //            System.err.println("Usage: wordcount <in> <out>");
> //            System.exit(2);
> //        }
>         Job job = new Job(conf, "word count");
>         job.setJarByClass(CapitalWordCount.class);
>         job.setMapperClass(TokenizerMapper.class);
>         job.setCombinerClass(IntSumReducer.class);
>         job.setReducerClass(IntSumReducer.class);
>
>         job.setMapOutputKeyClass(MyText.class);
>         job.setMapOutputValueClass(IntWritable.class);
>         job.setOutputKeyClass(YourText.class);
>         job.setOutputValueClass(IntWritable.class);
>
>         // added Slewis
>         job.setNumReduceTasks(DEFAULT_REDUCE_TASKS);
>         job.setPartitionerClass(MyPartitioner.class);
>         if(otherArgs.length > 1)    {
>             FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
>         }
>         String athString = otherArgs[otherArgs.length - 1];
>         File out = new File(athString);
>         if (out.exists()) {
>             FileUtilities.expungeDirectory(out);
>             out.delete();
>         }
>         Path outputDir = new Path(athString);
>
>         FileOutputFormat.setOutputPath(job, outputDir);
>
>         boolean ans = job.waitForCompletion(true);
>         int ret = ans ? 0 : 1;
>         System.exit(ret);
>     }
> }
> --
> Steven M. Lewis PhD
> Institute for Systems Biology
> Seattle WA
>



-- 
Eric Sammer
twitter: esammer
data: www.cloudera.com
