I am setting up my WordCount example, which is very similar to the
WordCount example that can be found on the Internet.
1.
The MyMap class extends and implements the same classes as the ones
defined in the original WordCount example, but in my case I get the
compiler error “Interface expected here”. I really don’t understand why
I get this error; see my example below [1]. Any help here?
2.
Is it possible to access the JobConf variable inside the map or
reduce methods?
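To make the second question more concrete, here is roughly what I would like to do inside a mapper (just a sketch of my intent, not code from my project: ConfAwareMap and the "my.custom.property" key are made-up names, and the old-API configure() hook is simply the only place I have found so far that receives the JobConf). Is overriding configure() the intended way, or can the JobConf be reached directly from inside map()/reduce()?

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ConfAwareMap extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private String customValue;

    // The only hook I have found that receives the JobConf; the framework
    // calls it once per task, before any call to map().
    @Override
    public void configure(JobConf job) {
        customValue = job.get("my.custom.property", "default"); // made-up property key
    }

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // ... use customValue together with the input record here ...
    }
}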
[1] My Wordcount example
package org.apache.hadoop.mapred.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

/**
 * My example of a common wordcount. Compare with the official WordCount.class
 * to understand the differences between both classes.
 */
public class MyWordCount {

    public static class MyMap extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> { // <<<< Interface expected here!!!

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();
        MedusaDigests parser = new MedusaDigests();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                        + " value ( " + val.getClass().toString() + " ): " + val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if (iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>) iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            parser.cleanup(context);
        }
    }

    /** Identity mapper set by the user. */
    public static class MyFullyIndentityMapper extends Mapper<Object, Text, Text, IntWritable> {

        private Text word = new Text();
        private IntWritable val = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            word.set(itr.nextToken());
            val.set(Integer.valueOf(itr.nextToken()));
            context.write(word, val);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key ( " + context.getCurrentKey().getClass().getName() + " ): "
                            + context.getCurrentKey() + " Value ("
                            + context.getCurrentValue().getClass().getName() + "): "
                            + context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);
        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount [<in>...] <out>");
            System.exit(2);
        }

        // path that contains the file with all attributes necessary to the execution of the job
        Medusa execution = new Medusa(args);

        // first map tasks
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("wordcount");
        conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);
        System.out.println(conf.toString());
        conf.setJarByClass(MyWordCount.class);
        conf.setMapperClass(MyMap.class);
        conf.setPartitionerClass(MyHashPartitioner.class);
        conf.setReducerClass(MyReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(1);

        List<Path> inputPaths = new ArrayList<Path>();
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            inputPaths.add(new Path(otherArgs[i]));
        }
        Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
        execution.setInputPath(inputPaths);
        execution.setOutputPath(outputPath);

        // launch the job directly
        execution.submit(new Job(conf));
    }
}