Replacing Mapper with MultithreadedMapper causes the job to crash with "Type
mismatch in key from map"
-------------------------------------------------------------------------------------------------------
Key: MAPREDUCE-3106
URL: https://issues.apache.org/jira/browse/MAPREDUCE-3106
Project: Hadoop Map/Reduce
Issue Type: Bug
Affects Versions: 0.20.203.0
Reporter: Arsen Zahray
Priority: Blocker
I have a Hadoop job that works perfectly well when its mapper class extends
Mapper. When I replace Mapper with MultithreadedMapper, the job crashes with
the following message:
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)
Here is the relevant source code:
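import java.net.URI;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class MapReduceMain {
    /**
     * @param args input path and output path
     */
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.err.println("Usage: MapReduceMain <input path> <output path>");
                System.exit(123);
            }
            Job job = new Job();
            job.setJarByClass(MapReduceMain.class);
            job.setInputFormatClass(TextInputFormat.class);
            // Add every entry under the input directory as a separate input path.
            FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
            FileStatus[] files = fs.listStatus(new Path(args[0]));
            for (FileStatus sfs : files) {
                FileInputFormat.addInputPath(job, sfs.getPath());
            }
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setMapperClass(MyMultithreadMapper.class);
            job.setReducerClass(MyReducer.class);
            MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(MyPage.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}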
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {
    // Pool of scrapers, one per mapper thread.
    ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>();
    public static final int nThreads = 5;

    public MyMultithreadMapper() {
        for (int i = 0; i < nThreads; i++) {
            scrapers.add(new MyScraper());
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        MyScraper scraper = scrapers.poll();
        MyPage result = null;
        // Retry the scrape up to 10 times.
        for (int i = 0; i < 10; i++) {
            try {
                result = scraper.scrapPage(value.toString(), true);
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (result == null) {
            result = new MyPage();
            result.setUrl(key.toString());
        }
        context.write(new IntWritable(result.getUrl().hashCode()), result);
        scrapers.add(scraper);
    }
}
And here is the code for the working mapper class, for comparison:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, IntWritable, MyPage> {
    MyScraper scr = new MyScraper();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        MyPage result = null;
        // Retry the scrape up to 10 times.
        for (int i = 0; i < 10; i++) {
            try {
                result = scr.scrapPage(value.toString(), true);
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (result == null) {
            result = new MyPage();
            result.setUrl(key.toString());
        }
        context.write(new IntWritable(result.getUrl().hashCode()), result);
    }
}
This appears to be a Hadoop bug.
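One thing worth ruling out first: the stack trace passes through Mapper.map (Mapper.java:124), the base-class map, and not through MyMultithreadMapper.map. The sketch below is a Hadoop-free illustration of how that could produce a LongWritable key. It assumes that MultithreadedMapper.run() drives instances of a separately configured mapper class, defaulting to the base Mapper, and never calls its own map(). Every class name in it (BaseMapper, IntKeyMapper, WrapperMapper, OverridingWrapper) is a hypothetical stand-in, with Long and Integer in place of LongWritable and IntWritable, and a Supplier in place of whatever instantiation mechanism Hadoop actually uses.

```java
import java.util.function.Supplier;

final class MultithreadedMapperSketch {

    /** Stand-in for Mapper: the default map() is the identity and keeps the Long key. */
    static class BaseMapper {
        Object map(Long key, String value) {
            return key;
        }

        /** Drives map() for one record and returns the emitted key. */
        Object run(Long key, String value) {
            return map(key, value);
        }
    }

    /** Stand-in for the working MyMapper: emits an Integer key. */
    static class IntKeyMapper extends BaseMapper {
        @Override
        Object map(Long key, String value) {
            return Integer.valueOf(value.hashCode());
        }
    }

    /** Stand-in for MultithreadedMapper (assumed behavior): run() delegates to an inner mapper. */
    static class WrapperMapper extends BaseMapper {
        private final Supplier<? extends BaseMapper> innerFactory;

        WrapperMapper() {
            this(BaseMapper::new); // assumed default: the identity mapper
        }

        WrapperMapper(Supplier<? extends BaseMapper> innerFactory) {
            this.innerFactory = innerFactory;
        }

        @Override
        Object run(Long key, String value) {
            // The wrapper's own map() is never consulted here.
            return innerFactory.get().run(key, value);
        }
    }

    /** Stand-in for MyMultithreadMapper: overrides map(), which run() ignores. */
    static class OverridingWrapper extends WrapperMapper {
        @Override
        Object map(Long key, String value) {
            return Integer.valueOf(value.hashCode());
        }
    }

    /** Runs one record through the mapper and reports the type of the emitted key. */
    static String emittedKeyType(BaseMapper mapper) {
        return mapper.run(0L, "http://example.com/").getClass().getSimpleName();
    }

    public static void main(String[] args) {
        // Subclassing the wrapper and overriding map(): the identity mapper still runs.
        System.out.println(emittedKeyType(new OverridingWrapper()));                  // prints "Long"
        // Handing the real mapper to the wrapper: the expected key type comes out.
        System.out.println(emittedKeyType(new WrapperMapper(IntKeyMapper::new)));     // prints "Integer"
    }
}
```

If that assumption holds for this job, the usual wiring would be job.setMapperClass(MultithreadedMapper.class) together with MultithreadedMapper.setMapperClass(job, MyMapper.class), leaving MultithreadedMapper.setNumberOfThreads as it is. This has not been verified against 0.20.203.0 here.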