On 1/12/12 5:52 PM, "Frank Grimes" <[email protected]> wrote:
> Hi Scott,
>
> I've looked into this some more and I now see what you mean about appending to
> HDFS directly not being possible with the current DataFileWriter API.
>
> That's unfortunate because we really would like to avoid needing to hit disk
> just to write temporary files. (and the associated cleanup)
>
> I kinda like the notion of not requiring HDFS APIs to achieve this merging of
> Avro files/streams.
>
> Assuming we wanted to be able to stream multiple files as in my example...
> could DataFileStream easily be changed to support that use case?
> i.e. allow it to skip/ignore subsequent header and metadata in the stream or
> not error out with "Invalid sync!"?

That may be possible; open a JIRA to discuss it further. DataFileStream would need
to be modified to 'reset' at the start of each new file in the stream and continue
from there, since it has to read the new header, find the new sync marker, and
validate that the schemas match and the codec is compatible. It may be possible to
detect the end of one file and the start of the next when files are streamed back
to back, but perhaps not reliably.

The avro-tools jar could also be extended with a command line tool that takes a
list of files (HDFS or local) and writes a new concatenated, and possibly
"recodec'd", file (HDFS or local).

> Thanks,
>
> Frank Grimes
>
>
> On 2012-01-12, at 3:53 PM, Scott Carey wrote:
>
>>
>>
>> On 1/12/12 12:35 PM, "Frank Grimes" <[email protected]> wrote:
>>
>>> So I decided to try writing my own AvroStreamCombiner utility and it seems
>>> to choke when passing multiple input files:
>>>
>>>> hadoop dfs -cat hdfs://hadoop/machine1.log.avro
>>>> hdfs://hadoop/machine2.log.avro | ./deliveryLogAvroStreamCombiner.sh >
>>>> combined.log.avro
>>>
>>>> Exception in thread "main" java.io.IOException: Invalid sync!
>>>>     at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
>>>>     at org.apache.avro.file.DataFileWriter.appendAllFrom(DataFileWriter.java:329)
>>>>     at DeliveryLogAvroStreamCombiner.main(Unknown Source)
>>>
>>>
>>> Here's the code in question:
>>>
>>> public class DeliveryLogAvroStreamCombiner {
>>>
>>>     /**
>>>      * @param args
>>>      */
>>>     public static void main(String[] args) throws Exception {
>>>         DataFileStream<DeliveryLogEvent> dfs = null;
>>>         DataFileWriter<DeliveryLogEvent> dfw = null;
>>>
>>>         try {
>>>             dfs = new DataFileStream<DeliveryLogEvent>(System.in,
>>>                     new SpecificDatumReader<DeliveryLogEvent>());
>>>
>>>             OutputStream stdout = System.out;
>>>
>>>             dfw = new DataFileWriter<DeliveryLogEvent>(
>>>                     new SpecificDatumWriter<DeliveryLogEvent>());
>>>             dfw.setCodec(CodecFactory.deflateCodec(9));
>>>             dfw.setSyncInterval(1024 * 256);
>>>             dfw.create(DeliveryLogEvent.SCHEMA$, stdout);
>>>
>>>             dfw.appendAllFrom(dfs, false);
>>
>> dfs is from System.in, which has multiple files one after the other. Each
>> file will need its own DataFileStream (each has its own header and metadata).
>>
>> In Java you could get the list of files, and for each file use HDFS's API to
>> open the file stream, and append that to your one output file.
>> In bash you could loop over all the source files and append one at a time
>> (the above fails on the second file). However, in order to append to the end
>> of a pre-existing file the only API right now takes a File, not a seekable
>> stream, so Avro would need a patch to allow that in HDFS (also, only an HDFS
>> version that supports appends would work).
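For concreteness, here is a rough, untested sketch of that per-file approach.
The class name DeliveryLogAvroFileCombiner and the argument-based invocation are
just for illustration; it assumes your generated DeliveryLogEvent class and that
all inputs share its schema:

import java.io.OutputStream;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeliveryLogAvroFileCombiner {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Combined output still goes to stdout, as in your utility.
        OutputStream out = System.out;
        DataFileWriter<DeliveryLogEvent> writer =
                new DataFileWriter<DeliveryLogEvent>(new SpecificDatumWriter<DeliveryLogEvent>());
        writer.setCodec(CodecFactory.deflateCodec(6));  // see the deflate 9 note below
        writer.setSyncInterval(1024 * 1024);
        writer.create(DeliveryLogEvent.SCHEMA$, out);

        // One DataFileStream per input file, so each file's own header, metadata
        // and sync marker are read properly.
        for (String arg : args) {
            Path path = new Path(arg);
            FileSystem fs = path.getFileSystem(conf);  // handles hdfs:// URIs
            DataFileStream<DeliveryLogEvent> reader = new DataFileStream<DeliveryLogEvent>(
                    fs.open(path), new SpecificDatumReader<DeliveryLogEvent>());
            try {
                // Copies data blocks without deserializing records; 'false' means
                // don't recompress when the input codec already matches the writer's.
                writer.appendAllFrom(reader, false);
            } finally {
                reader.close();
            }
        }
        writer.close();
    }
}

Invoked as, e.g., DeliveryLogAvroFileCombiner hdfs://hadoop/machine1.log.avro
hdfs://hadoop/machine2.log.avro > combined.log.avro. appendAllFrom() validates the
schema and copies the compressed blocks directly when the codecs match, so records
are never deserialized; appending to a pre-existing, already-closed file in HDFS is
the part that would still need an Avro patch.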
>>
>> Other things of note:
>> You will probably get better total file size compression by using a larger
>> sync interval (1 MB to 4 MB) than by using deflate level 9. Deflate 9 is VERY
>> slow and almost never compresses more than 1% better than deflate 6, which is
>> much faster. I suggest experimenting with the 'recodec' option on some of your
>> files to see what the best size / performance tradeoff is. I doubt that 256K
>> (pre-compression) blocks with level 9 compression is the way to go.
>>
>> For reference: http://tukaani.org/lzma/benchmarks.html
>> (gzip uses deflate compression)
>>
>> -Scott
>>
>>
>>>         }
>>>         finally {
>>>             if (dfs != null) try { dfs.close(); } catch (Exception e) { e.printStackTrace(); }
>>>             if (dfw != null) try { dfw.close(); } catch (Exception e) { e.printStackTrace(); }
>>>         }
>>>     }
>>>
>>> }
>>>
>>> Is there any way this could be made to work without needing to download the
>>> individual files to disk and calling append for each of them?
>>>
>>> Thanks,
>>>
>>> Frank Grimes
>>>
>>>
>>> On 2012-01-12, at 2:24 PM, Frank Grimes wrote:
>>>
>>>> Hi Scott,
>>>>
>>>> If I have a map-only job, would I want only one mapper running to pull all
>>>> the records from the source input files and stream/append them to the
>>>> target avro file?
>>>> Would that be no different (or more efficient) than doing "hadoop dfs -cat
>>>> file1 file2 file3" and piping the output to append to a "hadoop dfs -put
>>>> combinedFile"?
>>>> In that case, my only question is how would I combine the avro files into a
>>>> new file without deserializing them?
>>>>
>>>> Thanks,
>>>>
>>>> Frank Grimes
>>>>
>>>>
>>>> On 2012-01-12, at 1:14 PM, Scott Carey wrote:
>>>>
>>>>>
>>>>>
>>>>> On 1/12/12 8:27 AM, "Frank Grimes" <[email protected]> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> We have Avro data files in HDFS which are compressed using the Deflate
>>>>>> codec.
>>>>>> We have written an M/R job using the Avro Mapred API to combine those
>>>>>> files.
>>>>>>
>>>>>> It seems to be working fine, however when we run it we notice that the
>>>>>> temporary work area (spills, etc.) seems to be uncompressed.
>>>>>> We're thinking we might see a speedup due to reduced I/O if the temporary
>>>>>> files are compressed as well.
>>>>>
>>>>> If all you want to do is combine the files, there is no reason to
>>>>> deserialize and reserialize the contents, and a map-only job could
>>>>> suffice.
>>>>> If this is the case, you might want to consider one of two options:
>>>>> 1. Use a map-only job with a combined file input. This will produce one
>>>>> file per mapper and no intermediate data.
>>>>> 2. Use the Avro data file API to append to a file. I am not sure if this
>>>>> will work with HDFS without some modifications to Avro, but it should be
>>>>> possible since the data file APIs can take InputStream/OutputStream. The
>>>>> data file API has the ability to append data blocks from another file if
>>>>> the schemas are an exact match. This can be done without deserialization,
>>>>> and optionally can change the compression level or leave it alone.
>>>>>
>>>>>>
>>>>>> Is there a way to enable "mapred.compress.map.output" in such a way that
>>>>>> those temporary files are compressed as Avro/Deflate?
>>>>>> I tried simply setting conf.setBoolean("mapred.compress.map.output",
>>>>>> true); but it didn't seem to have any effect.
>>>>>
>>>>> I am not sure, as I haven't tried it myself. However, the Avro M/R should
>>>>> be able to leverage all of the Hadoop compressed intermediate forms.
>>>>> LZO/Snappy are fast and in our cluster Snappy is the default. Deflate can
>>>>> be a lot slower but much more compact.
>>>>>
>>>>>>
>>>>>> Note that in order to avoid unnecessary sorting overhead, I made each key
>>>>>> a constant (1L) so that the logs are combined but ordering isn't
>>>>>> necessarily preserved. (we don't care about ordering)
>>>>>
>>>>> In that case, I think you can use a map-only job. There may be some work
>>>>> to get a single mapper to read many files, however.
>>>>>
>>>>>>
>>>>>> FYI, here are my mapper and reducer.
>>>>>>
>>>>>>
>>>>>> public static class AvroReachMapper
>>>>>>         extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>>>>>     public void map(DeliveryLogEvent levent,
>>>>>>             AvroCollector<Pair<Long, DeliveryLogEvent>> collector, Reporter reporter)
>>>>>>             throws IOException {
>>>>>>
>>>>>>         collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> public static class Reduce
>>>>>>         extends AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>>>>>
>>>>>>     @Override
>>>>>>     public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>>>>>             AvroCollector<DeliveryLogEvent> collector, Reporter reporter)
>>>>>>             throws IOException {
>>>>>>
>>>>>>         for (DeliveryLogEvent event : values) {
>>>>>>             collector.collect(event);
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>>>> AvroJob.setMapperClass(conf, AvroReachMapper.class);
>>>>>> AvroJob.setMapOutputSchema(conf, SCHEMA);
>>>>>>
>>>>>> AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>>>> AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>>>>> AvroOutputFormat.setDeflateLevel(conf, 9);
>>>>>> AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>>>>>
>>>>>> AvroJob.setReducerClass(conf, Reduce.class);
>>>>>>
>>>>>> JobClient.runJob(conf);
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Frank Grimes
>>>>
>>>
>
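One more note on "mapred.compress.map.output": I haven't tried this with the Avro
mapred API myself, but the standard old-API Hadoop settings for intermediate map
output compression also need the codec specified, not just the boolean. Untested
sketch (Snappy is only an example codec; 'conf' is the same JobConf your AvroJob.*
calls configure):

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

// Untested: standard Hadoop (old API) knobs for compressing intermediate
// map output / spill data; this is independent of the Avro output codec.
conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec",
              SnappyCodec.class, CompressionCodec.class);

Whether that actually takes effect for the Avro-serialized intermediate records is
worth verifying on a small job first.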
