Replacing Mapper with MultithreadedMapper causes the job to crash with "Type mismatch in key from map"
-------------------------------------------------------------------------------------------------------
Key: MAPREDUCE-3106
URL: https://issues.apache.org/jira/browse/MAPREDUCE-3106
Project: Hadoop Map/Reduce
Issue Type: Bug
Affects Versions: 0.20.203.0
Reporter: Arsen Zahray
Priority: Blocker

I have a Hadoop job that works perfectly well when it uses a class extending Mapper. When I replace Mapper with MultithreadedMapper, the job crashes with the following message:

    java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)

Here is the relevant source code:

    import java.net.URI;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class MapReduceMain {
        /**
         * @param args input path and output path
         */
        public static void main(String[] args) {
            try {
                if (args.length != 2) {
                    System.err.println("Usage: MapReduceMain <input path> <output path>");
                    System.exit(123);
                }
                Job job = new Job();
                job.setJarByClass(MapReduceMain.class);
                job.setInputFormatClass(TextInputFormat.class);
                // Add every file in the input directory as a separate input path
                FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
                FileStatus[] files = fs.listStatus(new Path(args[0]));
                for (FileStatus sfs : files) {
                    FileInputFormat.addInputPath(job, sfs.getPath());
                }
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
                job.setMapperClass(MyMultithreadMapper.class);
                job.setReducerClass(MyReducer.class);
                MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);
                job.setOutputKeyClass(IntWritable.class);
                job.setOutputValueClass(MyPage.class);
                job.setOutputFormatClass(SequenceFileOutputFormat.class);
                System.exit(job.waitForCompletion(true) ? 0 : 1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    import java.io.IOException;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

    public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {
        // Pool of scrapers shared by the mapper threads, one per thread
        ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>();
        public static final int nThreads = 5;

        public MyMultithreadMapper() {
            for (int i = 0; i < nThreads; i++) {
                scrapers.add(new MyScraper());
            }
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Borrow a scraper from the pool
            MyScraper scraper = scrapers.poll();
            MyPage result = null;
            // Retry the scrape up to 10 times
            for (int i = 0; i < 10; i++) {
                try {
                    result = scraper.scrapPage(value.toString(), true);
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Fall back to an empty page if every attempt failed
            if (result == null) {
                result = new MyPage();
                result.setUrl(key.toString());
            }
            context.write(new IntWritable(result.getUrl().hashCode()), result);
            // Return the scraper to the pool
            scrapers.add(scraper);
        }
    }

And here is the code for the working mapper class, for comparison:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MyMapper extends Mapper<LongWritable, Text, IntWritable, MyPage> {
        MyScraper scr = new MyScraper();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            MyPage result = null;
            // Retry the scrape up to 10 times
            for (int i = 0; i < 10; i++) {
                try {
                    result = scr.scrapPage(value.toString(), true);
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Fall back to an empty page if every attempt failed
            if (result == null) {
                result = new MyPage();
                result.setUrl(key.toString());
            }
            context.write(new IntWritable(result.getUrl().hashCode()), result);
        }
    }

This appears to be a Hadoop bug.
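A possible explanation, read from the stack trace rather than stated in the report: the frame at Mapper.java:124 is the stock identity `Mapper.map`, called from `MultithreadedMapper$MapRunner.run`, not `MyMultithreadMapper.map`. `MultithreadedMapper.run()` does not call its own `map()`; each runner thread creates an instance of whatever class was registered with `MultithreadedMapper.setMapperClass(...)`, which defaults to the identity `Mapper`, so the `LongWritable` offset key is passed straight through. The following is a simplified, hypothetical model of that delegation in plain Java (not Hadoop code; `SimpleMapper`, `UserMapper`, `SimpleMultithreadedMapper`, `OverridingSubclass` and `DelegationDemo` are invented names, and a single inner mapper stands in for the per-thread instances). It sketches why overriding `map()` on the wrapper leaves the identity behaviour in place, while registering the user mapper as the inner class changes the emitted key type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified model of MultithreadedMapper-style delegation.
// This is NOT Hadoop code; all class names here are invented for illustration.
public class DelegationDemo {

    // Stand-in for org.apache.hadoop.mapreduce.Mapper: default map() is identity.
    static class SimpleMapper {
        void map(Object key, Object value, List<Object[]> out) {
            out.add(new Object[] {key, value});
        }
    }

    // Stand-in for the user's MyMapper: emits the hash of the value as an int key.
    static class UserMapper extends SimpleMapper {
        @Override
        void map(Object key, Object value, List<Object[]> out) {
            out.add(new Object[] {value.toString().hashCode(), value});
        }
    }

    // Stand-in for MultithreadedMapper: run() never calls this.map(); it creates
    // the configured inner mapper (identity by default) and feeds it the records.
    static class SimpleMultithreadedMapper extends SimpleMapper {
        Supplier<SimpleMapper> innerFactory = SimpleMapper::new;

        void run(Object[][] records, List<Object[]> out) {
            SimpleMapper inner = innerFactory.get();
            for (Object[] record : records) {
                inner.map(record[0], record[1], out);
            }
        }
    }

    // Stand-in for MyMultithreadMapper: overrides map() on the wrapper itself,
    // which the inherited run() never invokes.
    static class OverridingSubclass extends SimpleMultithreadedMapper {
        @Override
        void map(Object key, Object value, List<Object[]> out) {
            out.add(new Object[] {value.toString().hashCode(), value});
        }
    }

    // One input record: a long offset key (like LongWritable) and a line of text.
    private static final Object[][] INPUT = {{42L, "http://example.com/page"}};

    // Key type emitted when map() is overridden in the wrapper subclass.
    public static Class<?> keyTypeWhenSubclassed() {
        List<Object[]> out = new ArrayList<>();
        new OverridingSubclass().run(INPUT, out);
        return out.get(0)[0].getClass();
    }

    // Key type emitted when the user mapper is registered as the inner mapper.
    public static Class<?> keyTypeWhenDelegated() {
        List<Object[]> out = new ArrayList<>();
        SimpleMultithreadedMapper wrapper = new SimpleMultithreadedMapper();
        wrapper.innerFactory = UserMapper::new;
        wrapper.run(INPUT, out);
        return out.get(0)[0].getClass();
    }

    public static void main(String[] args) {
        System.out.println(keyTypeWhenSubclassed()); // prints "class java.lang.Long"
        System.out.println(keyTypeWhenDelegated());  // prints "class java.lang.Integer"
    }
}
```

If this reading is right, the corresponding change with the real API would be to keep the logic in `MyMapper` and configure the driver with `job.setMapperClass(MultithreadedMapper.class)`, `MultithreadedMapper.setMapperClass(job, MyMapper.class)` and `MultithreadedMapper.setNumberOfThreads(job, 5)`, which matches the configuration pattern described in the `MultithreadedMapper` Javadoc. Because each runner thread gets its own `MyMapper` instance, the per-instance `MyScraper` would no longer be shared between threads.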