Since there was no response to this, I set up a separate ticket at
https://issues.apache.org/jira/browse/FLUME-1541 and implemented it as a
SinkSelector for the LoadBalancingSinkProcessor.
Review can be found at https://reviews.apache.org/r/6939/
Chris: if you're interested, you may want to give this a poke and see if it
fulfills your needs. The only configuration change needed is to switch
the selector type from "round_robin" to "round_robin_backoff".
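
As a rough sketch, the change might look like the fragment below. It assumes the patch from the review above is applied; the group name lb-sink-group is borrowed from Chris's configuration further down in the thread, and the property key may differ in whatever is finally committed.

```properties
# Hypothetical fragment: only meaningful with the FLUME-1541 patch applied.
agent.sinkgroups.lb-sink-group.processor.type = load_balance
# Before: agent.sinkgroups.lb-sink-group.processor.selector = round_robin
agent.sinkgroups.lb-sink-group.processor.selector = round_robin_backoff
```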
On 09/04/2012 07:39 PM, Juhani Connolly wrote:
I'm thinking of working on this (adding backoff semantics to the load
balancing processor).
The ticket FLUME-1488, however, refers to the load balancing RPC
client (or is it just poorly worded/unclear?). If it is in fact a
separate issue, I'll file a new ticket for this.
Anyway, I was interested in hearing thoughts on the approach. I'd have
liked to do it within the framework of the LoadBalancingSinkProcessor
by adding a new Selector. However, as it stands, the processor
provides no feedback to the selectors about whether sinks are working
or not, so this can't work.
This leaves two choices: write a new SinkProcessor, or modify the
SinkSelector interface to give it a couple of callbacks that the
processor calls to inform the selector of trouble. This shouldn't
really be a problem even if people have written their own selectors, so
long as they extend AbstractSinkSelector, which can stub the
callbacks.
Thoughts?
On 08/18/2012 02:01 AM, Arvind Prabhakar wrote:
Hi,
FYI - the load balancing sink processor does support simple failover
semantics. The way it works is that if a sink is down, it will
proceed to the next sink in the group until all sinks are exhausted.
The failover sink processor, on the other hand, does complex failure
handling and back-off, such as blacklisting sinks that repeatedly fail.
The issue [1] tracks enhancing the load balancing sink processor to
support backoff semantics.
The one issue with your configuration that I could spot by a quick
glance is that you are adding your active sinks to both the sink
groups. This does not really work and the configuration subsystem
simply flags the second inclusion as a problem and ignores it. By
design, a sink can either be on its own or in one explicit sink group.
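
To illustrate the one-group-per-sink rule, here is a minimal sketch that reuses the sink names from the configuration quoted below and keeps both sinks in a single load-balancing group, using the name lb-sink-group consistently. It is an untested fragment rather than a drop-in replacement; the source, channel, and sink definitions are assumed to stay as posted.

```properties
# Sketch: each sink is listed in exactly one sink group.
agent.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups = lb-sink-group
agent.sinkgroups.lb-sink-group.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups.lb-sink-group.processor.type = load_balance
agent.sinkgroups.lb-sink-group.processor.selector = round_robin
```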
[1] https://issues.apache.org/jira/browse/FLUME-1488
Regards,
Arvind Prabhakar
On Fri, Aug 17, 2012 at 8:59 AM, Chris Neal wrote:
Hi all.
The User Guide talks about the various types of Sink Processors,
but doesn't say whether they can be combined. A Failover Processor
that moves between 1..n sinks is great, as is a Load Balancer
Processor that moves between 1..n sinks, but what would be best is
an agent that can use both a Failover Processor AND a Load Balancer
Processor!
I've created a configuration which I believe supports this, and
the Agent starts up and processes events, but I wanted to ping
this group to make sure that this configuration is really doing
what I think it is doing behind the scenes.
Comments?
# Define the sources, sinks, and channels for the agent
agent.sources = avro-instance_1-source avro-instance_2-source
agent.channels = memory-agent-channel
agent.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups = failover-sink-group lb-sink-group
# Bind sources to channels
agent.sources.avro-instance_1-source.channels = memory-agent-channel
agent.sources.avro-instance_2-source.channels = memory-agent-channel
# Define sink group for failover
agent.sinkgroups.failover-sink-group.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups.failover-sink-group.processor.type = failover
agent.sinkgroups.failover-sink-group.processor.priority.avro-hdfs_1-sink = 5
agent.sinkgroups.failover-sink-group.processor.priority.avro-hdfs_2-sink = 10
agent.sinkgroups.failover-sink-group.processor.maxpenalty = 10000
# Define sink group for load balancing
agent.sinkgroups = lb-sink-group
agent.sinkgroups.group1.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups.group1.processor.type = load_balance
agent.sinkgroups.group1.processor.selector = round_robin
# Bind sinks to channels
agent.sinks.avro-hdfs_1-sink.channel = memory-agent-channel
agent.sinks.avro-hdfs_2-sink.channel = memory-agent-channel
# avro-instance_1-source properties
agent.sources.avro-instance_1-source.type = exec
agent.sources.avro-instance_1-source.command = tail -F /somedir/Trans.log
agent.sources.avro-instance_1-source.restart = true
agent.sources.avro-instance_1-source.batchSize = 100
# avro-instance_2-source properties
agent.sources.avro-instance_2-source.type = exec
agent.sources.avro-instance_2-source.command = tail -F /somedir/UDXMLTrans.log
agent.sources.avro-instance_2-source.restart = true
agent.sources.avro-instance_2-source.batchSize = 100
# avro-hdfs_1-sink properties
agent.sinks.avro-hdfs_1-sink.type = avro
agent.sinks.avro-hdfs_1-sink.hostname = hdfshost1.domin.com
agent.sinks.avro-hdfs_1-sink.port = 10000
# avro-hdfs_2-sink properties
agent.sinks.avro-hdfs_2-sink.type = avro
agent.sinks.avro-hdfs_2-sink.hostname = hdfshost2.domain.com
agent.sinks.avro-hdfs_2-sink.port = 10000
# memory-agent-channel properties
agent.channels.memory-agent-channel.type = memory
agent.channels.memory-agent-channel.capacity = 20000
agent.channels.memory-agent-channel.transactionCapacity = 100
Thanks!