[ 
https://issues.apache.org/jira/browse/MAPREDUCE-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arsen Zahray resolved MAPREDUCE-3106.
-------------------------------------

    Resolution: Won't Fix
    
> Replacing Mapper with MultithreadedMapper causes the job to crash with "Type 
> mismatch in key from map" 
> -------------------------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-3106
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-3106
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>    Affects Versions: 0.20.203.0
>            Reporter: Arsen Zahray
>            Priority: Blocker
>
> I have a hadoop job, which works perfectly fine when done with a class 
> implementing Mapper. When I do replace Mapper with MultithreadMapper, the job 
> crashes with following message:
> java.io.IOException: Type mismatch in key from map: expected 
> org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
>       at 
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
>       at 
> org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
>       at 
> org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>       at 
> org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
>       at 
> org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>       at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
>       at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>       at 
> org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)
> Here are the relevant source codes:
> public class MapReduceMain {
>       /**
>        * @param args
>        */
>       public static void main(String[] args) {
>               try {
>                       if (args.length != 2) {
>                               System.err.println("Usage: MapReduceMain <input 
> path> <output path>");
>                               System.exit(123);
>                       }
>                       Job job = new Job();
>                       job.setJarByClass(MapReduceMain.class);
>                       job.setInputFormatClass(TextInputFormat.class);
>                       FileSystem fs = FileSystem.get(URI.create(args[0]), 
> job.getConfiguration());
>                       FileStatus[] files = fs.listStatus(new Path(args[0]));
>                       for(FileStatus sfs:files){
>                               FileInputFormat.addInputPath(job, 
> sfs.getPath());
>                       }
>                       FileOutputFormat.setOutputPath(job, new Path(args[1]));
>                       
>                       job.setMapperClass(MyMultithreadMapper.class);
>                       job.setReducerClass(MyReducer.class);
>                       MultithreadedMapper.setNumberOfThreads(job, 
> MyMultithreadMapper.nThreads);
>                       job.setOutputKeyClass(IntWritable.class); 
>                       job.setOutputValueClass(MyPage.class);
>                       
>                       
> job.setOutputFormatClass(SequenceFileOutputFormat.class);
>                       
>                       System.exit(job.waitForCompletion(true) ? 0 : 1);
>               } catch (Exception e) {
>                       e.printStackTrace();
>               }
>       }
> public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, 
> Text, IntWritable, MyPage> {
>       ConcurrentLinkedQueue<MyScraper>        scrapers        = new 
> ConcurrentLinkedQueue<MyScraper>();
>       public static final int                         nThreads        = 5;
>       public VrboMultithreadMapper() {
>               for (int i = 0; i < nThreads; i++) {
>                       scrapers.add(new MyScraper());
>               }
>       }
>       public void map(LongWritable key, Text value, Context context) throws 
> IOException, InterruptedException {
>               MyScraper scraper = scrapers.poll();
>               MyPage result = null;
>               for (int i = 0; i < 10; i++) {
>                       try {
>                               result = scraper.scrapPage(value.toString(), 
> true);
>                               break;
>                       } catch (Exception e) {
>                               e.printStackTrace();
>                       }
>               }
>               if (result == null) {
>                       result = new MyPage();
>                       result.setUrl(key.toString());
>               }
>               context.write(new IntWritable(result.getUrl().hashCode()), 
> result);
>               scrapers.add(scraper);
>       }
> }
> and here's the code for the working mapper class, just to be sure:
> public class MyMapper extends Mapper<LongWritable, Text, IntWritable,MyPage> {
>       MyScraper scr = new MyScraper();
>       
>       public void map(LongWritable key, Text value, Context context) throws 
> IOException, InterruptedException {
>               MyPage result =null;
>               for(int i=0;i<10;i++){
>                       try{
>                               result = scr.scrapPage(value.toString(), true);
>                               break;
>                       }catch(Exception e){
>                               e.printStackTrace();
>                       }
>               }
>               
>               if(result==null){
>                       result = new MyPage();
>                       result.setUrl(key.toString());
>               }
>               
>               context.write(new 
> IntWritable(result.getUrl().hashCode()),result);
>       }
> }
> This appears to be a hadoop bug

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to