Hi guys,
I have a custom source that consumes whole '.gz' files as byte arrays, with each
file becoming a single event, and I'd like to write each file to HDFS. During the
write some additional bytes are added, so the resulting file is corrupted and can
no longer be unzipped. I know this isn't a great use case for Flume, but I'd like
to keep a consistent data-collection design, and I was hoping I could pass full
gz files through to HDFS without corruption. I suspect either the 'timestamp'
header or the default text serialization is causing the issue, but I'm not sure.
Any suggestions?
Thanks,
Peyman
XXX.sources = xxx
XXX.channels = MemChannel
XXX.sinks = HDFS
XXX.sources.xxx.type = com.xxx.xxx.xxx.Source
XXX.sources.xxx.channels = MemChannel
XXX.sinks.HDFS.channel = MemChannel
XXX.sinks.HDFS.type = hdfs
XXX.sinks.HDFS.hdfs.path = hdfs://xxxx/user/xxx/xxx/gzfiles/%Y/%m/%d/
XXX.sinks.HDFS.hdfs.fileType = DataStream
XXX.sinks.HDFS.hdfs.filePrefix = xxxx
XXX.sinks.HDFS.hdfs.batchSize = 1
XXX.sinks.HDFS.hdfs.rollSize = 0
XXX.sinks.HDFS.hdfs.idleTimeout = 3
XXX.sinks.HDFS.hdfs.rollInterval = 0
XXX.sinks.HDFS.hdfs.rollCount = 1
XXX.channels.MemChannel.type = memory
XXX.channels.MemChannel.capacity = 1
XXX.channels.MemChannel.transactionCapacity = 1
XXX.channels.MemChannel.byteCapacityBufferPercentage = 100
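One thing I've been wondering about, though I haven't verified it: if the extra bytes come from the sink's default TEXT serializer appending a newline after each event body, then the HDFS sink's pass-through serializer properties might let me turn that off, along these lines (a sketch, not a tested config):

```
XXX.sinks.HDFS.serializer = TEXT
XXX.sinks.HDFS.serializer.appendNewline = false
```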
// Download the whole .gz file; each file becomes one Flume event.
InputStream in = Toolbox.inputStreamUrlConnection(url, account.getAuth1(),
        account.getAuth2());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
    byte[] buf = new byte[1024]; // adjust the buffer size to your needs
    int num;
    while ((num = in.read(buf)) != -1) {
        outputStream.write(buf, 0, num);
    }
} finally {
    in.close(); // make sure the connection is released even on failure
}
headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
Event e = EventBuilder.withBody(outputStream.toByteArray(), headers);
getChannelProcessor().processEvent(e);
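To illustrate what I think is happening (purely a guess on my part): if the sink's text serialization appends a single newline after the event body, the bytes that land on HDFS differ from the original archive by one trailing byte, which is enough to make gunzip complain. A standalone sketch of that byte-level difference (the class name and helpers are hypothetical, not part of my source):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo, not part of my source: shows how one stray byte
// appended per event changes the gzip archive written to HDFS.
public class GzipNewlineDemo {

    // Compress raw bytes into a complete gzip archive.
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(raw);
        }
        return bos.toByteArray();
    }

    // Decompress a gzip archive back into raw bytes.
    static byte[] gunzip(byte[] gz) throws IOException {
        try (GZIPInputStream in =
                new GZIPInputStream(new ByteArrayInputStream(gz))) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
            return bos.toByteArray();
        }
    }

    // What I suspect a text serializer does per event: body plus '\n'.
    static byte[] appendNewline(byte[] body) {
        byte[] out = new byte[body.length + 1];
        System.arraycopy(body, 0, out, 0, body.length);
        out[body.length] = '\n';
        return out;
    }
}
```

The round-trip on the untouched bytes succeeds, while the appended byte is exactly the kind of difference that would make the file on HDFS fail to unzip.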