I believe you are suffering from this bug: https://issues.apache.org/jira/browse/FLUME-2778 So while events arrive slowly the sink is able to keep up, but once the channel has more than 4 events queued, the sink tries to take 100 of them (the default batch size) in a single transaction, the take list hits its capacity of 4, and you get that error.
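If that is indeed the bug, a possible workaround (an untested sketch; it assumes, per that JIRA, that the file_roll sink reads its batch size from the "sink."-prefixed key and silently ignores the plain "batchSize" you set) would be:

    # Hypothetical fix, assuming FLUME-2778 is a "sink.batchSize" vs.
    # "batchSize" parameter-name mismatch in the file_roll sink:
    agent.sinks.fileout.sink.batchSize = 1

    # Alternatively, as the error message itself suggests, raise the
    # channel's transaction capacity to cover the batch size actually
    # in effect (100 by default):
    agent.channels.mem-channel.transactionCapacity = 100

Either way the point is the same: the sink must never try to take more events in one transaction than the channel's transactionCapacity allows.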
Regards,
Gonzalo

On 13 October 2015 at 08:12, Balthasar Schopman <[email protected]> wrote:
> Hi,
>
> I'm creating a proof-of-concept of a Flume agent that'll buffer events and
> stop consuming events from the source when the sink is unavailable. Only
> when the sink is available again should the buffered events be processed,
> after which the source resumes consumption.
>
> For this I've created a simple agent, which reads from a SpoolDir and
> writes to a file. To simulate that the sink service is down, I change the
> file permissions so Flume can't write to it. When I then start Flume, some
> events are buffered in the memory channel and it stops consuming events
> when the channel capacity is full, as expected. As soon as the file becomes
> writeable, the sink is able to process the events and Flume recovers.
> However, that only works when the transaction capacity is not exceeded. As
> soon as the transaction capacity is exceeded, Flume never recovers and
> keeps writing the following error:
>
> 2015-10-02 14:52:51,940 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]
> Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: Failed to process transaction
>     at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 4 full, consider committing more frequently, increasing capacity, or
> increasing thread count
>     at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
>     at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>     at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>     at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
>     ... 3 more
>
> As soon as the number of events buffered in memory exceeds the transaction
> capacity (4), this error occurs. I don't understand why, because the
> batchSize of the fileout is 1, so it should take the events out one by one.
>
> This is the config I'm using:
>
> agent.sources = spool-src
> agent.channels = mem-channel
> agent.sinks = fileout
>
> agent.sources.spool-src.channels = mem-channel
> agent.sources.spool-src.type = spooldir
> agent.sources.spool-src.spoolDir = /tmp/flume-spool
> agent.sources.spool-src.batchSize = 1
>
> agent.channels.mem-channel.type = memory
> agent.channels.mem-channel.capacity = 10
> agent.channels.mem-channel.transactionCapacity = 4
>
> agent.sinks.fileout.channel = mem-channel
> agent.sinks.fileout.type = file_roll
> agent.sinks.fileout.sink.directory = /tmp/flume-output
> agent.sinks.fileout.sink.rollInterval = 0
> agent.sinks.fileout.batchSize = 1
>
> I've tested this config with different values for the channel capacity and
> transaction capacity (e.g., 3 and 3), but haven't found a config where
> Flume is able to recover after the channel capacity is full. Any ideas on
> how to achieve this?
>
> --
> Kind regards,
>
> Balthasar Schopman
> Software Developer
> LeaseWeb Technologies B.V.
