Can you also send the flume agent logs? Did you check the contents of the files?
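To compare the contents rather than the compressed bytes, something along these lines would work (a local sketch with illustrative filenames, not the actual cluster paths - the point is hashing what zcat emits instead of the .gz files themselves):

```shell
# Sketch: hash the decompressed event contents, not the .gz bytes.
# File names here are stand-ins for two of the hourly sink files.
printf 'event-1\nevent-2\n' > events.txt
gzip -c events.txt > first.gz
cp first.gz second.gz

zcat first.gz  | md5sum
zcat second.gz | md5sum   # matching content hashes => the same events again
```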
-- Hari Shreedharan

On Thursday, February 28, 2013 at 2:43 PM, Roshan Naik wrote:
> Would you be able to verify if the same problem can be reproduced by
> using the memory channel instead in a test setup?
>
>
> On Wed, Feb 27, 2013 at 11:37 AM, Sagar Mehta <[email protected]> wrote:
> > Hi Guys,
> >
> > I'm using Flume NG and it is working pretty well, except for a weird
> > situation I observed lately. In essence, I'm using an exec source to do
> > tail -F on a logfile, with two HDFS sinks fed through file channels.
> >
> > However, I have observed that when the source [the logfile of a
> > jetty-based collector] is idle - that is, no new events are pushed to
> > the logfile - Flume NG seems to replay the same set of events.
> >
> > For example, collector110 received no events for 2 subsequent hours,
> > and below are the corresponding Flume-written files at the HDFS sink:
> >
> > hadoop@jobtracker301:/home/hadoop/sagar$ hls /ngpipes-raw-logs/2013-02-27/1400/collector110*
> > -rw-r--r-- 3 hadoop supergroup 441 2013-02-27 14:20 /ngpipes-raw-logs/2013-02-27/1400/collector110.ngpipes.sac.ngmoco.com.1361974853210.gz
> > -rw-r--r-- 3 hadoop supergroup 441 2013-02-27 14:50 /ngpipes-raw-logs/2013-02-27/1400/collector110.ngpipes.sac.ngmoco.com.1361976653432.gz
> >
> > hadoop@jobtracker301:/home/hadoop/sagar$ hls /ngpipes-raw-logs/2013-02-27/1500/collector110*
> > -rw-r--r-- 3 hadoop supergroup 441 2013-02-27 15:20 /ngpipes-raw-logs/2013-02-27/1500/collector110.ngpipes.sac.ngmoco.com.1361978454123.gz
> > -rw-r--r-- 3 hadoop supergroup 441 2013-02-27 15:50 /ngpipes-raw-logs/2013-02-27/1500/collector110.ngpipes.sac.ngmoco.com.1361980254338.gz
> >
> > hadoop@jobtracker301:/home/hadoop/sagar$ md5sum *
> > c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361974853210.gz
> > c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361976653432.gz
> > c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361978454123.gz
> > c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361980254338.gz
> >
> > As you can see above, the md5sums match.
> >
> > I'm using a file channel, which has checkpoints, so I'm not sure what
> > is going on. By the way, the difference in timestamps between the two
> > replays looks like exactly 30 minutes.
> >
> > Is this a known bug, or am I missing something?
> >
> > Below is my Flume config file:
> >
> > smehta@collector110:/opt/flume/conf$ cat hdfs.conf
> > # An hdfs sink to write events to the hdfs on the test cluster
> > # A memory based channel to connect the above source and sink
> >
> > # Name the components on this agent
> > collector110.sources = source1
> > collector110.sinks = sink1 sink2
> > collector110.channels = channel1 channel2
> >
> > # Configure the source
> > collector110.sources.source1.type = exec
> > collector110.sources.source1.command = tail -F /opt/jetty/logFile.log
> >
> > # Configure the interceptors
> > collector110.sources.source1.interceptors = TimestampInterceptor HostInterceptor
> >
> > # We use the Timestamp interceptor to record when flume receives events.
> > # This is used for figuring out the bucket to which an event goes.
> > collector110.sources.source1.interceptors.TimestampInterceptor.type = timestamp
> >
> > # We use the Host interceptor to populate the host header with the fully
> > # qualified domain name of the collector. That way we know which file in
> > # the sink represents which collector.
> > collector110.sources.source1.interceptors.HostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
> > collector110.sources.source1.interceptors.HostInterceptor.preserveExisting = false
> > collector110.sources.source1.interceptors.HostInterceptor.useIP = false
> > collector110.sources.source1.interceptors.HostInterceptor.hostHeader = host
> >
> > # Configure the sink
> > collector110.sinks.sink1.type = hdfs
> >
> > # Configure the bucketing
> > collector110.sinks.sink1.hdfs.path = hdfs://namenode3001.ngpipes.milp.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00
> >
> > # Prefix the file with the source so that we know where the events in
> > # the file came from
> > collector110.sinks.sink1.hdfs.filePrefix = %{host}
> >
> > # We roll the flume output file based on time interval - currently every 5 minutes
> > collector110.sinks.sink1.hdfs.rollSize = 0
> > collector110.sinks.sink1.hdfs.rollCount = 0
> > collector110.sinks.sink1.hdfs.rollInterval = 300
> >
> > # gzip compression related settings
> > collector110.sinks.sink1.hdfs.codeC = gzip
> > collector110.sinks.sink1.hdfs.fileType = CompressedStream
> > collector110.sinks.sink1.hdfs.fileSuffix = .gz
> >
> > # Configure the sink
> > collector110.sinks.sink2.type = hdfs
> >
> > # Configure the bucketing
> > collector110.sinks.sink2.hdfs.path = hdfs://namenode5001.ngpipes.sac.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00
> >
> > # Prefix the file with the source so that we know where the events in
> > # the file came from
> > collector110.sinks.sink2.hdfs.filePrefix = %{host}
> >
> > # We roll the flume output file based on time interval - currently every 5 minutes
> > collector110.sinks.sink2.hdfs.rollSize = 0
> > collector110.sinks.sink2.hdfs.rollCount = 0
> > collector110.sinks.sink2.hdfs.rollInterval = 300
> >
> > # gzip compression related settings
> > collector110.sinks.sink2.hdfs.codeC = gzip
> > collector110.sinks.sink2.hdfs.fileType = CompressedStream
> > collector110.sinks.sink2.hdfs.fileSuffix = .gz
> >
> > # Configure the channel that connects the source to the sink
> >
> > # Use a channel which buffers events in filesystem
> > collector110.channels.channel1.type = file
> > collector110.channels.channel1.checkpointDir = /data/flume_data/channel1/checkpoint
> > collector110.channels.channel1.dataDirs = /data/flume_data/channel1/data
> >
> > # Use a channel which buffers events in filesystem
> > collector110.channels.channel2.type = file
> > collector110.channels.channel2.checkpointDir = /data/flume_data/channel2/checkpoint
> > collector110.channels.channel2.dataDirs = /data/flume_data/channel2/data
> >
> > # Bind the source and sink to the channel configured above
> > collector110.sources.source1.channels = channel1 channel2
> > collector110.sinks.sink1.channel = channel1
> > collector110.sinks.sink2.channel = channel2
> >
> > Sagar
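For the memory-channel test Roshan suggested, only the channel section of the config above needs to change; a sketch might look like this (the capacity numbers are illustrative placeholders, not tuned values):

```properties
# Swap the file channels for memory channels; sources and sinks stay as-is.
collector110.channels.channel1.type = memory
collector110.channels.channel1.capacity = 10000
collector110.channels.channel1.transactionCapacity = 100

collector110.channels.channel2.type = memory
collector110.channels.channel2.capacity = 10000
collector110.channels.channel2.transactionCapacity = 100
```

If the duplicates disappear with memory channels, that would point at the file channel's checkpoint/replay path rather than the exec source.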
