Hi Hari,

Were you able to find out if there's something wrong with the config?
Regards,
Mohit

On Thu, Sep 18, 2014 at 10:44 AM, Mohit Durgapal <[email protected]> wrote:
> Hi Hari,
>
> This is our latest config:
>
> agent1tier1.sources = tcpsrc
> agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup
>
> agent1tier1.channels = channelbucket01 channelbucket02
> agent1tier1.channels.channelbucket01.type = file
> agent1tier1.channels.channelbucket02.type = file
>
> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
> agent1tier1.sources.tcpsrc.type = syslogtcp
> agent1tier1.sources.tcpsrc.port = 5149
> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
> agent1tier1.sources.tcpsrc.interceptors = i1
> agent1tier1.sources.tcpsrc.interceptors.i1.type = timestamp
>
> #################### INTERCEPTOR ##############################
> agent1tier1.sources.tcpsrc.interceptors = logsintercept
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type = com.custom.flume.interceptor.eventTweaker$Builder
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
> #################### END OF INTERCEPTOR ##############################
>
> ####################### SELECTOR ###########################
> agent1tier1.sources.tcpsrc.selector.type = multiplexing
> agent1tier1.sources.tcpsrc.selector.header = bucket
> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
> ##################### END OF SELECTOR #############################
>
> #################### CHANNELS ##############################
> agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
> agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>
> agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
> agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
> #################### END OF CHANNELS ##############################
>
> ################## CHANNELS CAPACITY ############################
> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket01.capacity = 10000000
>
> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket02.capacity = 10000000
> ################## END OF CHANNELS CAPACITY ############################
>
> # avro sink properties
> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01.type = avro
> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch01.port = 10000
>
> # avro sink properties
> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01backup.type = avro
> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>
> # avro sink properties
> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02.type = avro
> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch02.port = 19999
>
> # avro sink properties
> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02backup.type = avro
> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>
> agent1tier1.sinkgroups = grpch1
> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
> agent1tier1.sinkgroups.grpch1.processor.type = failover
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 2
> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>
> agent1tier1.sinkgroups = grpch2
> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
> agent1tier1.sinkgroups.grpch2.processor.type = failover
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 11
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 1
> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>
> Regards,
> Mohit
>
> On Thu, Sep 18, 2014 at 10:38 AM, Hari Shreedharan <[email protected]> wrote:
>
>> Can you send your latest config?
>>
>> Thanks,
>> Hari
>>
>> On Tue, Sep 16, 2014 at 6:01 AM, Mohit Durgapal <[email protected]> wrote:
>>
>>> We have a two-stage topology in Flume. In the first tier, an interceptor
>>> sets a header field based on the hash value of a field in the event, and
>>> a multiplexing selector then directs events to Tier 2 based on that
>>> header. In the second tier we store the events locally using file_roll
>>> and also store the same events in HDFS.
>>>
>>> Everything works fine when we are not using the failover sinks. When we
>>> add the failover sink configuration in the first tier, our hashing logic
>>> gets overridden.
>>> That means even when all the machines in our Tier 2 are active and
>>> running, some events which were meant for flume agent 1 (based on
>>> hashing & multiplexing) go to agent 2.
>>>
>>> We are performing this test on three machines: one machine for Tier 1
>>> (let's say machine A) and two machines (let's say machines B & C) for
>>> Tier 2. In Tier 2, machine C acts as the failover backup for the flume
>>> agent on machine B, and machine B acts as the failover backup for the
>>> agent on machine C.
>>>
>>> Any idea what could be wrong with this configuration?
>>>
>>> Below are the tier-wise configurations:
>>>
>>> *Tier 1:*
>>>
>>> agent1tier1.sources = tcpsrc
>>> agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup
>>>
>>> agent1tier1.channels = channelbucket01 channelbucket02
>>> agent1tier1.channels.channelbucket01.type = file
>>> agent1tier1.channels.channelbucket02.type = file
>>>
>>> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
>>> agent1tier1.sources.tcpsrc.type = syslogtcp
>>> agent1tier1.sources.tcpsrc.port = 5149
>>> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
>>> agent1tier1.sources.tcpsrc.interceptors = i1
>>> agent1tier1.sources.tcpsrc.interceptors.i1.type = timestamp
>>>
>>> #################### INTERCEPTOR ##############################
>>> agent1tier1.sources.tcpsrc.interceptors = logsintercept
>>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type = com.custom.flume.interceptor.eventTweaker$Builder
>>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
>>> #################### END OF INTERCEPTOR ##############################
>>>
>>> ####################### SELECTOR ###########################
>>> agent1tier1.sources.tcpsrc.selector.type = multiplexing
>>> agent1tier1.sources.tcpsrc.selector.header = bucket
>>> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
>>> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
>>> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>>> ##################### END OF SELECTOR #############################
>>>
>>> #################### CHANNELS ##############################
>>> agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
>>> agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>>>
>>> agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
>>> agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>>> #################### END OF CHANNELS ##############################
>>>
>>> ################## CHANNELS CAPACITY ############################
>>> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
>>> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
>>> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
>>> agent1tier1.channels.channelbucket01.capacity = 10000000
>>>
>>> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
>>> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
>>> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
>>> agent1tier1.channels.channelbucket02.capacity = 10000000
>>> ################## END OF CHANNELS CAPACITY ############################
>>>
>>> # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
>>> agent1tier1.sinks.avro-forward-ch01.type = avro
>>> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
>>> agent1tier1.sinks.avro-forward-ch01.port = 10000
>>> # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
>>> agent1tier1.sinks.avro-forward-ch01backup.type = avro
>>> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
>>> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>>>
>>> # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
>>> agent1tier1.sinks.avro-forward-ch02.type = avro
>>> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
>>> agent1tier1.sinks.avro-forward-ch02.port = 19999
>>>
>>> # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
>>> agent1tier1.sinks.avro-forward-ch02backup.type = avro
>>> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
>>> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>>>
>>> agent1tier1.sinkgroups = grpch1
>>> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
>>> agent1tier1.sinkgroups.grpch1.processor.type = failover
>>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
>>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 10
>>> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>>>
>>> agent1tier1.sinkgroups = grpch2
>>> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
>>> agent1tier1.sinkgroups.grpch2.processor.type = failover
>>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
>>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 11
>>> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>>>
>>> *Tier 2:*
>>>
>>> tier2.sources = avro-AppSrv-source
>>> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
>>> tier2.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
>>> tier2.channels.channelimp.type = file
>>> tier2.channels.channelconv.type = file
>>> tier2.channels.channelclk.type = file
>>> tier2.channels.channelrt.type = file
>>> tier2.channels.channelhdfsrt.type = file
>>> tier2.channels.channelhdfsdel.type = file
>>>
>>> # For each source, channel, and sink, set
>>> # standard properties.
>>> # properties of avro-AppSrv-source
>>> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.type = avro
>>> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
>>> tier2.sources.avro-AppSrv-source.port = 10000
>>>
>>> tier2.sources.avro-AppSrv-source.selector.type = multiplexing
>>> tier2.sources.avro-AppSrv-source.selector.header = rectype
>>> tier2.sources.avro-AppSrv-source.selector.mapping.IMP = channelimp channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.selector.mapping.RT = channelrt channelhdfsrt
>>> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
>>>
>>> tier2.sinks.impsink.type = file_roll
>>> tier2.sinks.impsink.channel = channelimp
>>> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
>>> tier2.sinks.impsink.sink.rollInterval = 60
>>>
>>> tier2.sinks.convsink.type = file_roll
>>> tier2.sinks.convsink.channel = channelconv
>>> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
>>> tier2.sinks.convsink.sink.rollInterval = 60
>>>
>>> tier2.sinks.clksink.type = file_roll
>>> tier2.sinks.clksink.channel = channelclk
>>> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
>>> tier2.sinks.clksink.sink.rollInterval = 60
>>>
>>> tier2.sinks.rtsink.type = file_roll
>>> tier2.sinks.rtsink.channel = channelrt
>>> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
>>> tier2.sinks.rtsink.sink.rollInterval = 60
>>> #################### CHANNELS ##############################
>>> tier2.channels.channelimp.checkpointDir = /home/flume/channelimp/file-channel/checkpoint
>>> tier2.channels.channelimp.dataDirs = /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
>>>
>>> tier2.channels.channelclk.checkpointDir = /home/flume/channelclk/file-channel/checkpoint
>>> tier2.channels.channelclk.dataDirs = /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
>>>
>>> tier2.channels.channelconv.checkpointDir = /home/flume/channelconv/file-channel/checkpoint
>>> tier2.channels.channelconv.dataDirs = /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
>>>
>>> tier2.channels.channelrt.checkpointDir = /home/flume/channelrt/file-channel/checkpoint
>>> tier2.channels.channelrt.dataDirs = /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
>>>
>>> tier2.channels.channelhdfsrt.checkpointDir = /home/flume/channelhdfsrt/file-channel/checkpoint
>>> tier2.channels.channelhdfsrt.dataDirs = /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
>>>
>>> tier2.channels.channelhdfsdel.checkpointDir = /home/flume/channelhdfsdel/file-channel/checkpoint
>>> tier2.channels.channelhdfsdel.dataDirs = /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
>>> #################### END OF CHANNELS ##############################
>>>
>>> tier2.sinks.hdfssinkrt.type = hdfs
>>> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
>>> tier2.sinks.hdfssinkrt.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
>>> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
>>> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
>>> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
>>> # Roll on event count and time interval only (size-based rolling disabled)
>>> tier2.sinks.hdfssinkrt.hdfs.rollCount = 200000
>>> tier2.sinks.hdfssinkrt.hdfs.rollInterval = 120
>>> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
>>> # seconds to wait before closing the file
>>> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
>>> tier2.sinks.hdfssinkrt.hdfs.batchSize = 20000
>>> tier2.sinks.hdfssinkrt.hdfs.txnEventMax = 20000
>>> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize = 20
>>> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
>>> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
>>>
>>> tier2.sinks.hdfssinkdel.type = hdfs
>>> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
>>> tier2.sinks.hdfssinkdel.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
>>> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
>>> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
>>> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
>>> # Roll on event count and time interval only (size-based rolling disabled)
>>> tier2.sinks.hdfssinkdel.hdfs.rollCount = 200000
>>> tier2.sinks.hdfssinkdel.hdfs.rollInterval = 120
>>> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
>>> # seconds to wait before closing the file
>>> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
>>> tier2.sinks.hdfssinkdel.hdfs.batchSize = 20000
>>> tier2.sinks.hdfssinkdel.hdfs.txnEventMax = 20000
>>> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize = 20
>>> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
>>> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
>>> #################### END OF SINKS ##############################
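One detail worth double-checking in both versions of the Tier 1 config above: `agent1tier1.sources.tcpsrc.interceptors` is assigned twice, once for the `i1` timestamp interceptor and once for `logsintercept`. Flume agent configuration is loaded as a Java properties file, where a repeated key keeps only a single value, so only one of the two interceptors will actually be installed. If both are intended, they should be declared together as one space-separated chain; a sketch under that assumption:

```properties
# Declare both interceptors in ONE chain; they run in the listed order.
agent1tier1.sources.tcpsrc.interceptors = i1 logsintercept
agent1tier1.sources.tcpsrc.interceptors.i1.type = timestamp
agent1tier1.sources.tcpsrc.interceptors.logsintercept.type = com.custom.flume.interceptor.eventTweaker$Builder
agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
```

If the duplicate assignment drops `logsintercept`, the `bucket` header is never set and every event falls through to the selector's default channel, regardless of the hash.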

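A related detail in the Tier 1 config: `agent1tier1.sinkgroups` is also assigned twice (`grpch1`, then `grpch2`). Because the configuration is parsed as Java properties, only one of those group declarations survives; the two sinks of the discarded group then each run with the default sink processor and independently drain their shared channel, so events can reach both Tier 2 machines even though the hashing and multiplexing work, which matches the behaviour described in the thread. A sketch of declaring both groups in a single assignment, with the failover priorities uncommented so the preferred sink is explicit (higher priority wins):

```properties
# Both groups must appear in ONE sinkgroups assignment.
agent1tier1.sinkgroups = grpch1 grpch2

agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
agent1tier1.sinkgroups.grpch1.processor.type = failover
agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 2
agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000

agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
agent1tier1.sinkgroups.grpch2.processor.type = failover
agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 11
agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 1
agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
```

The priority values shown are taken from the commented-out lines in the config above; any positive values work as long as the primary sink's priority is higher than its backup's.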