Hi,
Currently I have an MR job that needs to use my own key class to support secondary 
sort.
The original job uses the Avro String type for the mapper output key, in this 
format:
public class MyMapper extends MapReduceBase implements Mapper<LongWritable, 
Text, AvroKey<CharSequence>, AvroValue<OneAvroSpecificRecordObject>>
Right now, I need to change the mapper output key from the Avro String type to a 
custom key object, because I need to control a complex sort order and support 
secondary sort in my MR job.
So I created a custom key class, PartitionKey, which contains 3 long values and 
4 String values. This key class implements WritableComparable, and I also have 
my KeyComparator and KeyGroupComparator implementations ready.
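To illustrate the ordering I mean, here is a minimal plain-Java sketch of such a composite key (the field names are made up; in the real PartitionKey this logic would live in compareTo() of WritableComparable and in the two comparator classes):

```java
import java.util.Comparator;

// Sketch of a composite key with 3 longs and 4 Strings.
// Field names are hypothetical; the real PartitionKey implements
// org.apache.hadoop.io.WritableComparable instead of using Comparator.
public class PartitionKeySketch {
    final long id1, id2, id3;
    final String s1, s2, s3, s4;

    PartitionKeySketch(long id1, long id2, long id3,
                       String s1, String s2, String s3, String s4) {
        this.id1 = id1; this.id2 = id2; this.id3 = id3;
        this.s1 = s1; this.s2 = s2; this.s3 = s3; this.s4 = s4;
    }

    // Full sort order (what KeyComparator would enforce):
    // compare each field in turn, longs first, then strings.
    static final Comparator<PartitionKeySketch> FULL_ORDER =
        Comparator.comparingLong((PartitionKeySketch k) -> k.id1)
                  .thenComparingLong(k -> k.id2)
                  .thenComparingLong(k -> k.id3)
                  .thenComparing(k -> k.s1)
                  .thenComparing(k -> k.s2)
                  .thenComparing(k -> k.s3)
                  .thenComparing(k -> k.s4);

    // Grouping order (what KeyGroupComparator would enforce): compare
    // only the leading fields, so all records sharing the "natural key"
    // reach one reduce() call while the rest of the key drives the sort.
    static final Comparator<PartitionKeySketch> GROUP_ORDER =
        Comparator.comparingLong((PartitionKeySketch k) -> k.id1)
                  .thenComparingLong(k -> k.id2);

    public static void main(String[] args) {
        PartitionKeySketch a = new PartitionKeySketch(1, 2, 3, "a", "b", "c", "d");
        PartitionKeySketch b = new PartitionKeySketch(1, 2, 9, "a", "b", "c", "d");
        System.out.println(FULL_ORDER.compare(a, b) < 0);   // a sorts before b
        System.out.println(GROUP_ORDER.compare(a, b) == 0); // same reduce group
    }
}
```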
So in this case, I want to change my mapper to the new format:
public class MyMapper extends MapReduceBase implements Mapper<LongWritable, 
Text, AvroKey<PartitionKey>, AvroValue<OneAvroSpecificRecordObject>>
Here comes the problem: I don't know what schema to use for this key in my 
driver class.
Originally, the driver has the following line:
AvroJob.setMapOutputSchema(conf, 
Pair.getPairSchema(Schema.create(Schema.Type.STRING), 
OneAvroSpecificRecordObject.SCHEMA$));
So my question is: what schema should I use above in place of 
Schema.Type.STRING?
Here are the things I tried, and the errors I got:
1) I tried a union schema with 3 long types and 4 string types. It does NOT 
work, because a union cannot contain duplicate types.

2) Then I thought an anonymous record schema should work for my case. Here is 
what I did.
   First, add the schema definition in the code:
   String keySchema = "type........." // create a record schema with 3 long fields and 4 string fields
   Then, generate the schema at runtime in my code:
   AvroJob.setMapOutputSchema(conf, Pair.getPairSchema(new Schema.Parser().parse(keySchema), OneAvroSpecificRecordObject.SCHEMA$));
   This works fine for the whole mapper stage, but the reducer fails with the 
following error:
   java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to PartitionKey
   My reducer looks like this:
   myReducer implements Reducer<AvroKey<PartitionKey>, AvroValue<OneAvroSpecificRecordObject>, NullWritable, NullWritable>
   It looks like an anonymous record schema is deserialized as 
GenericData$Record, which I cannot cast to the PartitionKey class I want.

3) Then I wondered: do I have to generate a specific PartitionKey class from a 
new avsc file? I could do that, but the class generated by Avro won't implement 
WritableComparable, so I cannot use it as the mapper key.
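For reference, the anonymous record schema I mean in 2) would look roughly like this (the field names here are made up for illustration; mine differ):

{
  "type": "record",
  "name": "PartitionKey",
  "fields": [
    {"name": "long1", "type": "long"},
    {"name": "long2", "type": "long"},
    {"name": "long3", "type": "long"},
    {"name": "str1",  "type": "string"},
    {"name": "str2",  "type": "string"},
    {"name": "str3",  "type": "string"},
    {"name": "str4",  "type": "string"}
  ]
}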
I wonder: if I want to use a custom key that implements WritableComparable as 
my mapper output key, what schema should I use in Avro? I searched the Avro 
source code and didn't find any existing examples demonstrating this, and there 
are not many examples on the web either. But in a lot of cases we want our own 
custom key class to use in an MR job. Does anyone know how to define the schema 
for this kind of class? Are any examples available?
Thanks
Yong
