There is no need to call job.setCombinerClass()
The combiner is optional.
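
For example, a minimal sketch of the driver setup from the code quoted below,
with the combiner line simply left out (everything else unchanged):

    Configuration conf = new Configuration();
    Job job = new Job(conf, "word count");
    job.setJarByClass(CapitalWordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // no job.setCombinerClass(...) call -- map output goes straight
    // through partition/sort to the reducer
    job.setReducerClass(IntSumReducer.class);
    job.setMapOutputKeyClass(MyText.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(YourText.class);
    job.setOutputValueClass(IntWritable.class);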

On Sat, Jun 19, 2010 at 10:01 AM, Steve Lewis <lordjoe2...@gmail.com> wrote:

> Wow - I cannot tell you how much I thank you. I totally missed the fact
> that the exception is thrown in the combiner, since I was seeing the
> exception in the reducer; I always thought the combiner was called between
> the mapper and the reducer, not after the reducer.
> Also, does this mean I should set no combiner at all, or use a very
> generic combiner - especially for my real problem, where there is no real
> combiner step?
>
> On Fri, Jun 18, 2010 at 2:45 PM, Eric Sammer <esam...@cloudera.com> wrote:
>
>> This took me a full read-through to figure out. The problem is that
>> you're using your reducer as a combiner: when it runs, the output
>> of the map stage becomes the wrong type.
>>
>> In pseudo-visual-speak:
>>
>> Object, Int -> Map() -> MyText, Int -> Combine() -> YourText, Int ->
>> EXCEPTION!
>>
>> When using your reducer as a combiner, the reducer outputs *must*
>> match the map outputs. In other words, your combiner - which is
>> *optional* in the chain at Hadoop's pleasure - is changing the key
>> space. That's a no-no. In your case, you can't reuse your reducer as a
>> combiner.
>>
>> (The hint is in the exception: it's occurring in the combiner classes
>> in Hadoop.)
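>>
>> If you do want local aggregation, one option - just a sketch, and
>> IntSumCombiner is a made-up name, not something from your code - is a
>> separate combiner whose output key/value types match the map output
>> (it reuses the MyText and IntWritable types from your listing):
>>
>>     public static class IntSumCombiner
>>             extends Reducer<MyText, IntWritable, MyText, IntWritable> {
>>         private IntWritable result = new IntWritable();
>>         public void reduce(MyText key, Iterable<IntWritable> values,
>>                            Context context)
>>                 throws IOException, InterruptedException {
>>             int sum = 0;
>>             for (IntWritable val : values) {
>>                 sum += val.get();
>>             }
>>             result.set(sum);
>>             // key stays MyText, so combiner output matches map output
>>             context.write(key, result);
>>         }
>>     }
>>
>> ...and then job.setCombinerClass(IntSumCombiner.class) instead of
>> IntSumReducer.class.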
>>
>> Hope that helps.
>>
>> On Fri, Jun 18, 2010 at 2:09 PM, Steve Lewis <lordjoe2...@gmail.com>
>> wrote:
>> >
>> > This class is a copy of the standard WordCount class with one critical
>> > exception:
>> > instead of the mapper emitting a key of type Text, it emits a key of type
>> > MyText - a simple subclass of Text.
>> > The reducer emits a different subclass of Text - YourText.
>> > I say
>> >         job.setMapOutputKeyClass(MyText.class);
>> >         job.setMapOutputValueClass(IntWritable.class);
>> >         job.setOutputKeyClass(YourText.class);
>> >         job.setOutputValueClass(IntWritable.class);
>> > which should declare these classes directly, and yet I get the following
>> > exception using hadoop 0.2 on a local box.
>> > What am I doing wrong?
>> >
>> > java.io.IOException: wrong key class: class
>> > org.systemsbiology.hadoop.CapitalWordCount$YourText is not class
>> > org.systemsbiology.hadoop.CapitalWordCount$MyText
>> >     at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
>> >     at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:880)
>> >     at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1201)
>> >     at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>> >     at org.systemsbiology.hadoop.CapitalWordCount$IntSumReducer.reduce(CapitalWordCount.java:89)
>> >
>> > package org.systemsbiology.hadoop;
>> > import com.lordjoe.utilities.*;
>> > import org.apache.hadoop.conf.*;
>> > import org.apache.hadoop.fs.*;
>> > import org.apache.hadoop.io.*;
>> > import org.apache.hadoop.mapreduce.*;
>> > import org.apache.hadoop.mapreduce.lib.input.*;
>> > import org.apache.hadoop.mapreduce.lib.output.*;
>> > import org.apache.hadoop.util.*;
>> > import java.io.*;
>> > import java.util.*;
>> > /**
>> >  *  org.systemsbiology.hadoop.CapitalWordCount
>> >  */
>> > public class CapitalWordCount {
>> >     public static class YourText extends Text
>> >       {
>> >           public YourText() {
>> >           }
>> >           /**
>> >            * Construct from a string.
>> >            */
>> >           public YourText(final String string) {
>> >               super(string);
>> >           }
>> >       }
>> >     public static class MyText extends Text
>> >     {
>> >         public MyText() {
>> >         }
>> >         /**
>> >          * Construct from a string.
>> >          */
>> >         public MyText(final String string) {
>> >             super(string);
>> >         }
>> >
>> >     }
>> >     public static class TokenizerMapper
>> >             extends Mapper<Object, Text, MyText, IntWritable> {
>> >         private final static IntWritable one = new IntWritable(1);
>> >         private MyText word = new MyText();
>> >         public void map(Object key, Text value, Context context
>> >         ) throws IOException, InterruptedException {
>> >             StringTokenizer itr = new StringTokenizer(value.toString());
>> >             while (itr.hasMoreTokens()) {
>> >                 String s = itr.nextToken().toUpperCase();
>> >                 s = dropNonLetters(s);
>> >                 if (s.length() > 0) {
>> >                     word.set(s);
>> >                     context.write(word, one);
>> >                 }
>> >             }
>> >         }
>> >     }
>> >     public static String dropNonLetters(String s) {
>> >         StringBuilder sb = new StringBuilder();
>> >         for (int i = 0; i < s.length(); i++) {
>> >             char c = s.charAt(i);
>> >             if (Character.isLetter(c))
>> >                 sb.append(c);
>> >         }
>> >         return sb.toString();
>> >     }
>> >     public static class IntSumReducer
>> >             extends Reducer<MyText, IntWritable, YourText, IntWritable> {
>> >         private IntWritable result = new IntWritable();
>> >         public void reduce(MyText key, Iterable<IntWritable> values,
>> >                            Context context
>> >         ) throws IOException, InterruptedException {
>> >             int sum = 0;
>> >             for (IntWritable val : values) {
>> >                 sum += val.get();
>> >             }
>> >             result.set(sum);
>> >             context.write(new YourText(key.toString()), result);
>> >         }
>> >     }
>> >     public static class MyPartitioner extends Partitioner<Text, IntWritable> {
>> >         /**
>> >          * Get the partition number for a given key (hence record) given the total
>> >          * number of partitions i.e. number of reduce-tasks for the job.
>> >          * <p/>
>> >          * <p>Typically a hash function on all or a subset of the key.</p>
>> >          *
>> >          * @param key           the key to be partitioned.
>> >          * @param value         the entry value.
>> >          * @param numPartitions the total number of partitions.
>> >          * @return the partition number for the <code>key</code>.
>> >          */
>> >         @Override
>> >         public int getPartition(Text key, IntWritable value, int numPartitions) {
>> >             String s = key.toString();
>> >             if (s.length() == 0)
>> >                 return 0;
>> >             char c = s.charAt(0);
>> >             int letter = Character.toUpperCase(c) - 'A';
>> >             if (letter < 0 || letter > 25)
>> >                 return 0;
>> >             return letter % numPartitions;
>> >         }
>> >     }
>> >
>> >     /**
>> >      * Force loading of needed classes to make
>> >      */
>> >     public static final Class[] NEEDED =
>> >             {
>> >                     org.apache.commons.logging.LogFactory.class,
>> >                     org.apache.commons.cli.ParseException.class
>> >             };
>> >
>> >     public static final int DEFAULT_REDUCE_TASKS = 14;
>> >     public static void main(String[] args) throws Exception {
>> >         Configuration conf = new Configuration();
>> >         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
>> > //        if (otherArgs.length != 2) {
>> > //            System.err.println("Usage: wordcount <in> <out>");
>> > //            System.exit(2);
>> > //        }
>> >         Job job = new Job(conf, "word count");
>> >         job.setJarByClass(CapitalWordCount.class);
>> >         job.setMapperClass(TokenizerMapper.class);
>> >         job.setCombinerClass(IntSumReducer.class);
>> >         job.setReducerClass(IntSumReducer.class);
>> >
>> >         job.setMapOutputKeyClass(MyText.class);
>> >         job.setMapOutputValueClass(IntWritable.class);
>> >         job.setOutputKeyClass(YourText.class);
>> >         job.setOutputValueClass(IntWritable.class);
>> >
>> >         // added Slewis
>> >         job.setNumReduceTasks(DEFAULT_REDUCE_TASKS);
>> >         job.setPartitionerClass(MyPartitioner.class);
>> >         if(otherArgs.length > 1)    {
>> >             FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
>> >         }
>> >         String pathString = otherArgs[otherArgs.length - 1];
>> >         File out = new File(pathString);
>> >         if (out.exists()) {
>> >             FileUtilities.expungeDirectory(out);
>> >             out.delete();
>> >         }
>> >         Path outputDir = new Path(pathString);
>> >
>> >         FileOutputFormat.setOutputPath(job, outputDir);
>> >
>> >         boolean ans = job.waitForCompletion(true);
>> >         int ret = ans ? 0 : 1;
>> >         System.exit(ret);
>> >     }
>> > }
>> > --
>> > Steven M. Lewis PhD
>> > Institute for Systems Biology
>> > Seattle WA
>> >
>>
>>
>>
>> --
>> Eric Sammer
>> twitter: esammer
>> data: www.cloudera.com
>>
>
>
>
> --
> Steven M. Lewis PhD
> Institute for Systems Biology
> Seattle WA
>
