Replacing Mapper with MultithreadedMapper causes the job to crash with "Type mismatch in key from map"
------------------------------------------------------------------------------------------------------

                 Key: MAPREDUCE-3106
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-3106
             Project: Hadoop Map/Reduce
          Issue Type: Bug
    Affects Versions: 0.20.203.0
            Reporter: Arsen Zahray
            Priority: Blocker


I have a Hadoop job which works perfectly fine with a class extending Mapper. When
I replace Mapper with MultithreadedMapper, the job crashes with the following
message:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)

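Note that the trace bottoms out in the base org.apache.hadoop.mapreduce.Mapper.map(),
i.e. the identity mapper. In the new API, MultithreadedMapper runs several worker
threads that each drive a separate delegate mapper; the delegate is registered with
the static MultithreadedMapper.setMapperClass() and defaults to the identity Mapper,
which would pass the TextInputFormat LongWritable offsets straight through as keys.
For comparison with the driver below, a minimal sketch of that documented wiring
(MyMapper stands in for whatever class holds the real map logic):

        // Sketch of the documented MultithreadedMapper wiring (0.20.x new API).
        // The threaded wrapper itself becomes the job's mapper; the real map
        // logic lives in an ordinary Mapper subclass registered as the delegate.
        job.setMapperClass(MultithreadedMapper.class);
        MultithreadedMapper.setMapperClass(job, MyMapper.class); // delegate, not a subclass
        MultithreadedMapper.setNumberOfThreads(job, 5);
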
Here is the relevant source code:

import java.net.URI;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class MapReduceMain {

        public static void main(String[] args) {
                try {
                        if (args.length != 2) {
                                System.err.println("Usage: MapReduceMain <input path> <output path>");
                                System.exit(123);
                        }
                        Job job = new Job();
                        job.setJarByClass(MapReduceMain.class);
                        job.setInputFormatClass(TextInputFormat.class);

                        // Add every file under the input directory as an input path.
                        FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
                        FileStatus[] files = fs.listStatus(new Path(args[0]));
                        for (FileStatus sfs : files) {
                                FileInputFormat.addInputPath(job, sfs.getPath());
                        }
                        FileOutputFormat.setOutputPath(job, new Path(args[1]));

                        job.setMapperClass(MyMultithreadMapper.class);
                        job.setReducerClass(MyReducer.class);
                        MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);

                        job.setOutputKeyClass(IntWritable.class);
                        job.setOutputValueClass(MyPage.class);
                        job.setOutputFormatClass(SequenceFileOutputFormat.class);

                        System.exit(job.waitForCompletion(true) ? 0 : 1);
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
}


import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {

        // Pool of scrapers shared by the worker threads, one per thread.
        ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>();

        public static final int nThreads = 5;

        public MyMultithreadMapper() {
                for (int i = 0; i < nThreads; i++) {
                        scrapers.add(new MyScraper());
                }
        }

        public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                MyScraper scraper = scrapers.poll();

                // Retry the scrape up to 10 times before giving up.
                MyPage result = null;
                for (int i = 0; i < 10; i++) {
                        try {
                                result = scraper.scrapPage(value.toString(), true);
                                break;
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                }

                // Fall back to an empty page carrying just the key as its URL.
                if (result == null) {
                        result = new MyPage();
                        result.setUrl(key.toString());
                }

                context.write(new IntWritable(result.getUrl().hashCode()), result);

                // Return the scraper to the pool for the next record.
                scrapers.add(scraper);
        }
}
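
For reference, here is the same logic written as a plain delegate Mapper, the shape
that MultithreadedMapper.setMapperClass() expects. As far as I can tell from the
0.20.x source, MultithreadedMapper creates one delegate instance per worker thread,
so each instance can own its scraper and the ConcurrentLinkedQueue pool becomes
unnecessary. This is only a sketch: ScrapeMapper is a made-up name, and
MyScraper/MyPage are the classes from this report.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical delegate: an ordinary Mapper registered via
// MultithreadedMapper.setMapperClass(job, ScrapeMapper.class).
// One instance is created per worker thread, so per-instance
// state needs no synchronization.
public class ScrapeMapper extends Mapper<LongWritable, Text, IntWritable, MyPage> {

        private final MyScraper scraper = new MyScraper();

        public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                // Same retry loop as above: try the scrape up to 10 times.
                MyPage result = null;
                for (int i = 0; i < 10; i++) {
                        try {
                                result = scraper.scrapPage(value.toString(), true);
                                break;
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                }

                if (result == null) {
                        result = new MyPage();
                        result.setUrl(key.toString());
                }

                context.write(new IntWritable(result.getUrl().hashCode()), result);
        }
}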


And here's the code for the working mapper class, just to be sure:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, IntWritable, MyPage> {

        MyScraper scr = new MyScraper();

        public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                // Retry the scrape up to 10 times before giving up.
                MyPage result = null;
                for (int i = 0; i < 10; i++) {
                        try {
                                result = scr.scrapPage(value.toString(), true);
                                break;
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                }

                if (result == null) {
                        result = new MyPage();
                        result.setUrl(key.toString());
                }

                context.write(new IntWritable(result.getUrl().hashCode()), result);
        }
}
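
MyPage itself isn't shown above, but since it is the job's output value class and the
output format is SequenceFileOutputFormat, it has to implement Writable under the
default serialization config. A hypothetical sketch of that minimal contract follows;
the field set is illustrative, as only setUrl()/getUrl() appear in the code above.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical sketch of MyPage's serialization contract; the real class
// is not shown in this report and surely carries more fields.
public class MyPage implements Writable {

        private String url = "";

        public void setUrl(String url) { this.url = url; }
        public String getUrl() { return url; }

        public void write(DataOutput out) throws IOException {
                out.writeUTF(url);   // write fields in a fixed order
        }

        public void readFields(DataInput in) throws IOException {
                url = in.readUTF();  // read them back in the same order
        }
}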


This appears to be a Hadoop bug.
