Hi,
Currently I have an MR job that needs to use my own key class to support
secondary sort.
The original job uses the Avro String type as the mapper output key, in this
format:
public class MyMapper extends MapReduceBase implements Mapper<LongWritable,
Text, AvroKey<CharSequence>, AvroValue<OneAvroSpecificRecordObject>>
Now I need to change the key from the String type to a custom key object,
because I need to control a complex sort order and support secondary sort in my
MR job.
So I created a custom key class (PartitionKey), which contains 3 Long values
and 4 String values. This key class implements WritableComparable, and I also
have my KeyComparator and KeyGroupComparator implementations ready.
In this case, I want to change my mapper to the new format:
public class MyMapper extends MapReduceBase implements Mapper<LongWritable,
Text, AvroKey<PartitionKey>, AvroValue<OneAvroSpecificRecordObject>>
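For context, the PartitionKey described above could be sketched roughly as
follows. This is a minimal sketch, not the actual class: the field names and
the sort order are assumptions, and it uses plain Comparable plus the same
write/readFields pair so it compiles without the Hadoop jars; in the real job
it would declare implements org.apache.hadoop.io.WritableComparable<PartitionKey>.

```java
import java.io.*;

// Hypothetical sketch of a PartitionKey with 3 long fields and 4 String
// fields. In the real MR job this would implement
// org.apache.hadoop.io.WritableComparable<PartitionKey>.
public class PartitionKey implements Comparable<PartitionKey> {
    public long l1, l2, l3;
    public String s1 = "", s2 = "", s3 = "", s4 = "";

    // Hadoop calls write() to serialize the key between map and reduce.
    public void write(DataOutput out) throws IOException {
        out.writeLong(l1); out.writeLong(l2); out.writeLong(l3);
        out.writeUTF(s1);  out.writeUTF(s2);  out.writeUTF(s3);  out.writeUTF(s4);
    }

    // Hadoop calls readFields() to deserialize the key on the reduce side.
    public void readFields(DataInput in) throws IOException {
        l1 = in.readLong(); l2 = in.readLong(); l3 = in.readLong();
        s1 = in.readUTF();  s2 = in.readUTF();  s3 = in.readUTF();  s4 = in.readUTF();
    }

    // Full sort order: primary on l1, secondary on s1 (assumed fields).
    // A KeyGroupComparator would compare only the "natural" part (e.g. l1).
    @Override
    public int compareTo(PartitionKey o) {
        int c = Long.compare(l1, o.l1);
        if (c != 0) return c;
        return s1.compareTo(o.s1);
    }

    public static void main(String[] args) throws IOException {
        // Round-trip one key through the write/readFields pair.
        PartitionKey k = new PartitionKey();
        k.l1 = 7; k.s1 = "a";
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        k.write(new DataOutputStream(buf));
        PartitionKey back = new PartitionKey();
        back.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(back.l1 + " " + back.s1);
    }
}
```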
Here comes the problem: I don't know what kind of schema I can use in my
driver class for this key.
Originally, the driver had the following line:
AvroJob.setMapOutputSchema(conf,
Pair.getPairSchema(Schema.create(Schema.Type.STRING),
OneAvroSpecificRecordObject.SCHEMA$));
So my question is: what kind of schema should I use above to replace the
Schema.Type.STRING?
Here are some things I tried, and the errors I got:

1) I tried a union schema with 3 Long types and 4 String types. It does NOT
work, as a union cannot contain duplicate types.

2) Then I thought I could create an anonymous record schema, which should work
for my case. So here is what I did. First, add the schema definition in the
code:

String keySchema = "type........." // create a record schema with 3 long fields and 4 string fields

Then, generate the schema at runtime:

AvroJob.setMapOutputSchema(conf, Pair.getPairSchema(new
Schema.Parser().parse(keySchema), OneAvroSpecificRecordObject.SCHEMA$));

This works fine for the whole mapper stage, but the reducer part failed with
the following error:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to PartitionKey

My reducer looks like this:

myReducer implements Reducer<AvroKey<PartitionKey>,
AvroValue<OneAvroSpecificRecordObject>, NullWritable, NullWritable>

It looks like with an anonymous record schema, Avro deserializes the key as a
GenericData$Record, which cannot be cast to the PartitionKey class I want.

3) Then I wondered: do I have to generate a specific PartitionKey class from a
new avsc file? I can do that, but the class generated by Avro won't implement
WritableComparable, so I cannot use it as the mapper key.
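For reference, the anonymous record schema in attempt 2 would look something
like the fragment below. The post elides the actual definition, so every field
name here is made up; the shape (a record with 3 long fields and 4 string
fields) is the only part taken from the description.

```json
{
  "type": "record",
  "name": "PartitionKey",
  "fields": [
    {"name": "l1", "type": "long"},
    {"name": "l2", "type": "long"},
    {"name": "l3", "type": "long"},
    {"name": "s1", "type": "string"},
    {"name": "s2", "type": "string"},
    {"name": "s3", "type": "string"},
    {"name": "s4", "type": "string"}
  ]
}
```

A string like this parses fine with new Schema.Parser().parse(...), which
matches the observation that the mapper stage works; the ClassCastException
only appears on the reduce side, where Avro hands back a GenericData$Record
for a record schema it has no generated class for.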
I wonder: if I want to use a custom key that implements WritableComparable as
my mapper output key, what schema should I use in Avro? I searched the Avro
source code and didn't find any existing examples demonstrating this. On the
web, there are also not many examples discussing it. But in a lot of cases we
want our own custom key class implementation to use in an MR job. Does anyone
know how to define the schema for this kind of class? Are any examples
available?
Thanks
Yong