[
https://issues.apache.org/jira/browse/MAPREDUCE-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13116314#comment-13116314
]
Arsen Zahray commented on MAPREDUCE-3106:
-----------------------------------------
Can the code be changed so that it works the way I tried to use it?
I believe many developers to come will make the same mistake I did. Making
it work by subclassing would make the API more intuitive.
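
For reference, the wiring the current API appears to expect is sketched below. This is a minimal, untested sketch against the new-API MultithreadedMapper (org.apache.hadoop.mapreduce.lib.map); MyMapper and MyPage are the reporter's classes from the quoted report below, and the class name ThreadedDriver is made up for illustration:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

    public class ThreadedDriver {
        public static void main(String[] args) throws Exception {
            Job job = new Job();
            // MultithreadedMapper itself is registered as the job's mapper...
            job.setMapperClass(MultithreadedMapper.class);
            // ...and the real map implementation is handed to it separately,
            // instead of subclassing MultithreadedMapper and overriding map().
            MultithreadedMapper.setMapperClass(job, MyMapper.class);
            MultithreadedMapper.setNumberOfThreads(job, 5);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(MyPage.class);
            // ... input/output paths and formats as in the original driver ...
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

With this wiring each worker thread runs MyMapper.map(), so the emitted IntWritable keys match the declared output types.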
> Replacing Mapper with MultithreadedMapper causes the job to crash with "Type mismatch in key from map"
> -------------------------------------------------------------------------------------------------------
>
> Key: MAPREDUCE-3106
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-3106
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Affects Versions: 0.20.203.0
> Reporter: Arsen Zahray
> Priority: Blocker
>
> I have a Hadoop job which works perfectly fine with a class extending Mapper.
> When I replace Mapper with MultithreadedMapper, the job crashes with the
> following message:
> java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
>         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
>         at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
>         at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>         at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
>         at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>         at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
>         at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>         at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)
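
Note the frame org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124) in the trace: that is the base Mapper's identity map(), which suggests the overridden map() in the MultithreadedMapper subclass is never invoked. MultithreadedMapper's worker threads run whatever mapper class was registered via MultithreadedMapper.setMapperClass(), and that setting defaults to the identity Mapper. The identity implementation is approximately:

    // Base Mapper.map(): the identity mapping. The LongWritable input key is
    // forwarded unchanged, which trips the IntWritable check in the collector.
    protected void map(KEYIN key, VALUEIN value, Context context)
            throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }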
> Here is the relevant source code:
> import java.net.URI;
>
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.IntWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
>
> public class MapReduceMain {
>     /**
>      * @param args
>      */
>     public static void main(String[] args) {
>         try {
>             if (args.length != 2) {
>                 System.err.println("Usage: MapReduceMain <input path> <output path>");
>                 System.exit(123);
>             }
>             Job job = new Job();
>             job.setJarByClass(MapReduceMain.class);
>             job.setInputFormatClass(TextInputFormat.class);
>             FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
>             FileStatus[] files = fs.listStatus(new Path(args[0]));
>             for (FileStatus sfs : files) {
>                 FileInputFormat.addInputPath(job, sfs.getPath());
>             }
>             FileOutputFormat.setOutputPath(job, new Path(args[1]));
>
>             job.setMapperClass(MyMultithreadMapper.class);
>             job.setReducerClass(MyReducer.class);
>             MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);
>             job.setOutputKeyClass(IntWritable.class);
>             job.setOutputValueClass(MyPage.class);
>
>             job.setOutputFormatClass(SequenceFileOutputFormat.class);
>
>             System.exit(job.waitForCompletion(true) ? 0 : 1);
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
> }
> public class MyMultithreadMapper
>         extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {
>
>     public static final int nThreads = 5;
>
>     // Pool of scrapers, one per worker thread.
>     ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>();
>
>     public MyMultithreadMapper() {
>         for (int i = 0; i < nThreads; i++) {
>             scrapers.add(new MyScraper());
>         }
>     }
>
>     public void map(LongWritable key, Text value, Context context)
>             throws IOException, InterruptedException {
>         MyScraper scraper = scrapers.poll();
>         MyPage result = null;
>         // Retry the scrape up to 10 times before giving up.
>         for (int i = 0; i < 10; i++) {
>             try {
>                 result = scraper.scrapPage(value.toString(), true);
>                 break;
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         if (result == null) {
>             result = new MyPage();
>             result.setUrl(key.toString());
>         }
>         context.write(new IntWritable(result.getUrl().hashCode()), result);
>         scrapers.add(scraper);
>     }
> }
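
A side note on the threading model: when MultithreadedMapper is wired up via setMapperClass() as sketched above, each worker thread appears to build its own mapper instance by reflection, so per-instance state is already thread-confined and the ConcurrentLinkedQueue hand-off of scrapers should not be needed. A plain field initialized in setup() would do. A sketch under that assumption (the class name ScrapingMapper is made up; MyScraper and MyPage are the reporter's classes):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ScrapingMapper
            extends Mapper<LongWritable, Text, IntWritable, MyPage> {

        private MyScraper scraper;  // one per mapper instance, i.e. per worker thread

        @Override
        protected void setup(Context context) {
            scraper = new MyScraper();
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // ... same retry-and-write body as MyMapper below ...
        }
    }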
> And here is the code for the working mapper class, just to be sure:
> public class MyMapper extends Mapper<LongWritable, Text, IntWritable, MyPage> {
>
>     MyScraper scr = new MyScraper();
>
>     public void map(LongWritable key, Text value, Context context)
>             throws IOException, InterruptedException {
>         MyPage result = null;
>         for (int i = 0; i < 10; i++) {
>             try {
>                 result = scr.scrapPage(value.toString(), true);
>                 break;
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>
>         if (result == null) {
>             result = new MyPage();
>             result.setUrl(key.toString());
>         }
>
>         context.write(new IntWritable(result.getUrl().hashCode()), result);
>     }
> }
> This appears to be a Hadoop bug.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira