I'd like to use MultithreadedMapper for my MapReduce job.

To do this, I replaced Mapper with MultithreadedMapper in previously working code.

Here's the exception I'm getting:

    java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)

Here's the code setup:

    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.err.println("Usage: MapReduceMain <input path> <output path>");
                System.exit(123);
            }
            Job job = new Job();
            job.setJarByClass(MapReduceMain.class);
            job.setInputFormatClass(TextInputFormat.class);
            FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
            FileStatus[] files = fs.listStatus(new Path(args[0]));
            for (FileStatus sfs : files) {
                FileInputFormat.addInputPath(job, sfs.getPath());
            }
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setMapperClass(MyMultithreadMapper.class);
            job.setReducerClass(MyReducer.class);
            MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);

            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(MyPage.class);

            // write the result as a sequence file
            job.setOutputFormatClass(SequenceFileOutputFormat.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
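For comparison, my reading of the MultithreadedMapper javadoc is that it is usually wired differently from my setup above: MultithreadedMapper itself is set as the job's mapper, and the actual single-threaded map logic is registered separately via MultithreadedMapper.setMapperClass. A sketch of that wiring (ScrapeMapper here is a hypothetical stand-in for my map logic, not code from my project):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

public class MultithreadedSetupSketch {

    // A plain Mapper holding the real map logic; MultithreadedMapper runs
    // several instances of it in parallel threads over the same input split.
    public static class ScrapeMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new IntWritable(value.toString().hashCode()), value);
        }
    }

    public static void configure(Job job) {
        // The threaded wrapper is the job's mapper...
        job.setMapperClass(MultithreadedMapper.class);
        // ...and the real map logic is registered on the wrapper.
        MultithreadedMapper.setMapperClass(job, ScrapeMapper.class);
        MultithreadedMapper.setNumberOfThreads(job, 5);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
    }
}
```

Maybe the difference matters, maybe not, but I'm including it in case my subclassing approach is what's wrong.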

And here's the mapper's code:

    public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {

        ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>();

        public static final int nThreads = 5;

        public MyMultithreadMapper() {
            for (int i = 0; i < nThreads; i++) {
                scrapers.add(new MyScraper());
            }
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            MyScraper scraper = scrapers.poll();

            // retry the scrape up to 10 times before giving up
            MyPage result = null;
            for (int i = 0; i < 10; i++) {
                try {
                    result = scraper.scrapPage(value.toString(), true);
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            if (result == null) {
                result = new MyPage();
                result.setUrl(key.toString());
            }

            context.write(new IntWritable(result.getUrl().hashCode()), result);

            scrapers.add(scraper);
        }
    }

Why am I getting this?
