Hi All,
We have Avro data files in HDFS that are compressed with the Deflate codec,
and we've written an M/R job using the Avro Mapred API to combine those files.
The job seems to work fine; however, when we run it we notice that the
temporary work area (spills, etc.) seems to be uncompressed.
We're thinking we might see a speedup from reduced I/O if those temporary
files were compressed as well.
Is there a way to enable "mapred.compress.map.output" in such a way that those
temporary files are compressed as Avro/Deflate?
I tried simply setting conf.setBoolean("mapred.compress.map.output", true); but
it didn't seem to have any effect.
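In case it helps, here is the fuller intermediate-compression setup I would
have expected to be needed, using the JobConf helpers instead of the raw
property. (DefaultCodec is Hadoop's zlib/Deflate codec; whether any of this
actually applies to Avro's intermediate serialization is exactly what I'm
unsure about, so treat it as a guess rather than something that works.)

import org.apache.hadoop.io.compress.DefaultCodec;

// Equivalent to setting "mapred.compress.map.output" to true.
conf.setCompressMapOutput(true);
// Tell Hadoop which CompressionCodec to use for spilled map output;
// DefaultCodec is zlib/Deflate-based.
conf.setMapOutputCompressorClass(DefaultCodec.class);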
Note that in order to avoid unnecessary sorting overhead, I made each key a
constant (1L) so that the logs are combined but ordering isn't necessarily
preserved. (We don't care about ordering.)
FYI, here are my mapper and reducer:
import java.io.IOException;

import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.Reporter;

public static class AvroReachMapper
    extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {

  @Override
  public void map(DeliveryLogEvent levent,
      AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
      Reporter reporter) throws IOException {
    // Constant key: every event lands in the same reduce group, so the
    // inputs get combined without imposing any meaningful sort order.
    collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
  }
}

public static class Reduce
    extends AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {

  @Override
  public void reduce(Long key, Iterable<DeliveryLogEvent> values,
      AvroCollector<DeliveryLogEvent> collector,
      Reporter reporter) throws IOException {
    // Identity reduce: pass every event straight through to the output.
    for (DeliveryLogEvent event : values) {
      collector.collect(event);
    }
  }
}
Also, I'm setting the following:
AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
AvroJob.setMapperClass(conf, AvroReachMapper.class);
AvroJob.setMapOutputSchema(conf,
    Pair.getPairSchema(Schema.create(Schema.Type.LONG),
        DeliveryLogEvent.SCHEMA$));
AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
AvroOutputFormat.setDeflateLevel(conf, 9);
AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
AvroJob.setReducerClass(conf, Reduce.class);
JobClient.runJob(conf);
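For completeness, the driver code around that snippet looks roughly like this
(the class name, job name, and paths below are placeholders, not our real ones):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

JobConf conf = new JobConf(LogCombiner.class); // LogCombiner is a stand-in name
conf.setJobName("combine-delivery-logs");      // placeholder job name
FileInputFormat.setInputPaths(conf, new Path("/placeholder/input"));
FileOutputFormat.setOutputPath(conf, new Path("/placeholder/output"));
// ...followed by the AvroJob/AvroOutputFormat calls above, then runJob.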
Thanks,
Frank Grimes