Hi Guys,

I have a custom source that consumes whole .gz files as byte arrays, with each 
file being a single event, and I'd like to write each file to HDFS. During the 
write some additional bytes are added, so the file is corrupted and can no 
longer be unzipped. I know this is not a good use case for Flume, but I'd like 
to keep a consistent data collection design and was hoping I could pass full 
.gz files through to HDFS without them being corrupted. Either the 'timestamp' 
header is causing the issue or the 'text' file format, but I'm not sure. Any solution?
Thanks,
Peyman

XXX.sources = xxx
XXX.channels = MemChannel
XXX.sinks = HDFS

XXX.sources.xxx.type = com.xxx.xxx.xxx.Source
XXX.sources.xxx.channels = MemChannel

XXX.sinks.HDFS.channel = MemChannel
XXX.sinks.HDFS.type = hdfs
XXX.sinks.HDFS.hdfs.path = hdfs://xxxx/user/xxx/xxx/gzfiles/%Y/%m/%d/
XXX.sinks.HDFS.hdfs.fileType = DataStream
XXX.sinks.HDFS.hdfs.filePrefix = xxxx
XXX.sinks.HDFS.hdfs.batchSize = 1
XXX.sinks.HDFS.hdfs.rollSize = 0
XXX.sinks.HDFS.hdfs.idleTimeout = 3
XXX.sinks.HDFS.hdfs.rollInterval = 0
XXX.sinks.HDFS.hdfs.rollCount = 1

XXX.channels.MemChannel.type = memory
XXX.channels.MemChannel.capacity = 1
XXX.channels.MemChannel.transactionCapacity = 1
XXX.channels.MemChannel.byteCapacityBufferPercentage = 100
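
My current guess (untested) is the sink's serializer rather than the header: with 
fileType = DataStream the HDFS sink uses the TEXT body serializer by default, which 
appends a newline after each event body, and that extra byte would corrupt a binary 
.gz payload. If that's the cause, something like this might fix it:

```
# Untested guess: keep the TEXT serializer but stop it appending '\n' per event
XXX.sinks.HDFS.serializer = TEXT
XXX.sinks.HDFS.serializer.appendNewline = false
```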


Map<String, String> headers = new HashMap<String, String>();
InputStream in = Toolbox.inputStreamUrlConnection(url, account.getAuth1(),
        account.getAuth2());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
    byte[] buf = new byte[1024]; // tune the buffer size to your needs
    int num;
    while ((num = in.read(buf)) != -1) {
        outputStream.write(buf, 0, num);
    }
} finally {
    in.close();
}
// the event body is the raw, still-compressed .gz payload
headers.put("timestamp", String.valueOf(new Date().getTime()));
Event e = EventBuilder.withBody(outputStream.toByteArray(), headers);
getChannelProcessor().processEvent(e);
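
To narrow down where the corruption happens, I can verify that the byte array 
round-trips in memory before handing it to the channel, and run the same check on 
the bytes read back from HDFS. A standalone sketch (not part of the Flume source; 
`GzipCheck` is just an illustrative name):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipCheck {

    /** Compress raw bytes into a gzip payload, as a stand-in for a .gz file. */
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        }
        return bos.toByteArray();
    }

    /** Fully decompress the payload; any corruption makes this throw. */
    static byte[] gunzip(byte[] gz) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gz));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buf = new byte[1024];
            int num;
            while ((num = in.read(buf)) != -1) {
                out.write(buf, 0, num);
            }
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] original = "event payload".getBytes("UTF-8");
        byte[] packed = gzip(original);
        // An intact payload decompresses back to the original bytes
        System.out.println(java.util.Arrays.equals(original, gunzip(packed)));
    }
}
```

If this check passes on the event body but fails on the file pulled back out of 
HDFS, the bytes are being altered somewhere in the sink's write path.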
