Try adding:

collector102.sinks.sink1.hdfs.writeFormat = TEXT
collector102.sinks.sink2.hdfs.writeFormat = TEXT
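Not sure this is the root cause, but if I remember right hdfs.writeFormat defaults to Writable, which is meant for SequenceFile records, so Text is usually what you want for a plain-text logfile. For context, the sink1 compression block would then look like this (sink2 analogous) - just your existing settings plus the suggested line, untested on my end:

collector102.sinks.sink1.hdfs.codeC = gzip
collector102.sinks.sink1.hdfs.fileType = CompressedStream
collector102.sinks.sink1.hdfs.fileSuffix = .gz
collector102.sinks.sink1.hdfs.writeFormat = TEXT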
- Connor

On Mon, Jan 14, 2013 at 2:34 PM, Sagar Mehta <[email protected]> wrote:

> Yeah sure!!
>
> smehta@collector102:/opt/flume/conf$ cat hdfs.conf
> # hdfs.conf: This is a configuration file to configure Flume NG to use
> # An exec source to get a live tail of the jetty logFile
> # An hdfs sink to write events to the hdfs on the test cluster
> # A file based channel to connect the above source and sink
>
> # Name the components on this agent
> collector102.sources = source1
> collector102.sinks = sink1 sink2
> collector102.channels = channel1 channel2
>
> # Configure the source
> collector102.sources.source1.type = exec
> collector102.sources.source1.command = tail -F /opt/jetty/logFile.log
>
> # Configure the interceptors
> collector102.sources.source1.interceptors = TimestampInterceptor HostInterceptor
>
> # We use the Timestamp interceptor to get timestamps of when flume receives events
> # This is used for figuring out the bucket to which an event goes
> collector102.sources.source1.interceptors.TimestampInterceptor.type = timestamp
>
> # We use the Host interceptor to populate the host header with the fully
> # qualified domain name of the collector.
> # That way we know which file in the sink represents which collector.
> collector102.sources.source1.interceptors.HostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
> collector102.sources.source1.interceptors.HostInterceptor.preserveExisting = false
> collector102.sources.source1.interceptors.HostInterceptor.useIP = false
> collector102.sources.source1.interceptors.HostInterceptor.hostHeader = host
>
> # Configure sink1
> collector102.sinks.sink1.type = hdfs
>
> # Configure the bucketing
> collector102.sinks.sink1.hdfs.path = hdfs://namenode301.ngpipes.milp.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00
>
> # Prefix the file with the source so that we know where the events in the file came from
> collector102.sinks.sink1.hdfs.filePrefix = %{host}
>
> # We roll the flume output file based on time interval - currently every 5 minutes
> collector102.sinks.sink1.hdfs.rollSize = 0
> collector102.sinks.sink1.hdfs.rollCount = 0
> collector102.sinks.sink1.hdfs.rollInterval = 300
>
> # gzip compression related settings
> collector102.sinks.sink1.hdfs.codeC = gzip
> collector102.sinks.sink1.hdfs.fileType = CompressedStream
> collector102.sinks.sink1.hdfs.fileSuffix = .gz
>
> # Configure sink2
> collector102.sinks.sink2.type = hdfs
>
> # Configure the bucketing
> collector102.sinks.sink2.hdfs.path = hdfs://namenode5001.ngpipes.sac.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00
>
> # Prefix the file with the source so that we know where the events in the file came from
> collector102.sinks.sink2.hdfs.filePrefix = %{host}
>
> # We roll the flume output file based on time interval - currently every 5 minutes
> collector102.sinks.sink2.hdfs.rollSize = 0
> collector102.sinks.sink2.hdfs.rollCount = 0
> collector102.sinks.sink2.hdfs.rollInterval = 300
> collector102.sinks.sink2.hdfs.fileType = DataStream
>
> # Configure the channels that connect the source to the sinks
>
> # Use a channel which buffers events in the filesystem
> collector102.channels.channel1.type = file
> collector102.channels.channel1.checkpointDir = /data/flume_data/channel1/checkpoint
> collector102.channels.channel1.dataDirs = /data/flume_data/channel1/data
>
> # Use a channel which buffers events in the filesystem
> collector102.channels.channel2.type = file
> collector102.channels.channel2.checkpointDir = /data/flume_data/channel2/checkpoint
> collector102.channels.channel2.dataDirs = /data/flume_data/channel2/data
>
> # Bind the source and sinks to the channels configured above
> collector102.sources.source1.channels = channel1 channel2
> collector102.sinks.sink1.channel = channel1
> collector102.sinks.sink2.channel = channel2
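>
> As an aside, the %Y-%m-%d/%H00 escapes in the hdfs.path values above are resolved from the timestamp header that the Timestamp interceptor sets. I believe a build as recent as ours could instead stamp events with the local time at the sink - an untested sketch, should we ever want to drop the interceptor:
>
> collector102.sinks.sink1.hdfs.useLocalTimeStamp = true
> collector102.sinks.sink2.hdfs.useLocalTimeStamp = true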
>
> On Mon, Jan 14, 2013 at 2:25 PM, Connor Woodson <[email protected]> wrote:
>
>> Can you post your full config?
>>
>> - Connor
>>
>> On Mon, Jan 14, 2013 at 11:18 AM, Sagar Mehta <[email protected]> wrote:
>>
>>> Hi Guys,
>>>
>>> I'm using Flume NG and it works great for me. In essence I'm using an
>>> exec source to do a tail -F on a logfile, feeding two HDFS sinks through
>>> file channels. So far so good - now I'm trying to enable gzip compression
>>> with the following config, as per the Flume NG User Guide at
>>> http://flume.apache.org/FlumeUserGuide.html:
>>>
>>> #gzip compression related settings
>>> collector102.sinks.sink1.hdfs.codeC = gzip
>>> collector102.sinks.sink1.hdfs.fileType = CompressedStream
>>> collector102.sinks.sink1.hdfs.fileSuffix = .gz
>>>
>>> However, this is what appears to be happening:
>>>
>>> Flume writes gzip-compressed output [I see the .gz files in the output
>>> buckets], but when I try to decompress a file I get an error about
>>> 'trailing garbage ignored', and the decompressed output is in fact far
>>> smaller than the compressed file it came from.
>>>
>>> hadoop@jobtracker301:/home/hadoop/sagar/temp$ ls -ltr
>>> -rw-r--r-- 1 hadoop hadoop 5381235 2013-01-11 20:44 collector102.ngpipes.sac.ngmoco.com.1357936638713.gz
>>>
>>> hadoop@jobtracker301:/home/hadoop/sagar/temp$ gunzip collector102.ngpipes.sac.ngmoco.com.1357936638713.gz
>>> gzip: collector102.ngpipes.sac.ngmoco.com.1357936638713.gz: decompression OK, trailing garbage ignored
>>>
>>> hadoop@jobtracker301:/home/hadoop/sagar/temp$ ls -l
>>> -rw-r--r-- 1 hadoop hadoop 58898 2013-01-11 20:44 collector102.ngpipes.sac.ngmoco.com.1357936638713
>>>
>>> Below are some helpful details.
>>>
>>> I'm using apache-flume-1.4.0-SNAPSHOT-bin:
>>>
>>> smehta@collector102:/opt$ ls -l flume
>>> lrwxrwxrwx 1 root root 31 2012-12-14 00:44 flume -> apache-flume-1.4.0-SNAPSHOT-bin
>>>
>>> I also have the hadoop-core jar on my classpath:
>>>
>>> smehta@collector102:/opt/flume/lib$ ls -l hadoop-core-0.20.2-cdh3u2.jar
>>> -rw-r--r-- 1 hadoop hadoop 3534499 2012-12-01 01:53 hadoop-core-0.20.2-cdh3u2.jar
>>>
>>> Everything is working well for me except the compression part, and I'm
>>> not quite sure what I'm missing. While I debug this, any ideas/help
>>> would be much appreciated.
>>>
>>> Thanks in advance,
>>> Sagar
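P.S. If writeFormat alone doesn't fix it, it may be worth taking tail -F out of the picture while you debug. One untested idea: temporarily swap the exec source for a netcat source, feed it a few lines by hand, and see whether the resulting .gz decompresses cleanly:

collector102.sources.source1.type = netcat
collector102.sources.source1.bind = localhost
collector102.sources.source1.port = 44444

That would tell you whether the problem is in the HDFS sink's compressed stream or somewhere upstream.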
