Hi Scott,

I added https://issues.apache.org/jira/browse/AVRO-991 to track this feature request.
Thanks,

Frank Grimes


On 2012-01-12, at 10:31 PM, Scott Carey wrote:

>
>
> On 1/12/12 5:52 PM, "Frank Grimes" <[email protected]> wrote:
>
>> Hi Scott,
>>
>> I've looked into this some more and I now see what you mean about appending
>> to HDFS directly not being possible with the current DataFileWriter API.
>>
>> That's unfortunate because we really would like to avoid needing to hit disk
>> just to write temporary files. (and the associated cleanup)
>>
>> I kinda like the notion of not requiring HDFS APIs to achieve this merging
>> of Avro files/streams.
>>
>> Assuming we wanted to be able to stream multiple files as in my example...
>> could DataFileStream easily be changed to support that use case?
>> i.e. allow it to skip/ignore subsequent header and metadata in the stream or
>> not error out with "Invalid sync!"?
>
>
> That may be possible, open a JIRA to discuss further. It should be modified
> to 'reset' to the start of a new file or stream and continue from there,
> since it needs to read the header and find the new sync value and validate
> that the schemas match and the codec is compatible. It may be possible to
> detect the end of one file and the start of another if the files are streamed
> back to back, but perhaps not reliably.
> The avro-tools could be extended to have a command line tool that takes a
> list of files (HDFS or local) and writes a new file (HDFS or local)
> concatenated and possibly "recodec'd".
>
>>
>> Thanks,
>>
>> Frank Grimes
>>
>>
>> On 2012-01-12, at 3:53 PM, Scott Carey wrote:
>>
>>>
>>>
>>> On 1/12/12 12:35 PM, "Frank Grimes" <[email protected]> wrote:
>>>
>>>> So I decided to try writing my own AvroStreamCombiner utility and it seems
>>>> to choke when passing multiple input files:
>>>>
>>>>> hadoop dfs -cat hdfs://hadoop/machine1.log.avro
>>>>> hdfs://hadoop/machine2.log.avro | ./deliveryLogAvroStreamCombiner.sh >
>>>>> combined.log.avro
>>>>
>>>>> Exception in thread "main" java.io.IOException: Invalid sync!
>>>>>     at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
>>>>>     at org.apache.avro.file.DataFileWriter.appendAllFrom(DataFileWriter.java:329)
>>>>>     at DeliveryLogAvroStreamCombiner.main(Unknown Source)
>>>>
>>>>
>>>> Here's the code in question:
>>>>
>>>> public class DeliveryLogAvroStreamCombiner {
>>>>
>>>>     /**
>>>>      * @param args
>>>>      */
>>>>     public static void main(String[] args) throws Exception {
>>>>         DataFileStream<DeliveryLogEvent> dfs = null;
>>>>         DataFileWriter<DeliveryLogEvent> dfw = null;
>>>>
>>>>         try {
>>>>             dfs = new DataFileStream<DeliveryLogEvent>(System.in,
>>>>                 new SpecificDatumReader<DeliveryLogEvent>());
>>>>
>>>>             OutputStream stdout = System.out;
>>>>
>>>>             dfw = new DataFileWriter<DeliveryLogEvent>(
>>>>                 new SpecificDatumWriter<DeliveryLogEvent>());
>>>>             dfw.setCodec(CodecFactory.deflateCodec(9));
>>>>             dfw.setSyncInterval(1024 * 256);
>>>>             dfw.create(DeliveryLogEvent.SCHEMA$, stdout);
>>>>
>>>>             dfw.appendAllFrom(dfs, false);
>>>
>>>
>>> dfs is from System.in, which has multiple files one after the other. Each
>>> file will need its own DataFileStream (has its own header and metadata).
>>>
>>> In Java you could get the list of files, and for each file use HDFS's API
>>> to open the file stream, and append that to your one file.
>>> In bash you could loop over all the source files and append one at a time
>>> (the above fails on the second file).
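For reference, a rough and untested sketch of that per-file loop: each input gets its own DataFileStream (opened via the Hadoop FileSystem API) and everything is appended to a single DataFileWriter with the same appendAllFrom call used above. The class name and the assumption that the HDFS paths arrive as command-line arguments are placeholders.

import java.io.OutputStream;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeliveryLogAvroFileCombiner {

    public static void main(String[] args) throws Exception {
        OutputStream stdout = System.out;

        DataFileWriter<DeliveryLogEvent> writer =
                new DataFileWriter<DeliveryLogEvent>(new SpecificDatumWriter<DeliveryLogEvent>());
        writer.setCodec(CodecFactory.deflateCodec(6)); // see the compression notes below
        writer.setSyncInterval(1024 * 1024);
        writer.create(DeliveryLogEvent.SCHEMA$, stdout);

        FileSystem fs = FileSystem.get(new Configuration());
        try {
            for (String arg : args) {
                // Each input file carries its own header, metadata and sync
                // marker, so each one gets its own DataFileStream.
                DataFileStream<DeliveryLogEvent> in =
                        new DataFileStream<DeliveryLogEvent>(
                                fs.open(new Path(arg)),
                                new SpecificDatumReader<DeliveryLogEvent>());
                try {
                    // Copies data blocks without deserializing the records.
                    writer.appendAllFrom(in, false);
                } finally {
                    in.close();
                }
            }
        } finally {
            writer.close();
        }
    }
}

Invocation would then look something like the original pipeline, but with the HDFS paths passed as arguments instead of piped through stdin, e.g. "hadoop jar combiner.jar DeliveryLogAvroFileCombiner hdfs://hadoop/machine1.log.avro hdfs://hadoop/machine2.log.avro > combined.log.avro".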
>>> However, in order to append to the end of a pre-existing file, the only
>>> API now takes a File, not a seekable stream, so Avro would need a patch
>>> to allow that in HDFS (also, only an HDFS version that supports appends
>>> would work).
>>>
>>> Other things of note:
>>> You will probably get better total file size compression by using a larger
>>> sync interval (1M to 4M) than deflate level 9. Deflate 9 is VERY slow and
>>> almost never compresses more than 1% better than deflate 6, which is much
>>> faster. I suggest experimenting with the 'recodec' option on some of your
>>> files to see what the best size / performance tradeoff is. I doubt that
>>> 256K (pre-compression) blocks with level 9 compression is the way to go.
>>>
>>> For reference: http://tukaani.org/lzma/benchmarks.html
>>> (gzip uses deflate compression)
>>>
>>> -Scott
>>>
>>>
>>>>         }
>>>>         finally {
>>>>             if (dfs != null) try { dfs.close(); } catch (Exception e) { e.printStackTrace(); }
>>>>             if (dfw != null) try { dfw.close(); } catch (Exception e) { e.printStackTrace(); }
>>>>         }
>>>>     }
>>>>
>>>> }
>>>>
>>>> Is there any way this could be made to work without needing to download
>>>> the individual files to disk and calling append for each of them?
>>>>
>>>> Thanks,
>>>>
>>>> Frank Grimes
>>>>
>>>>
>>>> On 2012-01-12, at 2:24 PM, Frank Grimes wrote:
>>>>
>>>>> Hi Scott,
>>>>>
>>>>> If I have a map-only job, would I want only one mapper running to pull
>>>>> all the records from the source input files and stream/append them to
>>>>> the target avro file?
>>>>> Would that be no different (or more efficient) than doing "hadoop dfs
>>>>> -cat file1 file2 file3" and piping the output to append to a "hadoop
>>>>> dfs -put combinedFile"?
>>>>> In that case, my only question is how would I combine the avro files
>>>>> into a new file without deserializing them?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Frank Grimes
>>>>>
>>>>>
>>>>> On 2012-01-12, at 1:14 PM, Scott Carey wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On 1/12/12 8:27 AM, "Frank Grimes" <[email protected]> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> We have Avro data files in HDFS which are compressed using the Deflate
>>>>>>> codec.
>>>>>>> We have written an M/R job using the Avro Mapred API to combine those
>>>>>>> files.
>>>>>>>
>>>>>>> It seems to be working fine, however when we run it we notice that the
>>>>>>> temporary work area (spills, etc.) seems to be uncompressed.
>>>>>>> We're thinking we might see a speedup due to reduced I/O if the
>>>>>>> temporary files are compressed as well.
>>>>>>
>>>>>>
>>>>>> If all you want to do is combine the files, there is no reason to
>>>>>> deserialize and reserialize the contents, and a map-only job could
>>>>>> suffice.
>>>>>> If this is the case, you might want to consider one of two options:
>>>>>> 1. Use a map-only job, with a combined file input. This will produce
>>>>>> one file per mapper and no intermediate data.
>>>>>> 2. Use the Avro data file API to append to a file. I am not sure if
>>>>>> this will work with HDFS without some modifications to Avro, but it
>>>>>> should be possible since the data file APIs can take
>>>>>> InputStream/OutputStream. The data file API has the ability to append
>>>>>> data blocks from the file if the schemas are an exact match. This can
>>>>>> be done without deserialization, and optionally can change the
>>>>>> compression level or leave it alone.
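To put numbers on the deflate 6 vs. 9 and sync interval advice above, something along these lines could be run against a sample file. It is an untested sketch; the class name and output file names are placeholders, and it re-appends the records one by one (rather than copying blocks with appendAllFrom) so that the new sync interval, not just the codec, actually takes effect.

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

public class CompressionTradeoffExperiment {

    public static void main(String[] args) throws Exception {
        File sample = new File(args[0]); // an existing deflate-9 sample file

        int[] levels = { 6, 9 };
        int[] syncIntervals = { 256 * 1024, 1024 * 1024, 4 * 1024 * 1024 };

        for (int level : levels) {
            for (int interval : syncIntervals) {
                File out = new File(String.format("sample-deflate%d-sync%dk.avro",
                        level, interval / 1024));

                DataFileStream<DeliveryLogEvent> reader =
                        new DataFileStream<DeliveryLogEvent>(
                                new BufferedInputStream(new FileInputStream(sample)),
                                new SpecificDatumReader<DeliveryLogEvent>());
                DataFileWriter<DeliveryLogEvent> writer =
                        new DataFileWriter<DeliveryLogEvent>(
                                new SpecificDatumWriter<DeliveryLogEvent>());
                writer.setCodec(CodecFactory.deflateCodec(level));
                writer.setSyncInterval(interval);
                writer.create(DeliveryLogEvent.SCHEMA$, out);

                long start = System.currentTimeMillis();
                // Re-append record by record so blocks are re-formed at the
                // new sync interval before being compressed at the new level.
                for (DeliveryLogEvent event : reader) {
                    writer.append(event);
                }
                writer.close();
                reader.close();

                System.out.println(out.getName() + ": " + out.length() + " bytes, "
                        + (System.currentTimeMillis() - start) + " ms");
            }
        }
    }
}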
>>>>>>
>>>>>>>
>>>>>>> Is there a way to enable "mapred.compress.map.output" in such a way
>>>>>>> that those temporary files are compressed as Avro/Deflate?
>>>>>>> I tried simply setting conf.setBoolean("mapred.compress.map.output",
>>>>>>> true); but it didn't seem to have any effect.
>>>>>>
>>>>>>
>>>>>> I am not sure, as I haven't tried it myself. However, the Avro M/R
>>>>>> should be able to leverage all of the Hadoop compressed intermediate
>>>>>> forms. LZO/Snappy are fast and in our cluster Snappy is the default.
>>>>>> Deflate can be a lot slower but much more compact.
>>>>>>
>>>>>>>
>>>>>>> Note that in order to avoid unnecessary sorting overhead, I made each
>>>>>>> key a constant (1L) so that the logs are combined but ordering isn't
>>>>>>> necessarily preserved. (we don't care about ordering)
>>>>>>
>>>>>>
>>>>>> In that case, I think you can use a map-only job. There may be some
>>>>>> work to get a single mapper to read many files however.
>>>>>>
>>>>>>>
>>>>>>> FYI, here are my mapper and reducer.
>>>>>>>
>>>>>>>
>>>>>>> public static class AvroReachMapper extends
>>>>>>>         AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>>>>>>     public void map(DeliveryLogEvent levent,
>>>>>>>             AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
>>>>>>>             Reporter reporter) throws IOException {
>>>>>>>
>>>>>>>         collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> public static class Reduce extends
>>>>>>>         AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>>>>>>             AvroCollector<DeliveryLogEvent> collector, Reporter reporter)
>>>>>>>             throws IOException {
>>>>>>>
>>>>>>>         for (DeliveryLogEvent event : values) {
>>>>>>>             collector.collect(event);
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>>>>> AvroJob.setMapperClass(conf, Mapper.class);
>>>>>>> AvroJob.setMapOutputSchema(conf, SCHEMA);
>>>>>>>
>>>>>>> AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>>>>> AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>>>>>> AvroOutputFormat.setDeflateLevel(conf, 9);
>>>>>>> AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>>>>>>
>>>>>>> AvroJob.setReducerClass(conf, Reducer.class);
>>>>>>>
>>>>>>> JobClient.runJob(conf);
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Frank Grimes
>>>>>
>>>>
>>
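Regarding the "mapred.compress.map.output" question above: intermediate (spill/shuffle) compression is governed by the standard Hadoop job settings rather than by the Avro output codec, so the old-API JobConf setters below are probably the knobs to experiment with. This is an untested sketch: the helper class name is a placeholder, and whether the Avro mapred intermediate data actually ends up compressed may still depend on the Hadoop and Avro versions in use.

import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapred.JobConf;

public class IntermediateCompressionConfig {

    /** Asks Hadoop to compress map-side spill and shuffle data. */
    public static void enable(JobConf conf) {
        // Same effect as conf.setBoolean("mapred.compress.map.output", true).
        conf.setCompressMapOutput(true);
        // Equivalent to setting "mapred.map.output.compression.codec".
        // Snappy is fast when the native library is available on the cluster;
        // org.apache.hadoop.io.compress.DefaultCodec (zlib) is a fallback.
        conf.setMapOutputCompressorClass(SnappyCodec.class);
    }
}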
