Going crazy here trying to reconcile this. I found pieces of what I need
partially implemented in the Avro weather M/R example from 'Hadoop: The
Definitive Guide', and outlined in the org.apache.avro.mapred package
docs under "For jobs whose input is an Avro data file and which use an
AvroMapper, but whose reducer is a non-Avro Reducer and whose output is
a non-Avro format:". Clearly I have misunderstood something while
attempting to follow those instructions.

That example code does not include a mapper, so its job setup is not
like what I'm trying to achieve: Avro format into the Mapper, Text out
of the Reducer. (I've dropped the Partitioner, Comparator, and
GroupingComparator used in the working M/R code that reads .tsv rather
than Avro, to rule them out.)

The current stumbling block is "AvroFlowWritable cannot be cast to
org.apache.avro.generic.IndexedRecord", where AvroFlowWritable is my
class. I think my existing reducer would work fine, but when I use it
with the AvroMapper the job throws the above exception, presumably
because the datum writer serializing the intermediate map output treats
the value as a record and expects an IndexedRecord, which my
reflect-based class does not implement.


From my job setup:

        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(LongPair.class);
        conf.setOutputValueClass(AvroFlowWritable.class);

        NETFLOW_V5_SCHEMA = new Schema.Parser().parse(NetflowSchema);

        AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
        AvroJob.setMapperClass(conf, AvroFlowMapper.class);
        AvroJob.setReducerClass(conf, AvroFlowReducer.class);

        Schema afwSchema = ReflectData.get().getSchema(AvroFlowWritable.class);
        Schema pairSchema = Pair.getPairSchema(Schema.create(Schema.Type.LONG), afwSchema);
        AvroJob.setMapOutputSchema(conf, pairSchema);
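
For contrast, my reading of the package-doc recipe is that with a
non-Avro reducer the reducer gets registered through JobConf rather than
AvroJob, and the output key/value classes describe the reducer's Text
output. A sketch of the setup as I understand it (TextFlowReducer is a
hypothetical plain Reducer, sketched below after my AvroReducer):

        // Map side stays Avro, as above.
        AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
        AvroJob.setMapperClass(conf, AvroFlowMapper.class);
        AvroJob.setMapOutputSchema(conf, pairSchema);

        // Reduce side is plain Hadoop: no AvroJob.setReducerClass() here.
        conf.setReducerClass(TextFlowReducer.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(NullWritable.class);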


    /*
     * ------------------
     * *** Mapper ***
     * ------------------
     */

    public static class AvroFlowMapper<K>
            extends AvroMapper<K, Pair<Long, AvroFlowWritable>> {

        Long[] keepIps;

        // Configure removed

        @Override
        public void map(K datum, AvroCollector<Pair<Long, AvroFlowWritable>> collector,
                Reporter reporter) throws IOException {

            GenericRecord record = (GenericRecord) datum;
            AvroFlowWritable afw = new AvroFlowWritable(record);

            if (isKeeper(afw)) {
                // Key on the "other" end of the flow.
                Long testKey;
                if (inKeeperIpList(afw.getSrcIp())) {
                    testKey = Long.valueOf(afw.getDstIp());
                } else {
                    testKey = Long.valueOf(afw.getSrcIp());
                }
                collector.collect(new Pair<Long, AvroFlowWritable>(testKey, afw));
            }
        }
    }
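
One workaround I'm considering for the IndexedRecord cast: have the
mapper pass through the GenericRecord it already receives, so only Avro
generic types cross the shuffle, and build the AvroFlowWritable later.
This is only a sketch under that assumption; the field name "dst_ip" is
hypothetical and the keeper filtering is omitted for brevity:

    public static class GenericFlowMapper<K>
            extends AvroMapper<K, Pair<Long, GenericRecord>> {

        @Override
        public void map(K datum, AvroCollector<Pair<Long, GenericRecord>> collector,
                Reporter reporter) throws IOException {

            GenericRecord record = (GenericRecord) datum;
            // GenericRecord implements IndexedRecord, so the generic datum
            // writer can serialize it. "dst_ip" is a hypothetical field name.
            Long key = (Long) record.get("dst_ip");
            collector.collect(new Pair<Long, GenericRecord>(key, record));
        }
    }

The map output schema would then be built from the input schema rather
than the reflect-generated afwSchema:

        Schema pairSchema = Pair.getPairSchema(
                Schema.create(Schema.Type.LONG), NETFLOW_V5_SCHEMA);
        AvroJob.setMapOutputSchema(conf, pairSchema);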

    /*
     * ------------------
     * *** Reducer ***
     * ------------------
     */

    public static class AvroFlowReducer
            extends AvroReducer<Long, AvroFlowWritable, Text> {

        @Override
        public void reduce(Long key, Iterable<AvroFlowWritable> values,
                AvroCollector<Text> collector, Reporter reporter)
                throws IOException {

            for (AvroFlowWritable afw : values) {
                collector.collect(new Text(afw.toString()));
            }
        }
    }
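
And here is my reading of the non-Avro reducer the package doc
describes: a plain old-API Reducer whose input key/value types are the
AvroKey/AvroValue wrappers, with Text/NullWritable output. Again only a
sketch; TextFlowReducer matches the hypothetical name used in the setup
sketch above:

    public static class TextFlowReducer extends MapReduceBase
            implements Reducer<AvroKey<Long>, AvroValue<AvroFlowWritable>,
                    Text, NullWritable> {

        @Override
        public void reduce(AvroKey<Long> key,
                Iterator<AvroValue<AvroFlowWritable>> values,
                OutputCollector<Text, NullWritable> output,
                Reporter reporter) throws IOException {

            while (values.hasNext()) {
                // datum() unwraps the Avro value for plain Hadoop output.
                AvroFlowWritable afw = values.next().datum();
                output.collect(new Text(afw.toString()), NullWritable.get());
            }
        }
    }

If the mapper were changed to emit GenericRecord as sketched earlier,
the value type here would become AvroValue<GenericRecord> and the
AvroFlowWritable could be constructed inside this reducer instead.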



On 12/20/2012 12:32 PM, Terry Healy wrote:
> I'm just getting started using AVRO within Map/Reduce and trying to
> convert some existing non-AVRO code to use AVRO input. So far the data
> that previously was stored in tab delimited files has been converted to
> .avro successfully as checked with avro-tools.
> 
> Where I'm getting hung up extending beyond my book-based examples is in
> attempting to read from AVRO (using generic records) where the mapper
> output is NOT in AVRO format. I can't seem to reconcile extending
> AvroMapper and NOT using AvroCollector.
> 
> Here are snippets of code that show my non-AVRO M/R code and my
> [failing] attempts to make this change. If anyone can help me along it
> would be very much appreciated.
> 
> -Terry
> 
> <code>
> Pre-Avro version: (Works fine with .tsv input format)
> 
>     public static class HdFlowMapper extends MapReduceBase
>             implements Mapper<Text, HdFlowWritable, LongPair, HdFlowWritable> {
> 
> 
>         @Override
>         public void map(Text key, HdFlowWritable value,
>                 OutputCollector<LongPair, HdFlowWritable> output,
>                 Reporter reporter) throws IOException {
> 
>               ...//
>                 outKey = new LongPair(value.getSrcIp(), value.getFirst());
> 
>                 HdFlowWritable outValue = value; // pass it all through
>                 output.collect(outKey, outValue);
>       }
> 
> 
