Hi Scott,

I added https://issues.apache.org/jira/browse/AVRO-991 to track this feature 
request.

Thanks,

Frank Grimes


On 2012-01-12, at 10:31 PM, Scott Carey wrote:

> 
> 
> On 1/12/12 5:52 PM, "Frank Grimes" <[email protected]> wrote:
> 
>> Hi Scott,
>> 
>> I've looked into this some more and I now see what you mean about appending 
>> to HDFS directly not being possible with the current DataFileWriter API.
>> 
>> That's unfortunate because we really would like to avoid needing to hit disk 
>> just to write temporary files (and the associated cleanup).
>> 
>> I kinda like the notion of not requiring HDFS APIs to achieve this merging 
>> of Avro files/streams.
>> 
>> Assuming we wanted to be able to stream multiple files as in my example... 
>> could DataFileStream easily be changed to support that use case?
>> i.e. allow it to skip/ignore subsequent header and metadata in the stream or 
>> not error out with "Invalid sync!"?
> 
> 
> That may be possible, open a JIRA to discuss further.  It should be modified 
> to 'reset' to the start of a new file or stream and continue from there, 
> since it needs to read the header and find the new sync value and validate 
> that the schemas match and the codec is compatible.  It may be possible to 
> detect the end of one file and the start of another if the files are streamed 
> back to back, but perhaps not reliably.
> The avro-tools could be extended to have a command line tool that takes a 
> list of files (HDFS or local) and writes a new file (HDFS or local) 
> concatenated and possibly "recodec'd".
> 
>> 
>> Thanks,
>> 
>> Frank Grimes
>> 
>> 
>> On 2012-01-12, at 3:53 PM, Scott Carey wrote:
>> 
>>> 
>>> 
>>> On 1/12/12 12:35 PM, "Frank Grimes" <[email protected]> wrote:
>>> 
>>>> So I decided to try writing my own AvroStreamCombiner utility and it seems 
>>>> to choke when passing multiple input files:
>>>> 
>>>>> hadoop dfs -cat hdfs://hadoop/machine1.log.avro \
>>>>>     hdfs://hadoop/machine2.log.avro \
>>>>>     | ./deliveryLogAvroStreamCombiner.sh > combined.log.avro
>>>> 
>>>>> Exception in thread "main" java.io.IOException: Invalid sync!
>>>>>     at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
>>>>>     at org.apache.avro.file.DataFileWriter.appendAllFrom(DataFileWriter.java:329)
>>>>>     at DeliveryLogAvroStreamCombiner.main(Unknown Source)
>>>> 
>>>> 
>>>> Here's the code in question:
>>>> 
>>>> public class DeliveryLogAvroStreamCombiner {
>>>>    
>>>>    /**
>>>>     * @param args
>>>>     */
>>>>    public static void main(String[] args) throws Exception {
>>>>            DataFileStream<DeliveryLogEvent> dfs = null;
>>>>            DataFileWriter<DeliveryLogEvent> dfw = null;
>>>>            
>>>>            try {
>>>>                    dfs = new DataFileStream<DeliveryLogEvent>(
>>>>                            System.in, new SpecificDatumReader<DeliveryLogEvent>());
>>>>                    
>>>>                    OutputStream stdout = System.out;
>>>>                    
>>>>                    dfw = new DataFileWriter<DeliveryLogEvent>(
>>>>                            new SpecificDatumWriter<DeliveryLogEvent>());
>>>>                    dfw.setCodec(CodecFactory.deflateCodec(9));
>>>>                    dfw.setSyncInterval(1024 * 256);
>>>>                    dfw.create(DeliveryLogEvent.SCHEMA$, stdout);
>>>> 
>>>>                    dfw.appendAllFrom(dfs, false);
>>> 
>>> 
>>> dfs is from System.in, which has multiple files one after the other.  Each 
>>> file will need its own DataFileStream (has its own header and metadata).   
>>> 
>>> In Java you could get the list of files, and for each file use HDFS's API 
>>> to open the file stream, and append that to your one file.
>>> In bash you could loop over all the source files and append one at a time 
>>> (the above fails on the second file).  However, in order to append to the 
>>> end of a pre-existing file the only API now takes a File, not a seekable 
>>> stream, so Avro would need a patch to allow that in HDFS (also, only an 
>>> HDFS version that supports appends would work).
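
A minimal sketch of the per-file approach described above: one DataFileStream per input, all appended to a single DataFileWriter. It assumes local files and generic records rather than the thread's DeliveryLogEvent class, and it assumes the inputs are deflate-compressed with identical schemas. AvroFileConcatenator and concatenate are hypothetical names; for HDFS, the input and output streams would presumably come from the HDFS FileSystem API instead of java.nio.file.Files.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

// Hypothetical helper, not part of Avro.
public class AvroFileConcatenator {

    // Writes one Avro container to 'out' holding the blocks of every input file.
    static void concatenate(List<Path> inputs, OutputStream out) throws IOException {
        DataFileWriter<GenericRecord> writer = null;
        try {
            for (Path input : inputs) {
                // Each input gets its own DataFileStream, so its header, schema
                // and sync marker are read separately.
                try (InputStream in = new BufferedInputStream(Files.newInputStream(input));
                     DataFileStream<GenericRecord> stream =
                             new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
                    if (writer == null) {
                        // Take the output schema from the first input.
                        Schema schema = stream.getSchema();
                        writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
                        writer.setCodec(CodecFactory.deflateCodec(6)); // assumption: inputs use deflate
                        writer.create(schema, out);
                    }
                    // recompress=false: blocks are copied without deserializing records.
                    // Fails if this input's schema differs from the writer's schema.
                    writer.appendAllFrom(stream, false);
                }
            }
        } finally {
            if (writer != null) {
                writer.close(); // also closes 'out'
            }
        }
    }
}
```

Since blocks are copied as they are, the output keeps the block sizes of the inputs; passing true as the second argument to appendAllFrom would recompress each block with the writer's codec settings. If the input list is empty, nothing is written to the output stream.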
>>> 
>>> Other things of note:
>>> You will probably get better total file size compression by using a larger 
>>> sync interval (1M to 4M) than deflate level 9.  Deflate 9 is VERY slow and 
>>> almost never compresses more than 1% better than deflate 6, which is much 
>>> faster.  I suggest experimenting with the 'recodec' option on some of your 
>>> files to see what the best size / performance tradeoff is.  I doubt that 
>>> 256K (pre-compression) blocks with level 9 compression is the way to go.
>>> 
>>> For reference: http://tukaani.org/lzma/benchmarks.html
>>> (gzip uses deflate compression)
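
One rough way to check the level 6 versus level 9 tradeoff on your own data is to time java.util.zip.Deflater directly. The sketch below is only an approximation under stated assumptions: the sample text is synthetic, DeflateLevelComparison and its methods are hypothetical names, and it compresses one large buffer rather than Avro blocks, so it does not model the effect of the sync interval.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

// Hypothetical benchmark helper; uses only the JDK.
public class DeflateLevelComparison {

    // Builds repetitive, log-like text as a stand-in for real records.
    static byte[] sampleData(int lines) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < lines; i++) {
            sb.append("2012-01-12 machine").append(i % 7)
              .append(" delivered message ").append(i * 31 % 1000).append('\n');
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Returns the number of bytes produced by deflating 'data' at 'level'.
    static int compressedSize(byte[] data, int level) {
        Deflater deflater = new Deflater(level);
        try {
            deflater.setInput(data);
            deflater.finish();
            byte[] buffer = new byte[8192];
            int total = 0;
            while (!deflater.finished()) {
                total += deflater.deflate(buffer);
            }
            return total;
        } finally {
            deflater.end(); // release native resources
        }
    }

    public static void main(String[] args) {
        byte[] data = sampleData(200_000);
        for (int level : new int[] {1, 6, 9}) {
            long start = System.nanoTime();
            int size = compressedSize(data, level);
            long millis = (System.nanoTime() - start) / 1_000_000;
            System.out.printf("level %d: %d -> %d bytes in %d ms%n",
                    level, data.length, size, millis);
        }
    }
}
```

For a realistic comparison, replace sampleData with bytes taken from actual records; the 'recodec' experiment suggested above remains the more faithful test because it exercises the real block sizes.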
>>> 
>>> -Scott
>>> 
>>> 
>>>>            }
>>>>            finally {
>>>>                    if (dfs != null) {
>>>>                            try { dfs.close(); } catch (Exception e) { e.printStackTrace(); }
>>>>                    }
>>>>                    if (dfw != null) {
>>>>                            try { dfw.close(); } catch (Exception e) { e.printStackTrace(); }
>>>>                    }
>>>>            }
>>>>    }
>>>> 
>>>> }
>>>> 
>>>> Is there any way this could be made to work without needing to download 
>>>> the individual files to disk and calling append for each of them?
>>>> 
>>>> Thanks,
>>>> 
>>>> Frank Grimes
>>>> 
>>>> 
>>>> On 2012-01-12, at 2:24 PM, Frank Grimes wrote:
>>>> 
>>>>> Hi Scott,
>>>>> 
>>>>> If I have a map-only job, would I want only one mapper running to pull 
>>>>> all the records from the source input files and stream/append them to the 
>>>>> target avro file?
>>>>> Would that be no different (or more efficient) than doing "hadoop dfs 
>>>>> -cat file1 file2 file3" and piping the output to append to a "hadoop dfs 
>>>>> -put combinedFile"?
>>>>> In that case, my only question is how would I combine the avro files into 
>>>>> a new file without deserializing them?
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Frank Grimes
>>>>> 
>>>>> 
>>>>> On 2012-01-12, at 1:14 PM, Scott Carey wrote:
>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On 1/12/12 8:27 AM, "Frank Grimes" <[email protected]> wrote:
>>>>>> 
>>>>>>> Hi All,
>>>>>>> 
>>>>>>> We have Avro data files in HDFS which are compressed using the Deflate 
>>>>>>> codec.
>>>>>>> We have written an M/R job using the Avro Mapred API to combine those 
>>>>>>> files.
>>>>>>> 
>>>>>>> It seems to be working fine; however, when we run it we notice that the 
>>>>>>> temporary work area (spills, etc.) seems to be uncompressed.
>>>>>>> We're thinking we might see a speedup due to reduced I/O if the 
>>>>>>> temporary files are compressed as well.
>>>>>> 
>>>>>> 
>>>>>> If all you want to do is combine the files, there is no reason to 
>>>>>> deserialize and reserialize the contents, and a map-only job could 
>>>>>> suffice.
>>>>>> If this is the case, you might want to consider one of two options:
>>>>>> 1.  Use a map only job, with a combined file input.  This will produce 
>>>>>> one file per mapper and no intermediate data.
>>>>>> 2.  Use the Avro data file API to append to a file.  I am not sure if 
>>>>>> this will work with HDFS without some modifications to Avro, but it 
>>>>>> should be possible since the data file APIs can take 
>>>>>> InputStream/OutputStream.  The data file API has the ability to append 
>>>>>> data blocks from another file if the schemas are an exact match.  This can 
>>>>>> be done without deserialization, and optionally can change the 
>>>>>> compression level or leave it alone.
>>>>>> 
>>>>>>> 
>>>>>>> Is there a way to enable "mapred.compress.map.output" in such a way 
>>>>>>> that those temporary files are compressed as Avro/Deflate?
>>>>>>> I tried simply setting conf.setBoolean("mapred.compress.map.output", 
>>>>>>> true); but it didn't seem to have any effect.
>>>>>> 
>>>>>> 
>>>>>> I am not sure, as I haven't tried it myself.  However, the Avro M/R 
>>>>>> should be able to leverage all of the Hadoop compressed intermediate 
>>>>>> forms.  LZO/Snappy are fast and in our cluster Snappy is the default.  
>>>>>> Deflate can be a lot slower but much more compact.
>>>>>> 
>>>>>>> 
>>>>>>> Note that in order to avoid unnecessary sorting overhead, I made each 
>>>>>>> key a constant (1L) so that the logs are combined but ordering isn't 
>>>>>>> necessarily preserved. (we don't care about ordering)
>>>>>> 
>>>>>> 
>>>>>> In that case, I think you can use a map only job.  There may be some 
>>>>>> work to get a single mapper to read many files however.
>>>>>> 
>>>>>>> 
>>>>>>> FYI, here are my mapper and reducer.
>>>>>>> 
>>>>>>> 
>>>>>>>         public static class AvroReachMapper
>>>>>>>                         extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>>>>>>                 public void map(DeliveryLogEvent levent,
>>>>>>>                                 AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
>>>>>>>                                 Reporter reporter)
>>>>>>>                         throws IOException {
>>>>>>>                         
>>>>>>>                         collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>>>>>>                 }
>>>>>>>         }
>>>>>>>         
>>>>>>>         public static class Reduce
>>>>>>>                         extends AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>>>>>> 
>>>>>>>                 @Override
>>>>>>>                 public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>>>>>>                                 AvroCollector<DeliveryLogEvent> collector,
>>>>>>>                                 Reporter reporter)
>>>>>>>                                 throws IOException {
>>>>>>> 
>>>>>>>                         for (DeliveryLogEvent event : values) {
>>>>>>>                                 collector.collect(event);
>>>>>>>                         }
>>>>>>>                 }
>>>>>>> 
>>>>>>>         }
>>>>>>> 
>>>>>>> 
>>>>>>>         AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>>>>>         AvroJob.setMapperClass(conf, AvroReachMapper.class);
>>>>>>>         AvroJob.setMapOutputSchema(conf, SCHEMA);
>>>>>>>                 
>>>>>>>         AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>>>>>         AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>>>>>>         AvroOutputFormat.setDeflateLevel(conf, 9);
>>>>>>>         AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>>>>>> 
>>>>>>>         AvroJob.setReducerClass(conf, Reduce.class);
>>>>>>>                 
>>>>>>>         JobClient.runJob(conf);
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Frank Grimes
>>>>> 
>>>> 
>> 
