Hi All,

We have Avro data files in HDFS which are compressed using the Deflate codec.
We have written an M/R job using the Avro Mapred API to combine those files.

It seems to be working fine; however, when we run it we notice that the
temporary work area (spills, etc.) seems to be uncompressed.
We're thinking we might see a speedup due to reduced I/O if the temporary files 
are compressed as well.
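
To illustrate the reasoning, here is a minimal standalone sketch (not part of
our job) that deflates some repetitive, made-up log-like data with
java.util.zip.Deflater and compares sizes. The class name, the helper methods
and the sample record format are all hypothetical; real spill files hold
Avro-serialized records, so the actual ratio will differ.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

public class SpillCompressionSketch {

    // Compress a byte array with Deflate at the given level (0-9).
    static byte[] deflate(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Build made-up, repetitive text standing in for log records (assumption:
    // real events share many field names and values, like these lines do).
    static byte[] sampleRecords(int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            sb.append("event=delivery;status=OK;id=").append(i).append('\n');
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = sampleRecords(10_000);
        byte[] packed = deflate(raw, 9);
        System.out.println("raw bytes:      " + raw.length);
        System.out.println("deflated bytes: " + packed.length);
    }
}
```

Fewer bytes written to and read from local disk is the I/O saving we are
hoping for; whether it outweighs the extra CPU cost in our job is something we
would still have to measure.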

Is there a way to enable "mapred.compress.map.output" in such a way that those 
temporary files are compressed as Avro/Deflate?
I tried simply setting conf.setBoolean("mapred.compress.map.output", true); but 
it didn't seem to have any effect.

Note that in order to avoid unnecessary sorting overhead, I made each key a
constant (1L) so that the logs are combined but ordering isn't necessarily
preserved. (We don't care about ordering.)

FYI, here are my mapper and reducer.


        public static class AvroReachMapper
                extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {

            @Override
            public void map(DeliveryLogEvent levent,
                            AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
                            Reporter reporter) throws IOException {
                // Emit every event under the same constant key (1L).
                collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
            }
        }

        public static class Reduce
                extends AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {

            @Override
            public void reduce(Long key, Iterable<DeliveryLogEvent> values,
                               AvroCollector<DeliveryLogEvent> collector,
                               Reporter reporter) throws IOException {
                // Pass each event through unchanged.
                for (DeliveryLogEvent event : values) {
                    collector.collect(event);
                }
            }
        }

Also, I'm setting the following:

        AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
        AvroJob.setMapperClass(conf, AvroReachMapper.class);
        AvroJob.setMapOutputSchema(conf, SCHEMA);

        AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
        AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
        AvroOutputFormat.setDeflateLevel(conf, 9);
        AvroOutputFormat.setSyncInterval(conf, 1024 * 256);

        AvroJob.setReducerClass(conf, Reduce.class);

        JobClient.runJob(conf);


Thanks,

Frank Grimes
