I changed my mapper based on the following example:
org.apache.avro.mapred.TestGenericJob
and changed my mapper and driver to the following:
    public class Tracking3Mapper2 extends MapReduceBase implements
            Mapper<LongWritable, Text, AvroWrapper<Pair<CharSequence, TrackingActivities>>, NullWritable> {

        public void map(LongWritable key, Text value,
                OutputCollector<AvroWrapper<Pair<CharSequence, TrackingActivities>>, NullWritable> output,
                Reporter reporter) throws IOException {
            ........
            // trackingActivities is the Avro record object
            output.collect(
                    new AvroWrapper<Pair<CharSequence, TrackingActivities>>(
                            new Pair<CharSequence, TrackingActivities>("some string here", trackingActivities)),
                    NullWritable.get());
        }
    }
    public class TestDriver {
        // the logic to run the driver
        NLineInputFormat.addInputPath(conf, inPath);
        FileOutputFormat.setOutputPath(conf, outPath);
        conf.setInputFormat(NLineInputFormat.class);
        conf.setMapperClass(Tracking3Mapper2.class);
        AvroJob.setOutputSchema(conf,
                Pair.getPairSchema(Schema.create(Schema.Type.STRING), TrackingActivities.SCHEMA$));
        conf.setOutputFormat(AvroOutputFormat.class);
    }
As you can see, it is almost exactly the same as the unit test case at
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java?view=markup
(see lines 108 and 152).
To my surprise, when I ran my test driver class, it gave me this error:
    java.io.IOException: Type mismatch in key from map: expected org.apache.avro.mapred.AvroKey, recieved org.apache.avro.mapred.AvroWrapper
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1014)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:592)
        at com.constantcontact.bigdata.tracking3.mapred.Tracking3Mapper2.map(Tracking3Mapper2.java:187)
        at com.constantcontact.bigdata.tracking3.mapred.Tracking3Mapper2.map(Tracking3Mapper2.java:34)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
From the error message, I checked the logic, and it looks like I should
get this error. The reason is that the map output key class will be set to
AvroKey, but at runtime the mapper emits an AvroWrapper as the key. So the
error makes sense.
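
The mismatch described above can be reproduced without Hadoop. The sketch below is only an illustration: the AvroWrapper and AvroKey classes in it are hypothetical stand-ins that mirror the real inheritance (AvroKey extends AvroWrapper), and the collect method is an assumed shape of the check behind the error, using exact class equality rather than instanceof.

```java
import java.io.IOException;

// Stand-alone sketch: none of these classes are the real Hadoop or Avro types.
class KeyClassCheckSketch {

    // Hypothetical stand-in for org.apache.avro.mapred.AvroWrapper.
    static class AvroWrapper<T> {
        private final T datum;
        AvroWrapper(T datum) { this.datum = datum; }
        T datum() { return datum; }
    }

    // Hypothetical stand-in for org.apache.avro.mapred.AvroKey.
    static class AvroKey<T> extends AvroWrapper<T> {
        AvroKey(T datum) { super(datum); }
    }

    // Assumed shape of the check: the key's class must be exactly the
    // configured map output key class, so a superclass instance is rejected.
    static void collect(Class<?> mapOutputKeyClass, Object key) throws IOException {
        if (key.getClass() != mapOutputKeyClass) {
            throw new IOException("Type mismatch in key from map: expected "
                    + mapOutputKeyClass.getName() + ", received " + key.getClass().getName());
        }
    }

    // Returns "ok" or the error message, so callers need no try/catch.
    static String tryCollect(Class<?> mapOutputKeyClass, Object key) {
        try {
            collect(mapOutputKeyClass, key);
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // An AvroKey passes the check; a plain AvroWrapper does not.
        System.out.println(tryCollect(AvroKey.class, new AvroKey<>("k")));
        System.out.println(tryCollect(AvroKey.class, new AvroWrapper<>("k")));
    }
}
```

Under this assumption, wrapping the pair in an AvroWrapper can never satisfy a job whose map output key class is AvroKey, even though AvroKey is a subclass of AvroWrapper.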
Now the question is: how does the unit test work in the first place? From the
unit test, it looks like it works because the OutputCollector in use is
MapTask$DirectOutputCollector, but in my case MapTask$MapOutputBuffer is
being used.
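
One possible explanation for the different collectors, stated here as an assumption and not confirmed against the test source: the old-API map task chooses its collector from the number of reduce tasks, writing directly to the output format when there are none and buffering for the shuffle (where the key class is checked) otherwise. The sketch below only models that decision; the collector names are plain strings taken from the traces above, not real classes.

```java
// Stand-alone sketch of an assumed decision rule; not Hadoop code.
class CollectorChoiceSketch {

    // Assumption: zero reducers means map output goes straight to the
    // output format; one or more reducers means it is buffered for shuffle.
    static String chooseCollector(int numReduceTasks) {
        return numReduceTasks > 0 ? "MapOutputBuffer" : "DirectOutputCollector";
    }

    // Assumption: the map output key class is only checked on the
    // buffered (shuffle) path.
    static boolean keyClassIsChecked(int numReduceTasks) {
        return numReduceTasks > 0;
    }

    public static void main(String[] args) {
        for (int reducers : new int[] {0, 1}) {
            System.out.println(reducers + " reducer(s): " + chooseCollector(reducers)
                    + ", key class checked: " + keyClassIsChecked(reducers));
        }
    }
}
```

If this is what separates the two runs, the driver shown earlier never sets the reducer count and so runs with Hadoop's default of one reducer. Calling conf.setNumReduceTasks(0) in the driver (a real JobConf method) would be one thing to try for a map-only job.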
But I checked my driver and my mapper again and again, and I didn't see
anything that would cause this difference.
Can anyone give me a hint about what went wrong in my case? Am I using
NLineInputFormat with AvroMapper the right way, following the example of
TestGenericJob? If so, why am I getting this error?
Thanks

From: [email protected]
To: [email protected]
Subject: Use NLineInputFormat in AvroMapper
Date: Tue, 8 Oct 2013 16:15:11 -0400




Hi, I have the following questions about how to use NLineInputFormat with
AvroMapper. I am new to Avro, so please correct me if you think what I am
doing is wrong.
In this project I need to pass data to an MR job so that, ideally, each
mapper consumes one line from a text file. That line of text is the
location of another resource, and each mapper then loads the data from that
resource. The mapper output will be an Avro record object, for which I have
already written the schema file and generated the record class.
So far, my mapper works fine if I use AvroUtf8InputFormat.
Here is my driver class, with most of the logic listed:
    JobConf conf = new JobConf(getConf(), getClass());
    conf.setJobName("Test job");
    FileInputFormat.addInputPath(conf, ....);
    FileOutputFormat.setOutputPath(conf, .....);
    conf.setInputFormat(AvroUtf8InputFormat.class);
    AvroJob.setMapperClass(conf, Tracking3Mapper.class);
    AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));
    AvroJob.setMapOutputSchema(conf,
            Pair.getPairSchema(Schema.create(Schema.Type.STRING), TrackingActivities.SCHEMA$));
    AvroJob.setOutputSchema(conf,
            Pair.getPairSchema(Schema.create(Schema.Type.STRING), TrackingActivities.SCHEMA$));
    JobClient.runJob(conf);
And my mapper looks like the following:
    public class Tracking3Mapper extends AvroMapper<Utf8, Pair<CharSequence, TrackingActivities>> {
        public void map(Utf8 value, AvroCollector<Pair<CharSequence, TrackingActivities>> output,
                Reporter reporter) throws IOException {
        }
    }
Everything works as I expected, but here comes my question. I want to use
NLineInputFormat, because I want to make sure that each line in my data file
goes to its own mapper, meaning one mapper consumes one line of text. I tested
Hadoop's NLineInputFormat without Avro, and it works perfectly for my use
case: by default, each line of data goes to exactly one mapper. So I want
to use it with Avro.
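
The one-line-per-mapper behavior described above can be pictured with a small stand-alone sketch. It is not NLineInputFormat itself: the splitByLines helper is hypothetical and simply groups lines into chunks of N, where N = 1 corresponds to the default behavior described in the previous paragraph.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch of N-lines-per-split grouping; not Hadoop code.
class NLineSplitSketch {

    // Hypothetical helper: each returned inner list stands for the input
    // that a single mapper would receive.
    static List<List<String>> splitByLines(List<String> lines, int linesPerMap) {
        if (linesPerMap < 1) {
            throw new IllegalArgumentException("linesPerMap must be at least 1");
        }
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < lines.size(); i += linesPerMap) {
            int end = Math.min(i + linesPerMap, lines.size());
            splits.add(new ArrayList<>(lines.subList(i, end)));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Made-up resource locations, one per input line.
        List<String> lines = Arrays.asList("resource-a", "resource-b", "resource-c");
        // With one line per map, three lines yield three separate splits.
        System.out.println(splitByLines(lines, 1));
    }
}
```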
It looks like I have 2 options, and I don't know which one works with Avro.
Option 1: change my driver code this way:
    NLineInputFormat.addInputPath(conf, .....);
    conf.setInputFormat(AvroUtf8InputFormat.class);
Will AvroUtf8InputFormat wrap NLineInputFormat correctly this way?
Option 2, which is the part I need help with:
    NLineInputFormat.addInputPath(conf, ....);
    conf.setInputFormat(NLineInputFormat.class);
If I do the above, I get the following exception:
    java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.avro.mapred.AvroWrapper
        at org.apache.avro.mapred.HadoopMapper.map(HadoopMapper.java:34)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
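
Judging by the trace, the mapper class that Avro installs expects every input key to be an AvroWrapper, while NLineInputFormat supplies LongWritable keys. Because generic types are erased, such a mismatch only shows up at runtime as a cast failure. The sketch below reproduces that mechanism with hypothetical stand-in types (a local Mapper interface, a local AvroWrapper, and a Long in place of LongWritable); it is not the real HadoopMapper.

```java
// Stand-alone sketch of a runtime cast failure under type erasure.
class InputKeyCastSketch {

    // Hypothetical stand-in for org.apache.avro.mapred.AvroWrapper.
    static class AvroWrapper<T> {
        private final T datum;
        AvroWrapper(T datum) { this.datum = datum; }
        T datum() { return datum; }
    }

    // Hypothetical stand-in for the old-API Mapper interface.
    interface Mapper<K, V> {
        void map(K key, V value);
    }

    // Plays the role of a mapper whose input key must be a wrapper.
    static class WrapperMapper<T> implements Mapper<AvroWrapper<T>, Object> {
        T lastDatum;
        @Override
        public void map(AvroWrapper<T> wrapper, Object value) {
            lastDatum = wrapper.datum();
        }
    }

    // The framework calls the mapper through its raw type, so a wrong key
    // type compiles and only fails when the key is cast at call time.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String feed(Mapper mapper, Object key) {
        try {
            mapper.map(key, null);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // A wrapped key is accepted; a bare Long (standing in for
        // LongWritable) is rejected at runtime.
        System.out.println(feed(new WrapperMapper<String>(), new AvroWrapper<>("line")));
        System.out.println(feed(new WrapperMapper<String>(), Long.valueOf(0L)));
    }
}
```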
So my questions are:
1) To use NLineInputFormat in my MR job, will option 1 work?
2) If I have to set NLineInputFormat in conf.setInputFormat(), how can I make
it work with my current mapper and driver? Does that mean my mapper shouldn't
extend AvroMapper any more? Can anyone point me to an example online, or to an
example in the Avro test cases in the source code?
Thanks
Yong
