[ https://issues.apache.org/jira/browse/MAPREDUCE-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arsen Zahray resolved MAPREDUCE-3106. ------------------------------------- Resolution: Won't Fix > Replacing Mapper with MultithreadedMapper causes the job to crash with "Type > mismatch in key from map" > ------------------------------------------------------------------------------------------------------- > > Key: MAPREDUCE-3106 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-3106 > Project: Hadoop Map/Reduce > Issue Type: Bug > Affects Versions: 0.20.203.0 > Reporter: Arsen Zahray > Priority: Blocker > > I have a Hadoop job which works perfectly fine when run with a class > implementing Mapper. When I replace Mapper with MultithreadedMapper, the job > crashes with the following message: > java.io.IOException: Type mismatch in key from map: expected > org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable > at > org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862) > at > org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549) > at > org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) > at > org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211) > at > org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) > at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124) > at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) > at > org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264) > Here is the relevant source code: > public class MapReduceMain { > /** > * @param args > */ > public static void main(String[] args) { > try { > if (args.length != 2) { > System.err.println("Usage: MapReduceMain <input > path> <output path>"); > System.exit(123); > } > Job job = new Job(); > job.setJarByClass(MapReduceMain.class); > job.setInputFormatClass(TextInputFormat.class); > FileSystem fs = FileSystem.get(URI.create(args[0]), > 
job.getConfiguration()); > FileStatus[] files = fs.listStatus(new Path(args[0])); > for(FileStatus sfs:files){ > FileInputFormat.addInputPath(job, > sfs.getPath()); > } > FileOutputFormat.setOutputPath(job, new Path(args[1])); > > job.setMapperClass(MyMultithreadMapper.class); > job.setReducerClass(MyReducer.class); > MultithreadedMapper.setNumberOfThreads(job, > MyMultithreadMapper.nThreads); > job.setOutputKeyClass(IntWritable.class); > job.setOutputValueClass(MyPage.class); > > > job.setOutputFormatClass(SequenceFileOutputFormat.class); > > System.exit(job.waitForCompletion(true) ? 0 : 1); > } catch (Exception e) { > e.printStackTrace(); > } > } > public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, > Text, IntWritable, MyPage> { > ConcurrentLinkedQueue<MyScraper> scrapers = new > ConcurrentLinkedQueue<MyScraper>(); > public static final int nThreads = 5; > public VrboMultithreadMapper() { > for (int i = 0; i < nThreads; i++) { > scrapers.add(new MyScraper()); > } > } > public void map(LongWritable key, Text value, Context context) throws > IOException, InterruptedException { > MyScraper scraper = scrapers.poll(); > MyPage result = null; > for (int i = 0; i < 10; i++) { > try { > result = scraper.scrapPage(value.toString(), > true); > break; > } catch (Exception e) { > e.printStackTrace(); > } > } > if (result == null) { > result = new MyPage(); > result.setUrl(key.toString()); > } > context.write(new IntWritable(result.getUrl().hashCode()), > result); > scrapers.add(scraper); > } > } > and here's the code for the working mapper class, just to be sure: > public class MyMapper extends Mapper<LongWritable, Text, IntWritable,MyPage> { > MyScraper scr = new MyScraper(); > > public void map(LongWritable key, Text value, Context context) throws > IOException, InterruptedException { > MyPage result =null; > for(int i=0;i<10;i++){ > try{ > result = scr.scrapPage(value.toString(), true); > break; > }catch(Exception e){ > e.printStackTrace(); > } > } 
> > if(result==null){ > result = new MyPage(); > result.setUrl(key.toString()); > } > > context.write(new > IntWritable(result.getUrl().hashCode()),result); > } > } > This appears to be a Hadoop bug. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira