Hi,

Below is my Flume config. I am trying to get a load-balancing sink group to
balance across multiple machines. When I use the load-balancing sink group, I
see only 2 threads created for the entire sink group, I see the message below
in the logs, and I see no throughput draining events from the channel. On the
other hand, if I comment out the sink group definition from the Flume config,
and thus use the DefaultSinkProcessor, I see a lot more threads and events
drain a lot faster. I suspect this is a problem with my config, but I could
not find anything obvious. Could anyone here help?

Flume log output:

flume.log:16 Apr 2015 17:18:07,549 INFO [main]
(org.apache.flume.node.Application.startAllComponents:138) - Starting new
configuration:{ sourceRunners:{netcat=EventDrivenSourceRunner: {
source:org.apache.flume.source.NetcatSource{name:netcat,state:IDLE} },
spool=EventDrivenSourceRunner: {
source:org.apache.flume.source.SpoolDirectorySource{name:spool,state:IDLE} }}
sinkRunners:{mainSinks=SinkRunner: {
policy:org.apache.flume.sink.LoadBalancingSinkProcessor@15f66cff counterGroup:{
name:null counters:{} } }, replaySinks=SinkRunner: {
policy:org.apache.flume.sink.LoadBalancingSinkProcessor@656de49c counterGroup:{
name:null counters:{} } }}
channels:{mainChannel=org.apache.flume.channel.MemoryChannel{name:
mainChannel}, replayChannel=org.apache.flume.channel.MemoryChannel{name:
replayChannel}} }

Flume config:

agent1.channels.mainChannel.type = MEMORY
agent1.channels.mainChannel.capacity = 150000
agent1.channels.mainChannel.transactionCapacity = 10000

agent1.channels.replayChannel.type = MEMORY
agent1.channels.replayChannel.capacity = 50000
agent1.channels.replayChannel.transactionCapacity = 5000

# netcat source
agent1.sources.netcat.channels = mainChannel
agent1.sources.netcat.type = netcat
agent1.sources.netcat.bind = 127.0.0.1
agent1.sources.netcat.port = 44444
agent1.sources.netcat.ack-every-event = false
agent1.sources.netcat.max-line-length = 8192

# spool directory source
agent1.sources.spool.channels = replayChannel
agent1.sources.spool.type = spooldir
agent1.sources.spool.bufferMaxLineLength = 8192
agent1.sources.spool.bufferMaxLines = 1000
agent1.sources.spool.batchSize = 1000
agent1.sources.spool.spoolDir = /br/agent_aud/replay
agent1.sources.spool.inputCharset = ISO-8859-1
#Label the event as a replayed event
agent1.sources.spool.interceptors = staticInterceptor
agent1.sources.spool.interceptors.staticInterceptor.type = static
agent1.sources.spool.interceptors.staticInterceptor.key = t
agent1.sources.spool.interceptors.staticInterceptor.value = r

agent1.sinks.avroMainSink1.type = avro
agent1.sinks.avroMainSink1.channel = mainChannel
agent1.sinks.avroMainSink1.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink1.port = 4545
agent1.sinks.avroMainSink1.connect-timeout = 30000
agent1.sinks.avroMainSink1.request-timeout = 20000
agent1.sinks.avroMainSink1.batch-size = 200

agent1.sinks.avroReplaySink1.type = avro
agent1.sinks.avroReplaySink1.channel = replayChannel
agent1.sinks.avroReplaySink1.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink1.port = 4545
agent1.sinks.avroReplaySink1.connect-timeout = 300000
agent1.sinks.avroReplaySink1.batch-size = 2000

agent1.sinks.avroMainSink2.type = avro
agent1.sinks.avroMainSink2.channel = mainChannel
agent1.sinks.avroMainSink2.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink2.port = 4546
agent1.sinks.avroMainSink2.connect-timeout = 30000
agent1.sinks.avroMainSink2.request-timeout = 20000
agent1.sinks.avroMainSink2.batch-size = 200

agent1.sinks.avroReplaySink2.type = avro
agent1.sinks.avroReplaySink2.channel = replayChannel
agent1.sinks.avroReplaySink2.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink2.port = 4546
agent1.sinks.avroReplaySink2.connect-timeout = 300000
agent1.sinks.avroReplaySink2.batch-size = 2000

agent1.sinks.avroMainSink3.type = avro
agent1.sinks.avroMainSink3.channel = mainChannel
agent1.sinks.avroMainSink3.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink3.port = 4547
agent1.sinks.avroMainSink3.connect-timeout = 30000
agent1.sinks.avroMainSink3.request-timeout = 20000
agent1.sinks.avroMainSink3.batch-size = 200

agent1.sinks.avroReplaySink3.type = avro
agent1.sinks.avroReplaySink3.channel = replayChannel
agent1.sinks.avroReplaySink3.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink3.port = 4547
agent1.sinks.avroReplaySink3.connect-timeout = 300000
agent1.sinks.avroReplaySink3.batch-size = 2000

agent1.sinks.avroMainSink4.type = avro
agent1.sinks.avroMainSink4.channel = mainChannel
agent1.sinks.avroMainSink4.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink4.port = 4548
agent1.sinks.avroMainSink4.connect-timeout = 30000
agent1.sinks.avroMainSink4.request-timeout = 20000
agent1.sinks.avroMainSink4.batch-size = 200

agent1.sinks.avroReplaySink4.type = avro
agent1.sinks.avroReplaySink4.channel = replayChannel
agent1.sinks.avroReplaySink4.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink4.port = 4548
agent1.sinks.avroReplaySink4.connect-timeout = 300000
agent1.sinks.avroReplaySink4.batch-size = 2000

agent1.sinkgroups.mainSinks.sinks = avroMainSink1 avroMainSink2 avroMainSink3 avroMainSink4
agent1.sinkgroups.mainSinks.processor.type = load_balance
agent1.sinkgroups.mainSinks.processor.selector = random
agent1.sinkgroups.mainSinks.processor.maxTimeOut = 1000
agent1.sinkgroups.mainSinks.processor.backoff = true

agent1.sinkgroups.replaySinks.sinks = avroReplaySink1 avroReplaySink2 avroReplaySink3 avroReplaySink4
agent1.sinkgroups.replaySinks.processor.type = load_balance
agent1.sinkgroups.replaySinks.processor.selector = random
agent1.sinkgroups.replaySinks.processor.maxTimeOut = 1000
agent1.sinkgroups.replaySinks.processor.backoff = true

agent1.channels = mainChannel replayChannel
agent1.sinkgroups = mainSinks replaySinks
agent1.sources = netcat spool
agent1.sinks = avroMainSink1 avroReplaySink1 avroMainSink2 avroReplaySink2 avroMainSink3 avroReplaySink3 avroMainSink4 avroReplaySink4