Can you also send the Flume agent logs? Did you check the contents of the files?
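
One way to look inside the files, as a rough sketch: copy them out of HDFS to a local directory (for example with hadoop fs -get) and print how many lines each one holds, plus the first few lines. The function name peek_gz is made up for illustration, and the sketch assumes gunzip is available locally.

```shell
#!/bin/sh
# For each .gz file given, print its name, how many lines (events) it
# holds once decompressed, and the first three of those lines.
peek_gz() {
    for f in "$@"; do
        echo "== $f: $(gunzip -c "$f" | wc -l | tr -d ' ') lines"
        gunzip -c "$f" | head -n 3
    done
}
```

For example, peek_gz collector110.*.gz would show whether the repeated files carry real log lines and which ones.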

-- 
Hari Shreedharan


On Thursday, February 28, 2013 at 2:43 PM, Roshan Naik wrote:

> Would you be able to verify whether the same problem can be reproduced
> using the memory channel instead, in a test setup?
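
For such a test, one possible shortcut is to derive a throwaway config from the existing one, turning each file channel into a memory channel. This is only a sketch: it assumes the config file has one property per line, and the function name to_memory_channels and the output file name in the usage line are illustrative, not anything Flume provides.

```shell
#!/bin/sh
# Print a copy of a Flume properties file with every file channel switched
# to a memory channel and the file-channel-only directory settings dropped.
to_memory_channels() {
    sed -e 's/^\([^ #]*\.channels\.[^. ]*\.type\) *= *file *$/\1 = memory/' \
        -e '/\.channels\.[^. ]*\.checkpointDir *=/d' \
        -e '/\.channels\.[^. ]*\.dataDirs *=/d' \
        "$1"
}
```

Usage would look like: to_memory_channels hdfs.conf > hdfs-memtest.conf. The memory channels then run with their default capacity settings, which should be enough for a mostly idle source.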
> 
> 
> On Wed, Feb 27, 2013 at 11:37 AM, Sagar Mehta wrote:
> > Hi Guys,
> > 
> > I'm using Flume NG and it is working pretty well, except for a weird
> > situation which I observed lately. In essence, I'm using an exec source to
> > run tail -F on a logfile, and two HDFS sinks, each fed by a File channel.
> > 
> > However, I have observed that when the source [logfile of a Jetty-based
> > collector] is idle, that is, no new events are pushed to the logfile,
> > Flume NG seems to replay the same set of events.
> > 
> > For example, collector110 received no events for two consecutive hours, and
> > below are the corresponding files Flume wrote at the HDFS sink:
> > 
> > hadoop@jobtracker301:/home/hadoop/sagar$ hls /ngpipes-raw-logs/2013-02-27/1400/collector110*
> > -rw-r--r--   3 hadoop supergroup        441 2013-02-27 14:20 /ngpipes-raw-logs/2013-02-27/1400/collector110.ngpipes.sac.ngmoco.com.1361974853210.gz
> > -rw-r--r--   3 hadoop supergroup        441 2013-02-27 14:50 /ngpipes-raw-logs/2013-02-27/1400/collector110.ngpipes.sac.ngmoco.com.1361976653432.gz
> > 
> > hadoop@jobtracker301:/home/hadoop/sagar$ hls /ngpipes-raw-logs/2013-02-27/1500/collector110*
> > -rw-r--r--   3 hadoop supergroup        441 2013-02-27 15:20 /ngpipes-raw-logs/2013-02-27/1500/collector110.ngpipes.sac.ngmoco.com.1361978454123.gz
> > -rw-r--r--   3 hadoop supergroup        441 2013-02-27 15:50 /ngpipes-raw-logs/2013-02-27/1500/collector110.ngpipes.sac.ngmoco.com.1361980254338.gz
> > 
> > hadoop@jobtracker301:/home/hadoop/sagar$ md5sum *
> > c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361974853210.gz
> > c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361976653432.gz
> > c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361978454123.gz
> > c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361980254338.gz
> > 
> > 
> > As you can see above, the md5sums match.
> > 
> > I'm using a File channel, which has checkpoints, so I'm not sure what is
> > going on. By the way, the difference in timestamps between consecutive
> > replays looks like exactly 30 minutes.
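
The spacing can be checked from the epoch-millisecond timestamps embedded in the four file names listed above. A small sketch of the arithmetic follows; the helper name gaps_seconds is hypothetical.

```shell
#!/bin/sh
# Print the gap, in whole seconds, between each pair of consecutive
# epoch-millisecond timestamps passed as arguments.
gaps_seconds() {
    prev=""
    for ts in "$@"; do
        if [ -n "$prev" ]; then
            echo $(( (ts - prev) / 1000 ))
        fi
        prev=$ts
    done
}

# Timestamps taken from the four file names in the listing.
gaps_seconds 1361974853210 1361976653432 1361978454123 1361980254338
```

Each of the three gaps comes out at 1800 seconds, that is, 30 minutes, which is six times the rollInterval of 300 seconds set in the config.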
> > 
> > Is this a known bug or am I missing something?
> > 
> > Below is my Flume config file
> > 
> > smehta@collector110:/opt/flume/conf$ cat hdfs.conf 
> > # An hdfs sink to write events to the hdfs on the test cluster
> > # A memory based channel to connect the above source and sink
> > 
> > # Name the components on this agent
> > collector110.sources = source1
> > collector110.sinks = sink1 sink2
> > collector110.channels = channel1 channel2
> > 
> > # Configure the source
> > collector110.sources.source1.type = exec
> > collector110.sources.source1.command = tail -F /opt/jetty/logFile.log
> > 
> > # Configure the interceptors
> > collector110.sources.source1.interceptors = TimestampInterceptor HostInterceptor
> > 
> > # We use the Timestamp interceptor to get timestamps of when flume receives events
> > # This is used for figuring out the bucket to which an event goes
> > collector110.sources.source1.interceptors.TimestampInterceptor.type = timestamp
> > 
> > # We use the Host interceptor to populate the host header with the fully qualified domain name of the collector.
> > # That way we know which file in the sink represents which collector.
> > collector110.sources.source1.interceptors.HostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
> > collector110.sources.source1.interceptors.HostInterceptor.preserveExisting = false
> > collector110.sources.source1.interceptors.HostInterceptor.useIP = false
> > collector110.sources.source1.interceptors.HostInterceptor.hostHeader = host
> > 
> > 
> > # Configure the sink 
> > 
> > collector110.sinks.sink1.type = hdfs
> > 
> > # Configure the bucketing
> > collector110.sinks.sink1.hdfs.path=hdfs://namenode3001.ngpipes.milp.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00
> > 
> > # Prefix the file with the source so that we know where the events in the file came from
> > collector110.sinks.sink1.hdfs.filePrefix = %{host}
> > 
> > # We roll the flume output file based on time interval - currently every 5 minutes
> > collector110.sinks.sink1.hdfs.rollSize = 0
> > collector110.sinks.sink1.hdfs.rollCount = 0
> > collector110.sinks.sink1.hdfs.rollInterval = 300
> > 
> > #gzip compression related settings 
> > collector110.sinks.sink1.hdfs.codeC = gzip
> > collector110.sinks.sink1.hdfs.fileType = CompressedStream
> > collector110.sinks.sink1.hdfs.fileSuffix = .gz
> > 
> > # Configure the sink 
> > 
> > collector110.sinks.sink2.type = hdfs
> > 
> > # Configure the bucketing
> > collector110.sinks.sink2.hdfs.path=hdfs://namenode5001.ngpipes.sac.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00
> > 
> > # Prefix the file with the source so that we know where the events in the file came from
> > collector110.sinks.sink2.hdfs.filePrefix = %{host}
> > 
> > # We roll the flume output file based on time interval - currently every 5 minutes
> > collector110.sinks.sink2.hdfs.rollSize = 0
> > collector110.sinks.sink2.hdfs.rollCount = 0
> > collector110.sinks.sink2.hdfs.rollInterval = 300
> > 
> > #gzip compression related settings 
> > collector110.sinks.sink2.hdfs.codeC = gzip
> > collector110.sinks.sink2.hdfs.fileType = CompressedStream
> > collector110.sinks.sink2.hdfs.fileSuffix = .gz
> > 
> > # Configure the channel that connects the source to the sink 
> > 
> > # Use a channel which buffers events in filesystem
> > collector110.channels.channel1.type = file
> > collector110.channels.channel1.checkpointDir = /data/flume_data/channel1/checkpoint
> > collector110.channels.channel1.dataDirs = /data/flume_data/channel1/data
> > 
> > # Use a channel which buffers events in filesystem
> > collector110.channels.channel2.type = file
> > collector110.channels.channel2.checkpointDir = /data/flume_data/channel2/checkpoint
> > collector110.channels.channel2.dataDirs = /data/flume_data/channel2/data
> > 
> > # Bind the source and sink to the channel configured above
> > collector110.sources.source1.channels = channel1 channel2
> > collector110.sinks.sink1.channel = channel1
> > collector110.sinks.sink2.channel = channel2
> >  
> > Sagar
> > 
> > 
> 
