So I decided to try writing my own AvroStreamCombiner utility, and it seems to
choke when passed multiple input files:
hadoop dfs -cat hdfs://hadoop/machine1.log.avro hdfs://hadoop/machine2.log.avro
| ./deliveryLogAvroStreamCombiner.sh > combined.log.avro
Exception in thread "main" java.io.IOException: Invalid sync!
at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
at org.apache.avro.file.DataFileWriter.appendAllFrom(DataFileWriter.java:329)
at DeliveryLogAvroStreamCombiner.main(Unknown Source)
Here's the code in question:
import java.io.OutputStream;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

public class DeliveryLogAvroStreamCombiner {

    public static void main(String[] args) throws Exception {
        DataFileStream<DeliveryLogEvent> dfs = null;
        DataFileWriter<DeliveryLogEvent> dfw = null;
        try {
            // Read Avro container data from stdin.
            dfs = new DataFileStream<DeliveryLogEvent>(System.in,
                    new SpecificDatumReader<DeliveryLogEvent>());

            // Write a single deflate-compressed container file to stdout.
            OutputStream stdout = System.out;
            dfw = new DataFileWriter<DeliveryLogEvent>(
                    new SpecificDatumWriter<DeliveryLogEvent>());
            dfw.setCodec(CodecFactory.deflateCodec(9));
            dfw.setSyncInterval(1024 * 256);
            dfw.create(DeliveryLogEvent.SCHEMA$, stdout);

            // Copy the raw data blocks without deserializing each record.
            dfw.appendAllFrom(dfs, false);
        }
        finally {
            if (dfs != null) try { dfs.close(); } catch (Exception e) { e.printStackTrace(); }
            if (dfw != null) try { dfw.close(); } catch (Exception e) { e.printStackTrace(); }
        }
    }
}
Is there any way this could be made to work without needing to download the
individual files to disk and call append for each of them?
Thanks,
Frank Grimes
On 2012-01-12, at 2:24 PM, Frank Grimes wrote:
> Hi Scott,
>
> If I have a map-only job, would I want only one mapper running to pull all
> the records from the source input files and stream/append them to the target
> avro file?
> Would that be any different from (or more efficient than) doing "hadoop dfs -cat
> file1 file2 file3" and piping the output into a "hadoop dfs -put combinedFile"?
> In that case, my only question is how would I combine the avro files into a
> new file without deserializing them?
>
> Thanks,
>
> Frank Grimes
>
>
> On 2012-01-12, at 1:14 PM, Scott Carey wrote:
>
>>
>>
>> On 1/12/12 8:27 AM, "Frank Grimes" <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> We have Avro data files in HDFS which are compressed using the Deflate
>>> codec.
>>> We have written an M/R job using the Avro Mapred API to combine those files.
>>>
>>> It seems to be working fine; however, when we run it we notice that the
>>> temporary work area (spills, etc.) seems to be uncompressed.
>>> We're thinking we might see a speedup due to reduced I/O if the temporary
>>> files were compressed as well.
>>
>>
>> If all you want to do is combine the files, there is no reason to
>> deserialize and reserialize the contents, and a map-only job could suffice.
>> If this is the case, you might want to consider one of two options:
>> 1. Use a map only job, with a combined file input. This will produce one
>> file per mapper and no intermediate data.
>> 2. Use the Avro data file API to append to a file. I am not sure if this
>> will work with HDFS without some modifications to Avro, but it should be
>> possible since the data file APIs can take InputStream/OutputStream. The
>> data file API has the ability to append data blocks from the file if the
>> schemas are an exact match. This can be done without deserialization, and
>> optionally can change the compression level or leave it alone.
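For illustration, here is a rough, untested sketch of that second option: open
each HDFS file as its own stream (so every container file's header and sync
marker is handled separately) and copy its blocks with appendAllFrom. The use of
Hadoop's FileSystem API, the class name, and the argument handling are
assumptions for the sketch, not something confirmed in this thread:

import java.io.OutputStream;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch: args[0] is the target file, args[1..n] are the sources.
public class HdfsAvroBlockAppender {
    public static void main(String[] args) throws Exception {
        FileSystem hdfs = FileSystem.get(new Configuration());

        DataFileWriter<DeliveryLogEvent> writer =
                new DataFileWriter<DeliveryLogEvent>(
                        new SpecificDatumWriter<DeliveryLogEvent>());
        writer.setCodec(CodecFactory.deflateCodec(9));
        OutputStream out = hdfs.create(new Path(args[0]));
        writer.create(DeliveryLogEvent.SCHEMA$, out);

        for (int i = 1; i < args.length; i++) {
            // Each source gets its own DataFileStream, so its header and
            // sync marker are read correctly.
            DataFileStream<DeliveryLogEvent> reader =
                    new DataFileStream<DeliveryLogEvent>(
                            hdfs.open(new Path(args[i])),
                            new SpecificDatumReader<DeliveryLogEvent>());
            // Copy raw data blocks; false = don't recompress when codecs match.
            writer.appendAllFrom(reader, false);
            reader.close();
        }
        writer.close();
    }
}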
>>
>>>
>>> Is there a way to enable "mapred.compress.map.output" in such a way that
>>> those temporary files are compressed as Avro/Deflate?
>>> I tried simply setting conf.setBoolean("mapred.compress.map.output", true);
>>> but it didn't seem to have any effect.
>>
>>
>> I am not sure, as I haven't tried it myself. However, the Avro M/R should
>> be able to leverage all of the Hadoop compressed intermediate forms.
>> LZO/Snappy are fast and in our cluster Snappy is the default. Deflate can
>> be a lot slower but much more compact.
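For reference, a minimal sketch of the usual old-API settings for compressing
the intermediate map output (the Snappy codec here is just an example and
assumes the native codec is available on the cluster; this affects Hadoop's own
intermediate files, not the Avro codec of the final output):

import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper: both settings usually need to be set together.
public class IntermediateCompression {
    public static void enable(JobConf conf) {
        conf.setCompressMapOutput(true);                     // mapred.compress.map.output=true
        conf.setMapOutputCompressorClass(SnappyCodec.class); // mapred.map.output.compression.codec
    }
}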
>>
>>>
>>> Note that in order to avoid unnecessary sorting overhead, I made each key a
>>> constant (1L) so that the logs are combined but ordering isn't necessarily
>>> preserved. (we don't care about ordering)
>>
>>
>> In that case, I think you can use a map-only job. There may be some work to
>> get a single mapper to read many files, however.
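For illustration, a rough, untested sketch of such a map-only job using the same
mapred-style Avro API as the job below; it assumes that with zero reduce tasks
the mapper's output type is the job's output schema (no Pair), and it would
still produce one output file per mapper:

public static class IdentityMapper
        extends AvroMapper<DeliveryLogEvent, DeliveryLogEvent> {
    @Override
    public void map(DeliveryLogEvent event,
            AvroCollector<DeliveryLogEvent> collector,
            Reporter reporter) throws IOException {
        collector.collect(event);  // pass each record through unchanged
    }
}

JobConf conf = new JobConf();
AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
AvroJob.setMapperClass(conf, IdentityMapper.class);
AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
conf.setNumReduceTasks(0);  // map-only: no sort/shuffle and no reduce phase
JobClient.runJob(conf);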
>>
>>>
>>> FYI, here are my mapper and reducer.
>>>
>>>
>>> public static class AvroReachMapper
>>>     extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>>
>>>   @Override
>>>   public void map(DeliveryLogEvent levent,
>>>       AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
>>>       Reporter reporter) throws IOException {
>>>     collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>>   }
>>> }
>>>
>>> public static class Reduce
>>>     extends AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>>
>>>   @Override
>>>   public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>>       AvroCollector<DeliveryLogEvent> collector,
>>>       Reporter reporter) throws IOException {
>>>     for (DeliveryLogEvent event : values) {
>>>       collector.collect(event);
>>>     }
>>>   }
>>> }
>>>
>>>
>>> AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>> AvroJob.setMapperClass(conf, AvroReachMapper.class);
>>> AvroJob.setMapOutputSchema(conf, SCHEMA);  // SCHEMA: presumably the Pair(Long, DeliveryLogEvent) schema, defined elsewhere
>>>
>>> AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>> AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>> AvroOutputFormat.setDeflateLevel(conf, 9);
>>> AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>>
>>> AvroJob.setReducerClass(conf, Reduce.class);
>>>
>>> JobClient.runJob(conf);
>>>
>>>
>>> Thanks,
>>>
>>> Frank Grimes
>