Going crazy here trying to reconcile this. I found links to some aspects,
partially implemented in the Avro weather M/R example from 'Hadoop: The
Definitive Guide', and outlined in the org.apache.avro.mapred package
documentation under "For jobs whose input is an Avro data file and which
use an AvroMapper, but whose reducer is a non-Avro Reducer and whose
output is a non-Avro format:". Clearly I have misunderstood something
while attempting to follow those instructions.
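For reference, here is roughly how I read that recipe, using the same names
that appear in my setup further down (this is only my sketch of the docs, not
working code; the plain Hadoop Reducer registered with JobConf.setReducerClass,
and the Text/NullWritable output classes, are my reading, not anything I've
confirmed):

    // Avro only on the input/map side
    AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
    AvroJob.setMapperClass(conf, AvroFlowMapper.class);
    AvroJob.setMapOutputSchema(conf, pairSchema);

    // Non-Avro from the reduce side on: a plain org.apache.hadoop.mapred.Reducer
    // registered with JobConf.setReducerClass, NOT AvroJob.setReducerClass
    conf.setReducerClass(PlainFlowReducer.class); // hypothetical class, sketched below
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(NullWritable.class);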
The book's test code does not include a mapper, so its job setup is not
like what I'm trying to achieve: Avro format into the Mapper, Text out of
the Reducer. (I've dropped the Partitioner, Comparator, and
GroupingComparator used in the working M/R code that reads .tsv rather
than Avro.)
The current stumbling block is "AvroFlowWritable cannot be cast to
org.apache.avro.generic.IndexedRecord", where AvroFlowWritable is my
class. I think my existing reducer would work fine, but as soon as it is
fed from the AvroMapper the job throws the above exception.
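My best guess at the exception itself: AvroFlowWritable is a plain Java
class, so its schema comes from ReflectData, but the map-output
serialization seems to default to the generic/specific representation,
whose datum writer casts every record to IndexedRecord. If that guess is
right, the reflect representation has to be requested explicitly; I believe
the 1.7 AvroJob has a hint for this, though I haven't confirmed it fixes my
case:

    // Assumption on my part: ask the shuffle serialization to use the
    // reflect representation for the map output, so AvroFlowWritable is
    // written with ReflectDatumWriter instead of being cast to IndexedRecord.
    AvroJob.setMapOutputReflect(conf);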
From setup:
    // Non-Avro job output
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongPair.class);
    conf.setOutputValueClass(AvroFlowWritable.class);

    // Avro input plus Avro mapper/reducer registration
    NETFLOW_V5_SCHEMA = new Schema.Parser().parse(NetflowSchema);
    AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
    AvroJob.setMapperClass(conf, AvroFlowMapper.class);
    AvroJob.setReducerClass(conf, AvroFlowReducer.class);

    // Map output schema: Pair<Long, AvroFlowWritable>, value schema via reflection
    Schema afwSchema = ReflectData.get().getSchema(AvroFlowWritable.class);
    Schema pairSchema = Pair.getPairSchema(Schema.create(Schema.Type.LONG), afwSchema);
    AvroJob.setMapOutputSchema(conf, pairSchema);
/*
* ------------------
* *** Mapper ***
* ------------------
*/
public static class AvroFlowMapper<K>
        extends AvroMapper<K, Pair<Long, AvroFlowWritable>> {

    Long[] keepIps;

    // Configure removed

    @Override
    public void map(K datum, AvroCollector<Pair<Long, AvroFlowWritable>> collector,
            Reporter reporter) throws IOException {
        // Input is a generic record from the Avro data file
        GenericRecord record = (GenericRecord) datum;
        AvroFlowWritable afw = new AvroFlowWritable(record);
        if (isKeeper(afw)) {
            // Key on the "other" end of the flow
            Long testKey;
            if (inKeeperIpList(afw.getSrcIp())) {
                testKey = Long.valueOf(afw.getDstIp());
            } else {
                testKey = Long.valueOf(afw.getSrcIp());
            }
            collector.collect(new Pair<Long, AvroFlowWritable>(testKey, afw));
        }
    }
}
/*
* ------------------
* *** Reducer ***
* ------------------
*/
public static class AvroFlowReducer
        extends AvroReducer<Long, AvroFlowWritable, Text> {

    @Override
    public void reduce(Long key, Iterable<AvroFlowWritable> values,
            AvroCollector<Text> collector, Reporter reporter) throws IOException {
        // Emit one text line per flow record
        for (AvroFlowWritable afw : values) {
            collector.collect(new Text(afw.toString()));
        }
    }
}
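In case it clarifies where I'm trying to end up: if the package docs mean
the reducer should be a plain Hadoop Reducer receiving AvroKey/AvroValue
from the shuffle, then I'd expect my reducer to become something like this
sketch (again, only my reading of the docs, untested; the output classes
would be Text/NullWritable as in the setup sketch near the top):

    public static class PlainFlowReducer extends MapReduceBase
            implements Reducer<AvroKey<Long>, AvroValue<AvroFlowWritable>,
                    Text, NullWritable> {

        @Override
        public void reduce(AvroKey<Long> key,
                Iterator<AvroValue<AvroFlowWritable>> values,
                OutputCollector<Text, NullWritable> output, Reporter reporter)
                throws IOException {
            while (values.hasNext()) {
                // Unwrap the datum and emit one text line per flow
                AvroFlowWritable afw = values.next().datum();
                output.collect(new Text(afw.toString()), NullWritable.get());
            }
        }
    }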
On 12/20/2012 12:32 PM, Terry Healy wrote:
> I'm just getting started using Avro within Map/Reduce and am trying to
> convert some existing non-Avro code to use Avro input. So far, the data
> that was previously stored in tab-delimited files has been converted to
> .avro successfully, as verified with avro-tools.
>
> Where I'm getting hung up, moving beyond my book-based examples, is in
> attempting to read from Avro (using generic records) where the mapper
> output is NOT in Avro format. I can't seem to reconcile extending
> AvroMapper with NOT using AvroCollector.
>
> Here are snippets of code that show my non-AVRO M/R code and my
> [failing] attempts to make this change. If anyone can help me along it
> would be very much appreciated.
>
> -Terry
>
> <code>
> Pre-Avro version: (Works fine with .tsv input format)
>
> public static class HdFlowMapper extends MapReduceBase
>         implements Mapper<Text, HdFlowWritable, LongPair, HdFlowWritable> {
>
>     @Override
>     public void map(Text key, HdFlowWritable value,
>             OutputCollector<LongPair, HdFlowWritable> output,
>             Reporter reporter) throws IOException {
>
>         ...//
>         outKey = new LongPair(value.getSrcIp(), value.getFirst());
>
>         HdFlowWritable outValue = value; // pass it all through
>         output.collect(outKey, outValue);
>     }
>
>