Hi Hari,

Were you able to find out if there's something wrong with the config?


Regards
Mohit

On Thu, Sep 18, 2014 at 10:44 AM, Mohit Durgapal <[email protected]> wrote:

> Hi Hari,
>
> This is our latest config:
>
>
>
> agent1tier1.sources = tcpsrc
> agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup
>
> agent1tier1.channels = channelbucket01 channelbucket02
> agent1tier1.channels.channelbucket01.type = file
> agent1tier1.channels.channelbucket02.type = file
>
>
> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
> agent1tier1.sources.tcpsrc.type = syslogtcp
> agent1tier1.sources.tcpsrc.port = 5149
> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
> agent1tier1.sources.tcpsrc.interceptors=i1
> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>
>
>
>
> #################### INTERCEPTOR ##############################
> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
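>
> One thing to check in the interceptor section above: in Flume's properties format, the second `agent1tier1.sources.tcpsrc.interceptors` assignment replaces the first, so the timestamp interceptor `i1` is never installed. If both interceptors are intended, they have to be listed together in a single declaration; a sketch, reusing the custom builder class from the config:
>
> ```
> agent1tier1.sources.tcpsrc.interceptors = i1 logsintercept
> agent1tier1.sources.tcpsrc.interceptors.i1.type = timestamp
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type = com.custom.flume.interceptor.eventTweaker$Builder
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
> ```
>
> Interceptors run in the order listed, so here the timestamp header is set before the hashing interceptor fires.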
>
>
>
> #################### END OF INTERCEPTOR ##############################
>
>
>
> ####################### SELECTOR ###########################
>
> agent1tier1.sources.tcpsrc.selector.type=multiplexing
> agent1tier1.sources.tcpsrc.selector.header = bucket
> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>
> ##################### END OF SELECTOR #############################
>
>
>
> #################### CHANNELS ##############################
>
> agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
> agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>
> agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
> agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>
>
> #################### END OF CHANNELS ##############################
>
>
>
>
>
>
> ################## CHANNELS CAPACITY ############################
>
>
> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket01.capacity = 10000000
>
> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket02.capacity = 10000000
>
>
> ################## END OF CHANNELS CAPACITY ############################
>
>
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01.type = avro
> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch01.port = 10000
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01backup.type = avro
> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02.type = avro
> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch02.port = 19999
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02backup.type = avro
> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>
>
>
> agent1tier1.sinkgroups = grpch1
> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
> agent1tier1.sinkgroups.grpch1.processor.type = failover
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 2
> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>
>
>
> agent1tier1.sinkgroups = grpch2
> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
> agent1tier1.sinkgroups.grpch2.processor.type = failover
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 11
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 1
> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
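>
> A likely culprit for the misrouting: `agent1tier1.sinkgroups` is assigned twice, and in Flume's properties format the second assignment (`grpch2`) replaces the first, so `grpch1` and its failover processor are never created. `avro-forward-ch01` and `avro-forward-ch01backup` then run as two independent sinks on the default processor, both draining `channelbucket01`, which would send part of bucket01's traffic to the backup host even while the primary is healthy. A sketch of a single combined declaration, using the priority values from the commented lines above (the failover processor needs explicit priorities to know which sink is primary):
>
> ```
> agent1tier1.sinkgroups = grpch1 grpch2
>
> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
> agent1tier1.sinkgroups.grpch1.processor.type = failover
> agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
> agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 2
> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>
> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
> agent1tier1.sinkgroups.grpch2.processor.type = failover
> agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 11
> agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 1
> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
> ```
>
> With both groups declared on one `sinkgroups` line, each failover processor keeps only its highest-priority sink active and the backup is used solely when the primary fails.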
>
>
> Regards
> Mohit
>
> On Thu, Sep 18, 2014 at 10:38 AM, Hari Shreedharan <[email protected]> wrote:
>
>> Can you send your latest config?
>>
>> Thanks,
>> Hari
>>
>>
>> On Tue, Sep 16, 2014 at 6:01 AM, Mohit Durgapal <[email protected]> wrote:
>>
>>> We have a two-stage Flume topology. In the first tier, an interceptor
>>> computes a hash of a field in each event and sets a header accordingly;
>>> a multiplexing channel selector then routes events to Tier 2 based on
>>> that header. In the second tier we store the events locally using
>>> file_roll and also write the same events to HDFS.
>>>
>>> Everything works fine when we are not using the failover sinks. When we
>>> add the failover sink configuration in the first tier, our hashing logic
>>> gets overridden: even when all the machines in our Tier 2 are active and
>>> running, some events that were meant for Flume agent 1 (based on hashing
>>> and multiplexing) go to agent 2.
>>>
>>> We are performing this test on three machines: one for Tier 1 (let's say
>>> machine A) and two (machines B and C) for Tier 2. In Tier 2, machine C
>>> acts as the failover backup for the Flume agent on machine B, and
>>> machine B acts as the failover backup for the agent on machine C.
>>>
>>> Any idea what could be wrong with this configuration?
>>>
>>> Below are the tier wise configurations:
>>>
>>> *Tier 1:*
>>>
>>> agent1tier1.sources = tcpsrc
>>> agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup
>>>
>>> agent1tier1.channels = channelbucket01 channelbucket02
>>> agent1tier1.channels.channelbucket01.type = file
>>> agent1tier1.channels.channelbucket02.type = file
>>>
>>>
>>> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
>>> agent1tier1.sources.tcpsrc.type = syslogtcp
>>> agent1tier1.sources.tcpsrc.port = 5149
>>> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
>>> agent1tier1.sources.tcpsrc.interceptors=i1
>>> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>>>
>>>
>>>
>>>
>>> #################### INTERCEPTOR ##############################
>>> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>>>
>>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
>>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
>>>
>>>
>>>
>>> #################### END OF INTERCEPTOR ##############################
>>>
>>>
>>>
>>> ####################### SELECTOR ###########################
>>>
>>> agent1tier1.sources.tcpsrc.selector.type=multiplexing
>>> agent1tier1.sources.tcpsrc.selector.header = bucket
>>> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
>>> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
>>> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>>>
>>> ##################### END OF SELECTOR #############################
>>>
>>>
>>>
>>> #################### CHANNELS ##############################
>>>
>>> agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
>>> agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>>>
>>> agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
>>> agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>>>
>>>
>>> #################### END OF CHANNELS ##############################
>>>
>>>
>>>
>>>
>>>
>>>
>>> ################## CHANNELS CAPACITY ############################
>>>
>>>
>>> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
>>> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
>>> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
>>> agent1tier1.channels.channelbucket01.capacity = 10000000
>>>
>>> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
>>> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
>>> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
>>> agent1tier1.channels.channelbucket02.capacity = 10000000
>>>
>>>
>>> ################## END OF CHANNELS CAPACITY ############################
>>>
>>>
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
>>> agent1tier1.sinks.avro-forward-ch01.type = avro
>>> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
>>> agent1tier1.sinks.avro-forward-ch01.port = 10000
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
>>> agent1tier1.sinks.avro-forward-ch01backup.type = avro
>>> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
>>> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
>>> agent1tier1.sinks.avro-forward-ch02.type = avro
>>> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
>>> agent1tier1.sinks.avro-forward-ch02.port = 19999
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
>>> agent1tier1.sinks.avro-forward-ch02backup.type = avro
>>> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
>>> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>>>
>>>
>>>
>>> agent1tier1.sinkgroups = grpch1
>>> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
>>> agent1tier1.sinkgroups.grpch1.processor.type = failover
>>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
>>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 10
>>> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>>>
>>>
>>>
>>> agent1tier1.sinkgroups = grpch2
>>> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
>>> agent1tier1.sinkgroups.grpch2.processor.type = failover
>>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
>>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 11
>>> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>>>
>>>
>>>
>>> *Tier 2:*
>>>
>>> tier2.sources  = avro-AppSrv-source
>>> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
>>> tier2.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
>>> tier2.channels.channelimp.type = file
>>> tier2.channels.channelconv.type = file
>>> tier2.channels.channelclk.type = file
>>> tier2.channels.channelrt.type = file
>>> tier2.channels.channelhdfsrt.type = file
>>> tier2.channels.channelhdfsdel.type = file
>>>
>>> # For each source, channel, and sink, set
>>> # standard properties.
>>> # properties of avro-AppSrv-source
>>> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.type = avro
>>> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
>>> tier2.sources.avro-AppSrv-source.port = 10000
>>>
>>>
>>>
>>>
>>>
>>>
>>> tier2.sources.avro-AppSrv-source.selector.type=multiplexing
>>> tier2.sources.avro-AppSrv-source.selector.header = rectype
>>> tier2.sources.avro-AppSrv-source.selector.mapping.IMP = channelimp channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv channelhdfsdel
>>>
>>> tier2.sources.avro-AppSrv-source.selector.mapping.RT = channelrt channelhdfsrt
>>>
>>>
>>> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
>>>
>>>
>>>
>>> tier2.sinks.impsink.type = file_roll
>>> tier2.sinks.impsink.channel = channelimp
>>> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
>>> tier2.sinks.impsink.sink.rollInterval=60
>>>
>>> tier2.sinks.convsink.type = file_roll
>>> tier2.sinks.convsink.channel = channelconv
>>> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
>>> tier2.sinks.convsink.sink.rollInterval=60
>>>
>>> tier2.sinks.clksink.type = file_roll
>>> tier2.sinks.clksink.channel = channelclk
>>> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
>>> tier2.sinks.clksink.sink.rollInterval=60
>>>
>>>
>>> tier2.sinks.rtsink.type = file_roll
>>> tier2.sinks.rtsink.channel = channelrt
>>> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
>>> tier2.sinks.rtsink.sink.rollInterval=60
>>>
>>>
>>> #################### CHANNELS ##############################
>>>
>>> tier2.channels.channelimp.checkpointDir = /home/flume/channelimp/file-channel/checkpoint
>>> tier2.channels.channelimp.dataDirs = /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
>>>
>>>
>>> tier2.channels.channelclk.checkpointDir = /home/flume/channelclk/file-channel/checkpoint
>>> tier2.channels.channelclk.dataDirs = /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
>>>
>>> tier2.channels.channelconv.checkpointDir = /home/flume/channelconv/file-channel/checkpoint
>>> tier2.channels.channelconv.dataDirs = /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
>>>
>>> tier2.channels.channelrt.checkpointDir = /home/flume/channelrt/file-channel/checkpoint
>>> tier2.channels.channelrt.dataDirs = /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
>>>
>>> tier2.channels.channelhdfsrt.checkpointDir = /home/flume/channelhdfsrt/file-channel/checkpoint
>>> tier2.channels.channelhdfsrt.dataDirs = /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
>>>
>>> tier2.channels.channelhdfsdel.checkpointDir = /home/flume/channelhdfsdel/file-channel/checkpoint
>>> tier2.channels.channelhdfsdel.dataDirs = /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
>>>
>>>
>>>
>>>
>>> #################### END OF CHANNELS ##############################
>>>
>>>
>>> tier2.sinks.hdfssinkrt.type = hdfs
>>> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
>>> tier2.sinks.hdfssinkrt.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
>>> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
>>> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
>>> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
>>> # Roll based on the block size only
>>> tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
>>> tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
>>> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
>>> # seconds to wait before closing the file.
>>> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
>>> tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
>>> tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
>>> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
>>> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
>>> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
>>>
>>>
>>> tier2.sinks.hdfssinkdel.type = hdfs
>>> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
>>> tier2.sinks.hdfssinkdel.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
>>> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
>>> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
>>> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
>>> # Roll based on the block size only
>>> tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
>>> tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
>>> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
>>> # seconds to wait before closing the file.
>>> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
>>> tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
>>> tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
>>> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
>>> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
>>> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
>>> #################### END OF SINKS ##############################
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>
