There is no need to call job.setCombinerClass(); the combiner is optional.
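
A minimal sketch of the relevant driver lines with the combiner simply left
out (this mirrors the job setup from the code below; everything else stays
the same):

    Job job = new Job(conf, "word count");
    job.setJarByClass(CapitalWordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // No setCombinerClass() call: the combiner stage is optional, and this
    // reducer cannot serve as one because it changes the key class.
    job.setReducerClass(IntSumReducer.class);
    job.setMapOutputKeyClass(MyText.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(YourText.class);
    job.setOutputValueClass(IntWritable.class);
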
On Sat, Jun 19, 2010 at 10:01 AM, Steve Lewis <lordjoe2...@gmail.com> wrote:

> Wow - I cannot tell you how much I thank you - I totally missed the fact
> that the exception is thrown in the combiner, since I was seeing the
> exception in the reducer. I always thought the combiner was called
> between the mapper and the reducer, not after the reducer.
> Also, does this mean I should use null as a combiner, or use a very
> generic combiner - especially for my real problem, where there is no
> real combiner step?
>
> On Fri, Jun 18, 2010 at 2:45 PM, Eric Sammer <esam...@cloudera.com> wrote:
>
>> This took me a full read-through to figure out. The problem is that
>> you're using your reducer as a combiner, and when it runs, the output
>> of the map stage then becomes the wrong type.
>>
>> In pseudo-visual-speak:
>>
>> Object, Int -> Map() -> MyText, Int -> Combine() -> YourText, Int ->
>> EXCEPTION!
>>
>> When using your reducer as a combiner, the reducer outputs *must*
>> match the map outputs. In other words, your combiner - which is
>> *optional* in the chain at Hadoop's pleasure - is changing the key
>> space. That's a no-no. In your case, you can't reuse your reducer as
>> a combiner.
>>
>> (The hint is in the exception: it's occurring in the combiner classes
>> in Hadoop.)
>>
>> Hope that helps.
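
To illustrate the rule above: if a combiner is wanted at all, its input and
output types must both be the map output types. A minimal sketch of a
combiner that would be safe here (the class name is hypothetical, not part
of the original thread):

    // Safe combiner: consumes and produces the map output types
    // (MyText, IntWritable), so the key class never changes.
    public static class IntSumCombiner
            extends Reducer<MyText, IntWritable, MyText, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(MyText key, Iterable<IntWritable> values,
                           Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result); // MyText in, MyText out
        }
    }

Wired in with job.setCombinerClass(IntSumCombiner.class), this preserves the
declared map output key class, so the combiner stays a pure optimization.
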
>> On Fri, Jun 18, 2010 at 2:09 PM, Steve Lewis <lordjoe2...@gmail.com> wrote:
>>
>> > This class is a copy of a standard WordCount class with one critical
>> > exception: instead of the mapper emitting a key of type Text, it emits
>> > a key of type MyText, a simple subclass of Text. The reducer emits a
>> > different subclass of Text, YourText. I say
>> >
>> >     job.setMapOutputKeyClass(MyText.class);
>> >     job.setMapOutputValueClass(IntWritable.class);
>> >     job.setOutputKeyClass(YourText.class);
>> >     job.setOutputValueClass(IntWritable.class);
>> >
>> > which should declare these classes directly, and yet I get the
>> > following exception using Hadoop 0.2 on a local box. What am I doing
>> > wrong?
>> >
>> > java.io.IOException: wrong key class: class
>> > org.systemsbiology.hadoop.CapitalWordCount$YourText is not class
>> > org.systemsbiology.hadoop.CapitalWordCount$MyText
>> >     at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
>> >     at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:880)
>> >     at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1201)
>> >     at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>> >     at org.systemsbiology.hadoop.CapitalWordCount$IntSumReducer.reduce(CapitalWordCount.java:89)
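
A note on where the check fires: the combiner's output is written back into
the map-output spill path, and the writer validates each key against the
class declared via job.setMapOutputKeyClass(). Roughly, the guard in
IFile.Writer.append() looks like this (a simplified paraphrase, not the
verbatim Hadoop source):

    // Each key the combiner emits must be exactly the declared map output
    // key class; here keyClass is MyText.class, but the reducer-as-combiner
    // emits YourText, so this throws.
    if (key.getClass() != keyClass) {
        throw new IOException("wrong key class: " + key.getClass()
                + " is not " + keyClass);
    }
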
>> >
>> > package org.systemsbiology.hadoop;
>> >
>> > import com.lordjoe.utilities.*;
>> >
>> > import org.apache.hadoop.conf.*;
>> > import org.apache.hadoop.fs.*;
>> > import org.apache.hadoop.io.*;
>> > import org.apache.hadoop.mapreduce.*;
>> > import org.apache.hadoop.mapreduce.lib.input.*;
>> > import org.apache.hadoop.mapreduce.lib.output.*;
>> > import org.apache.hadoop.util.*;
>> >
>> > import java.io.*;
>> > import java.util.*;
>> >
>> > /**
>> >  * org.systemsbiology.hadoop.CapitalWordCount
>> >  */
>> > public class CapitalWordCount {
>> >
>> >     public static class YourText extends Text {
>> >         public YourText() {
>> >         }
>> >
>> >         /**
>> >          * Construct from a string.
>> >          */
>> >         public YourText(final String string) {
>> >             super(string);
>> >         }
>> >     }
>> >
>> >     public static class MyText extends Text {
>> >         public MyText() {
>> >         }
>> >
>> >         /**
>> >          * Construct from a string.
>> >          */
>> >         public MyText(final String string) {
>> >             super(string);
>> >         }
>> >     }
>> >
>> >     public static class TokenizerMapper
>> >             extends Mapper<Object, Text, MyText, IntWritable> {
>> >         private final static IntWritable one = new IntWritable(1);
>> >         private MyText word = new MyText();
>> >
>> >         public void map(Object key, Text value, Context context)
>> >                 throws IOException, InterruptedException {
>> >             StringTokenizer itr = new StringTokenizer(value.toString());
>> >             while (itr.hasMoreTokens()) {
>> >                 String s = itr.nextToken().toUpperCase();
>> >                 s = dropNonLetters(s);
>> >                 if (s.length() > 0) {
>> >                     word.set(s);
>> >                     context.write(word, one);
>> >                 }
>> >             }
>> >         }
>> >     }
>> >
>> >     public static String dropNonLetters(String s) {
>> >         StringBuilder sb = new StringBuilder();
>> >         for (int i = 0; i < s.length(); i++) {
>> >             char c = s.charAt(i);
>> >             if (Character.isLetter(c))
>> >                 sb.append(c);
>> >         }
>> >         return sb.toString();
>> >     }
>> >
>> >     public static class IntSumReducer
>> >             extends Reducer<MyText, IntWritable, YourText, IntWritable> {
>> >         private IntWritable result = new IntWritable();
>> >
>> >         public void reduce(MyText key, Iterable<IntWritable> values,
>> >                            Context context)
>> >                 throws IOException, InterruptedException {
>> >             int sum = 0;
>> >             for (IntWritable val : values) {
>> >                 sum += val.get();
>> >             }
>> >             result.set(sum);
>> >             context.write(new YourText(key.toString()), result);
>> >         }
>> >     }
>> >
>> >     public static class MyPartitioner extends Partitioner<Text, IntWritable> {
>> >         /**
>> >          * Get the partition number for a given key (hence record) given the
>> >          * total number of partitions i.e. number of reduce-tasks for the job.
>> >          * <p/>
>> >          * <p>Typically a hash function on all or a subset of the key.</p>
>> >          *
>> >          * @param key           the key to be partitioned.
>> >          * @param value         the entry value.
>> >          * @param numPartitions the total number of partitions.
>> >          * @return the partition number for the <code>key</code>.
>> >          */
>> >         @Override
>> >         public int getPartition(Text key, IntWritable value, int numPartitions) {
>> >             String s = key.toString();
>> >             if (s.length() == 0)
>> >                 return 0;
>> >             char c = s.charAt(0);
>> >             int letter = Character.toUpperCase(c) - 'A';
>> >             if (letter < 0 || letter > 26)
>> >                 return 0;
>> >             return letter % numPartitions;
>> >         }
>> >     }
>> >
>> >     /**
>> >      * Force loading of needed classes to make
>> >      */
>> >     public static final Class[] NEEDED =
>> >             {
>> >                     org.apache.commons.logging.LogFactory.class,
>> >                     org.apache.commons.cli.ParseException.class
>> >             };
>> >
>> >     public static final int DEFAULT_REDUCE_TASKS = 14;
>> >
>> >     public static void main(String[] args) throws Exception {
>> >         Configuration conf = new Configuration();
>> >         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
>> > //        if (otherArgs.length != 2) {
>> > //            System.err.println("Usage: wordcount <in> <out>");
>> > //            System.exit(2);
>> > //        }
>> >         Job job = new Job(conf, "word count");
>> >         job.setJarByClass(CapitalWordCount.class);
>> >         job.setMapperClass(TokenizerMapper.class);
>> >         job.setCombinerClass(IntSumReducer.class);
>> >         job.setReducerClass(IntSumReducer.class);
>> >
>> >         job.setMapOutputKeyClass(MyText.class);
>> >         job.setMapOutputValueClass(IntWritable.class);
>> >         job.setOutputKeyClass(YourText.class);
>> >         job.setOutputValueClass(IntWritable.class);
>> >
>> >         // added Slewis
>> >         job.setNumReduceTasks(DEFAULT_REDUCE_TASKS);
>> >         job.setPartitionerClass(MyPartitioner.class);
>> >
>> >         if (otherArgs.length > 1) {
>> >             FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
>> >         }
>> >         String athString = otherArgs[otherArgs.length - 1];
>> >         File out = new File(athString);
>> >         if (out.exists()) {
>> >             FileUtilities.expungeDirectory(out);
>> >             out.delete();
>> >         }
>> >         Path outputDir = new Path(athString);
>> >
>> >         FileOutputFormat.setOutputPath(job, outputDir);
>> >
>> >         boolean ans = job.waitForCompletion(true);
>> >         int ret = ans ? 0 : 1;
>> >         System.exit(ret);
>> >     }
>> > }
>> >
>> > --
>> > Steven M. Lewis PhD
>> > Institute for Systems Biology
>> > Seattle WA
>>
>> --
>> Eric Sammer
>> twitter: esammer
>> data: www.cloudera.com
>
> --
> Steven M. Lewis PhD
> Institute for Systems Biology
> Seattle WA