Hello,

I'm currently testing the spillable memory channel.
The idea is to have a fast in-memory channel coupled with a file channel so that logs are spooled to disk when our Hadoop cluster is unreachable.

To give you the whole architecture:

source-server -> tcp/syslog -> flume-spool (memory + spooling) -> avro -> flume-destination (hadoop-cluster)

My test scenario is:
- send 100 000 logs from the source server
- stop the flume-destination while the logs are being sent
- wait a few seconds so that events spool on the flume-spool server
- start the flume-destination again
- check that all the logs have been received on the flume-destination server.
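For reference, the sender on the source server is just a small script that streams RFC 3164-style syslog lines over TCP to the flume-spool agent (the hostnames, tag, and message format here are placeholders, not the exact generator I use):

```python
import socket

def syslog_line(seq, host="source-server", tag="loadtest"):
    # Minimal RFC 3164-style line; <134> = facility local0, severity info
    return "<134>Mar  5 10:22:13 %s %s: message %06d\n" % (host, tag, seq)

def send_batch(count, dest_host, dest_port=20514):
    # One TCP connection, streaming `count` lines to the syslogtcp source
    with socket.create_connection((dest_host, dest_port)) as sock:
        for i in range(count):
            sock.sendall(syslog_line(i).encode("ascii"))

if __name__ == "__main__":
    # "flume-spool" is a placeholder hostname for the spooling agent
    send_batch(100000, "flume-spool")
```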

The problem is that when I stop the destination server, the source server stops sending *completely*. Is this the normal behaviour? Shouldn't the events spool on the flume-spool server instead of blocking the source?

In my flume logs I can see the following exception:

05 Mar 2015 10:22:13,828 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.AbstractRpcSink.createConnection:217) - Rpc sink sin1: Building RpcClient with hostname: 10.115.77.6, port: 55555
05 Mar 2015 10:22:13,828 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.AvroSink.initializeRpcClient:126) - Attempting to create Avro Rpc client.
05 Mar 2015 10:22:13,862 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.api.NettyAvroRpcClient.configure:634) - Using default maxIOWorkers
05 Mar 2015 10:22:13,869 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
java.lang.NullPointerException
    at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doRollback(SpillableMemoryChannel.java:590)
    at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:394)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)

Please find below the flume configurations (destination server first, then spooling server).

Regards,

--
Smaine Kahlouch - Engineer, Research & Engineering
Arkena | T: +33 1 5868 6196
27 Blvd Hippolyte Marquès, 94200 Ivry-sur-Seine, France
arkena.com

# Destination server configuration
agent.sources = r1
agent.sinks = k1
agent.channels = c1

agent.sources.r1.type = avro
agent.sources.r1.bind = 0.0.0.0
agent.sources.r1.port = 55555

agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/flumelogs
agent.sinks.k1.batchSize = 200000

agent.channels.c1.type = memory
agent.channels.c1.capacity = 3500000
agent.channels.c1.transactionCapacity = 100000

agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
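For the final check of the scenario, I count the lines written by the file_roll sink on the destination; a rough sketch, assuming each event ends up as one line under /var/flumelogs:

```python
import glob
import os

def count_received(directory="/var/flumelogs"):
    # Sum line counts over every file the file_roll sink has rolled out
    total = 0
    for path in glob.glob(os.path.join(directory, "*")):
        if os.path.isfile(path):
            with open(path, "rb") as f:
                total += sum(1 for _ in f)
    return total

if __name__ == "__main__":
    # Should reach 100000 once the destination has caught up
    print(count_received())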

# Spooling server configuration
agent.sources = src1
agent.sources.src1.type = syslogtcp
agent.sources.src1.channels = chan1
agent.sources.src1.host = 0.0.0.0
agent.sources.src1.port = 20514
agent.sources.src1.eventSize = 200000

agent.channels = chan1
agent.channels.chan1.type = SPILLABLEMEMORY
agent.channels.chan1.memoryCapacity = 100000
#agent.channels.chan1.overflowCapacity =   # Default: 100000000
#agent.channels.chan1.byteCapacity =       # Default: 80% of the available JVM memory (-Xmx)
agent.channels.chan1.avgEventSize = 100000
agent.channels.chan1.checkpointDir=/mnt/flumeCheckpoint
agent.channels.chan1.checkpointInterval=300000
agent.channels.chan1.useDualCheckpoints=true
agent.channels.chan1.backupCheckpointDir=/mnt/diske/backupCheckpoint
agent.channels.chan1.dataDirs = /mnt/diske,/mnt/diskf,/mnt/diskg,/mnt/diskh,/mnt/diski,/mnt/diskj,/mnt/diskk

agent.sinks = sin1
agent.sinks.sin1.channel = chan1
agent.sinks.sin1.type = avro
agent.sinks.sin1.hostname = 10.115.77.6
agent.sinks.sin1.port = 55555
agent.sinks.sin1.batchSize = 200000
