So I decided to try writing my own AvroStreamCombiner utility, and it seems to 
choke when I pass it multiple input files:

hadoop dfs -cat hdfs://hadoop/machine1.log.avro hdfs://hadoop/machine2.log.avro \
    | ./deliveryLogAvroStreamCombiner.sh > combined.log.avro

Exception in thread "main" java.io.IOException: Invalid sync!   
at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
at org.apache.avro.file.DataFileWriter.appendAllFrom(DataFileWriter.java:329)
at DeliveryLogAvroStreamCombiner.main(Unknown Source)
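A likely reading of this trace, offered as an assumption rather than a confirmed diagnosis: `hadoop dfs -cat` simply concatenates bytes, so the combiner receives two complete Avro container files back to back, each with its own header and its own 16-byte sync marker. `DataFileStream` parses only the first header. Once the first file's blocks run out, it reads the next bytes as a block header, that is an object count followed by a byte size, both encoded as zig-zag varint longs per the Avro specification. The small stdlib-only program below (the class name `ConcatenatedHeaderDemo` is made up for illustration) decodes the second file's magic bytes the way a block header would be decoded:

```java
// Hypothetical demo class: decodes Avro's file magic as if it were a data block header.
public class ConcatenatedHeaderDemo {
    // Decodes one Avro long (zig-zag varint) starting at buf[pos[0]] and advances pos[0].
    public static long readLong(byte[] buf, int[] pos) {
        long raw = 0;
        for (int shift = 0, b = 0x80; (b & 0x80) != 0; shift += 7) {
            b = buf[pos[0]++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift;
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void main(String[] args) {
        byte[] magic = {'O', 'b', 'j', 1}; // first four bytes of every Avro data file
        int[] pos = {0};
        long count = readLong(magic, pos); // read where a block's object count is expected
        long size = readLong(magic, pos);  // read where the block's byte size is expected
        System.out.println("count=" + count + ", size=" + size);
    }
}
```

It prints `count=-40, size=49`. Presumably the reader then consumes the next 49 bytes of the second header as "block data" and compares the 16 bytes after them with the first file's sync marker, which they would not match, hence `Invalid sync!`.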


Here's the code in question:

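import java.io.OutputStream;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

// DeliveryLogEvent is the Avro-generated specific record class (not shown).
public class DeliveryLogAvroStreamCombiner {

    /**
     * Reads an Avro data file from stdin and copies its data blocks into a new
     * Deflate-compressed Avro data file on stdout.
     */
    public static void main(String[] args) throws Exception {
        DataFileStream<DeliveryLogEvent> dfs = null;
        DataFileWriter<DeliveryLogEvent> dfw = null;

        try {
            dfs = new DataFileStream<DeliveryLogEvent>(System.in,
                    new SpecificDatumReader<DeliveryLogEvent>());

            OutputStream stdout = System.out;

            dfw = new DataFileWriter<DeliveryLogEvent>(
                    new SpecificDatumWriter<DeliveryLogEvent>());
            dfw.setCodec(CodecFactory.deflateCodec(9)); // must be set before create()
            dfw.setSyncInterval(1024 * 256);
            dfw.create(DeliveryLogEvent.SCHEMA$, stdout);

            // Copy data blocks without deserializing records; with recompress=false,
            // blocks are copied as-is when the two files' codecs are compatible.
            dfw.appendAllFrom(dfs, false);
        }
        finally {
            if (dfs != null) try { dfs.close(); } catch (Exception e) { e.printStackTrace(); }
            if (dfw != null) try { dfw.close(); } catch (Exception e) { e.printStackTrace(); }
        }
    }

}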

Is there any way this could be made to work without needing to download the 
individual files to disk and calling append for each of them?
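One way the stdin pipeline could be made to work, sketched under assumptions and not tested against real HDFS data: instead of handing the concatenated stream to `DataFileStream`, treat it as a sequence of container files. Per the Avro specification, each file is the magic bytes `Obj` plus 0x01, a metadata map (which carries `avro.schema` and `avro.codec`), a 16-byte sync marker, and then data blocks of the form (object count, byte size, serialized bytes, sync marker). The hypothetical `AvroConcatSplicer` class below (it and all of its method names are invented for this sketch, not part of Avro) keeps the first header, copies every block verbatim without decompressing or deserializing it, and ends each copied block with the first file's sync marker. It assumes all inputs carry byte-identical `avro.schema` and `avro.codec` entries (it throws otherwise) and, unlike the combiner above, it cannot change the codec or compression level.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;

// Hypothetical helper, not part of Avro; see the assumptions stated above.
public final class AvroConcatSplicer {
    static final byte[] MAGIC = {'O', 'b', 'j', 1};

    // Avro long: zig-zag encoded, then written as a base-128 varint (low bits first).
    public static long readLong(InputStream in) throws IOException {
        long raw = 0;
        for (int shift = 0, b = 0x80; (b & 0x80) != 0; shift += 7) {
            if ((b = in.read()) < 0) throw new EOFException();
            raw |= (long) (b & 0x7F) << shift;
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void writeLong(OutputStream out, long n) throws IOException {
        long z = (n << 1) ^ (n >> 63);
        for (; (z & ~0x7FL) != 0; z >>>= 7) out.write((int) ((z & 0x7F) | 0x80));
        out.write((int) z);
    }

    static byte[] readBytes(InputStream in, long n) throws IOException {
        byte[] buf = new byte[Math.toIntExact(n)];
        new DataInputStream(in).readFully(buf);
        return buf;
    }

    // Header: magic, metadata map<bytes> (written as one map block), 16-byte sync marker.
    public static void writeHeader(OutputStream out, Map<String, byte[]> meta, byte[] sync) throws IOException {
        out.write(MAGIC);
        if (!meta.isEmpty()) writeLong(out, meta.size());
        for (Map.Entry<String, byte[]> e : meta.entrySet()) {
            byte[] key = e.getKey().getBytes(StandardCharsets.UTF_8);
            writeLong(out, key.length);
            out.write(key);
            writeLong(out, e.getValue().length);
            out.write(e.getValue());
        }
        writeLong(out, 0); // end of map
        out.write(sync);
    }

    static Map<String, byte[]> readMeta(InputStream in) throws IOException {
        Map<String, byte[]> meta = new LinkedHashMap<>();
        for (long n = readLong(in); n != 0; n = readLong(in)) {
            if (n < 0) { n = -n; readLong(in); } // negative count: a block byte size follows
            for (long i = 0; i < n; i++) {
                String key = new String(readBytes(in, readLong(in)), StandardCharsets.UTF_8);
                meta.put(key, readBytes(in, readLong(in)));
            }
        }
        return meta;
    }

    // Copies every data block from rawIn to out, producing a single container file.
    public static void splice(InputStream rawIn, OutputStream out) throws IOException {
        PushbackInputStream in = new PushbackInputStream(rawIn, MAGIC.length);
        Map<String, byte[]> first = null;
        byte[] outSync = null, inSync = null, head = new byte[MAGIC.length];
        while (true) {
            int got = 0, b;
            while (got < head.length && (b = in.read()) >= 0) head[got++] = (byte) b;
            if (got == 0) return; // clean end of input
            if (got == head.length && Arrays.equals(head, MAGIC)) {
                // Next file's header; a block count starting with 'O' would be -40, never valid.
                Map<String, byte[]> meta = readMeta(in);
                inSync = readBytes(in, 16);
                if (first == null) {
                    first = meta;
                    outSync = inSync;
                    writeHeader(out, meta, outSync);
                } else if (!Arrays.equals(meta.get("avro.schema"), first.get("avro.schema"))
                        || !Arrays.equals(meta.get("avro.codec"), first.get("avro.codec"))) {
                    throw new IOException("schema or codec differs; raw block copy is unsafe");
                }
                continue;
            }
            if (inSync == null) throw new IOException("input does not start with an Avro header");
            in.unread(head, 0, got);
            long count = readLong(in);
            byte[] data = readBytes(in, readLong(in));
            if (!Arrays.equals(readBytes(in, 16), inSync)) throw new IOException("Invalid sync!");
            writeLong(out, count); // block copied verbatim (still compressed) ...
            writeLong(out, data.length);
            out.write(data);
            out.write(outSync); // ... but terminated with the output file's sync marker
        }
    }

    public static void main(String[] args) throws IOException {
        OutputStream out = new BufferedOutputStream(System.out);
        splice(new BufferedInputStream(System.in), out);
        out.flush();
    }
}
```

It would slot into the same pipeline, e.g. `hadoop dfs -cat file1 file2 | java AvroConcatSplicer > combined.log.avro`. A route that stays within Avro's own API: since `DataFileStream` only needs an `InputStream`, opening each HDFS file separately (for instance through Hadoop's `FileSystem.open`) and calling `appendAllFrom` once per stream should also avoid copying the files to local disk.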

Thanks,

Frank Grimes


On 2012-01-12, at 2:24 PM, Frank Grimes wrote:

> Hi Scott,
> 
> If I have a map-only job, would I want only one mapper running to pull all 
> the records from the source input files and stream/append them to the target 
> avro file?
> Would that be any different from (or more efficient than) doing "hadoop dfs -cat 
> file1 file2 file3" and piping the output to append to a "hadoop dfs -put 
> combinedFile"?
> In that case, my only question is how I would combine the Avro files into a 
> new file without deserializing them.
> 
> Thanks,
> 
> Frank Grimes
> 
> 
> On 2012-01-12, at 1:14 PM, Scott Carey wrote:
> 
>> 
>> 
>> On 1/12/12 8:27 AM, "Frank Grimes" wrote:
>> 
>>> Hi All,
>>> 
>>> We have Avro data files in HDFS which are compressed using the Deflate 
>>> codec.
>>> We have written an M/R job using the Avro Mapred API to combine those files.
>>> 
>>> It seems to be working fine; however, when we run it we notice that the 
>>> temporary work area (spills, etc.) seems to be uncompressed.
>>> We're thinking we might see a speedup due to reduced I/O if the temporary 
>>> files are compressed as well.
>> 
>> 
>> If all you want to do is combine the files, there is no reason to 
>> deserialize and reserialize the contents, and a map-only job could suffice.
>> If this is the case, you might want to consider one of two options:
>> 1.  Use a map-only job, with a combined file input.  This will produce one 
>> file per mapper and no intermediate data.
>> 2.  Use the Avro data file API to append to a file.  I am not sure if this 
>> will work with HDFS without some modifications to Avro, but it should be 
>> possible since the data file APIs can take InputStream/OutputStream.  The 
>> data file API has the ability to append data blocks from another file if the 
>> schemas are an exact match.  This can be done without deserialization, and 
>> can optionally change the compression level or leave it alone.
>> 
>>> 
>>> Is there a way to enable "mapred.compress.map.output" in such a way that 
>>> those temporary files are compressed as Avro/Deflate?
>>> I tried simply setting conf.setBoolean("mapred.compress.map.output", true); 
>>> but it didn't seem to have any effect.
>> 
>> 
>> I am not sure, as I haven't tried it myself.  However, the Avro M/R should 
>> be able to leverage all of the Hadoop compressed intermediate forms.  
>> LZO/Snappy are fast and in our cluster Snappy is the default.  Deflate can 
>> be a lot slower but much more compact.
>> 
>>> 
>>> Note that in order to avoid unnecessary sorting overhead, I made each key a 
>>> constant (1L) so that the logs are combined but ordering isn't necessarily 
>>> preserved (we don't care about ordering).
>> 
>> 
>> In that case, I think you can use a map-only job.  There may be some work 
>> involved in getting a single mapper to read many files, however.
>> 
>>> 
>>> FYI, here are my mapper and reducer.
>>> 
>>> 
>>>     // Emits every event under the same constant key (1L).
>>>     public static class AvroReachMapper extends
>>>             AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>>         @Override
>>>         public void map(DeliveryLogEvent levent,
>>>                 AvroCollector<Pair<Long, DeliveryLogEvent>> collector, Reporter reporter)
>>>                 throws IOException {
>>>             collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>>         }
>>>     }
>>> 
>>>     // Writes every event for the single key straight through to the output file.
>>>     public static class Reduce extends
>>>             AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>>         @Override
>>>         public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>>                 AvroCollector<DeliveryLogEvent> collector, Reporter reporter)
>>>                 throws IOException {
>>>             for (DeliveryLogEvent event : values) {
>>>                 collector.collect(event);
>>>             }
>>>         }
>>>     }
>>> 
>>> 
>>>     AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>     AvroJob.setMapperClass(conf, AvroReachMapper.class);
>>>     // SCHEMA: the Pair<long, DeliveryLogEvent> map output schema, defined elsewhere
>>>     // (presumably via Pair.getPairSchema(Schema.create(Schema.Type.LONG), DeliveryLogEvent.SCHEMA$)).
>>>     AvroJob.setMapOutputSchema(conf, SCHEMA);
>>> 
>>>     AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>     AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>>     AvroOutputFormat.setDeflateLevel(conf, 9);
>>>     AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>> 
>>>     AvroJob.setReducerClass(conf, Reduce.class);
>>> 
>>>     JobClient.runJob(conf);
>>> 
>>> 
>>> Thanks,
>>> 
>>> Frank Grimes
> 
