Hi Cameron, I had a look at your logs, and here is what seems to be happening:

- It looks like your first collector either isn't sinking events or is just too slow at it.
- By the look of the logs the second one *is* delivering events. The failover processor doesn't log anything when nothing is wrong. Perhaps adding some logging to FailoverSinkProcessor.moveActiveToDeadAndGetNext() would make this more user friendly. Could you check whether events were actually delivered by collector 2?
- The JSON output for avro-sink2 looks like it is successfully sending data. What's the output like for the second collector?
- I'd recommend setting up Ganglia to verify the throughput on each component and see if there's an imbalance.

Those JSON stats quite frankly look a bit odd. It doesn't look like the channels are full, which would indicate that the batch size is greater than the transaction capacity. But this doesn't appear to be the case in the configuration, unless we have a bug expecting batch size < transaction capacity rather than <=. It seems some batch sizes aren't specified (so the defaults are being used). I'm pretty sure none of the defaults are > 1000, but you might want to check that just to be safe.

So if this isn't resolved: check whether collector 2 is receiving events; if not, try changing the transaction capacity to be greater than the batch size. If there's still a problem, thread dumps from all three agents would help a lot, and if you can, have a look at the stats from Ganglia.
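
To rule out the batch size question, something along these lines might help. This is only a sketch: the agent, channel and sink names are taken from the configuration quoted below, `batch-size` is the Avro sink's batch setting, and the value 100 is an assumption chosen purely so that it sits well under the transaction capacity of 1000.

```properties
# Sketch only: set the sink batch size explicitly instead of relying on defaults.
# The value 100 is an illustrative assumption, not a recommendation.
local_agent.channels.memoryChannel-1.transactionCapacity = 1000

# Keep each sink's batch size no larger than the channel's transactionCapacity.
local_agent.sinks.avroSink-1.batch-size = 100
local_agent.sinks.avroSink-2.batch-size = 100
```

If the behaviour changes once the batch sizes are pinned like this, that would point at the batch size versus transaction capacity comparison rather than at the collectors.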

On 10/17/2012 04:53 AM, Cameron Gandevia wrote:
Hey

Thanks for the reply. I think the problem is in the Spooling Directory Source. I am not sure it recovers properly: once the failure happens, I don't see its thread pool running in the dump. I have added a question to the code review which may be related, but I don't understand how the sources are restarted during failures well enough to know for sure whether it's the problem. I will keep looking and post anything I find.

Thanks

On Tue, Oct 16, 2012 at 12:45 PM, Brock Noland wrote:

    With the configuration below, I was able to kill collector1, see
    collector2 take over and reverse.

    Note that I had to decrease the capacity of the channels significantly
    because I was using a smaller heap size. Before decreasing the
    capacity, a few threads in the local_agent ran out of memory and then
    behaved like you explained.

    Brock

    #
    # Properties of memoryChannel
    #
    local_agent.channels.memoryChannel-1.type = memory
    local_agent.channels.memoryChannel-1.capacity = 100000
    local_agent.channels.memoryChannel-1.transactionCapacity = 1000

    collector_agent_1.channels.memoryChannel-1.type = memory
    collector_agent_1.channels.memoryChannel-1.capacity = 100000
    collector_agent_1.channels.memoryChannel-1.transactionCapacity = 1000

    collector_agent_1.channels.memoryChannel-2.type = memory
    collector_agent_1.channels.memoryChannel-2.capacity = 100000
    collector_agent_1.channels.memoryChannel-2.transactionCapacity = 1000

    collector_agent_2.channels.memoryChannel-1.type = memory
    collector_agent_2.channels.memoryChannel-1.capacity = 100000
    collector_agent_2.channels.memoryChannel-1.transactionCapacity = 1000

    collector_agent_2.channels.memoryChannel-2.type = memory
    collector_agent_2.channels.memoryChannel-2.capacity = 100000
    collector_agent_2.channels.memoryChannel-2.transactionCapacity = 1000

    #
    # Properties for spooling directory source
    #
    local_agent.sources.spooldir-1.type = seq
    local_agent.sources.spooldir-1.channels = memoryChannel-1

    #
    # Properties for the avro sink 1 agent to collector 1
    #
    local_agent.sinks.avroSink-1.type = avro
    local_agent.sinks.avroSink-1.hostname = 127.0.0.1
    local_agent.sinks.avroSink-1.port = 4545
    local_agent.sinks.avroSink-1.channel = memoryChannel-1

    #
    # Properties for the avro sink agent to collector 2
    #
    local_agent.sinks.avroSink-2.type = avro
    local_agent.sinks.avroSink-2.hostname = 127.0.0.1
    local_agent.sinks.avroSink-2.port = 4546
    local_agent.sinks.avroSink-2.channel = memoryChannel-1

    #
    # Properties for the avro source collector 1
    #
    collector_agent_1.sources.avroSource-1.type = avro
    collector_agent_1.sources.avroSource-1.bind = 127.0.0.1
    collector_agent_1.sources.avroSource-1.port = 4545
    collector_agent_1.sources.avroSource-1.channels = memoryChannel-1 memoryChannel-2

    #
    # Properties for the avro source collector 2
    #
    collector_agent_2.sources.avroSource-2.type = avro
    collector_agent_2.sources.avroSource-2.bind = 127.0.0.1
    collector_agent_2.sources.avroSource-2.port = 4546
    collector_agent_2.sources.avroSource-2.channels = memoryChannel-1 memoryChannel-2

    # End points for collector 1

    # ElasticSearch endpoint collector 1

    collector_agent_1.sinks.elastic-search-sink-1.type = null
    collector_agent_1.sinks.elastic-search-sink-1.channel = memoryChannel-1

    # HDFS endpoint collector 1

    collector_agent_1.sinks.sink1.type = null
    collector_agent_1.sinks.sink1.channel = memoryChannel-2

    # ElasticSearch endpoint collector 2

    collector_agent_2.sinks.elastic-search-sink-1.type = null
    collector_agent_2.sinks.elastic-search-sink-1.channel = memoryChannel-1

    # HDFS endpoint collector 2

    collector_agent_2.sinks.sink1.type = null
    collector_agent_2.sinks.sink1.channel = memoryChannel-2

    # Specify priorities for the sinks on the agent

    local_agent.sinkgroups.ha.sinks = avroSink-1 avroSink-2
    local_agent.sinkgroups.ha.processor.type = failover
    local_agent.sinkgroups.ha.processor.priority.avroSink-1 = 2
    local_agent.sinkgroups.ha.processor.priority.avroSink-2 = 1

    # Wire the source agents up

    local_agent.sources = spooldir-1
    local_agent.sinks = avroSink-1 avroSink-2
    local_agent.sinkgroups = ha
    local_agent.channels = memoryChannel-1

    # Wire the collector agents up

    collector_agent_1.sources = avroSource-1
    collector_agent_1.sinks = elastic-search-sink-1 sink1
    collector_agent_1.channels = memoryChannel-1 memoryChannel-2

    collector_agent_2.sources = avroSource-2
    collector_agent_2.sinks = elastic-search-sink-1 sink1
    collector_agent_2.channels = memoryChannel-1 memoryChannel-2




--
Thanks

Cameron Gandevia
