On Fri, Sep 27, 2013 at 10:55 AM, Cameron Wellock <[email protected]> wrote:

Final update, in case anyone ever has a similar problem: increasing the transactionCapacity to a low multiple of the batch size (say 5x batch size) seems to have fixed the problem, at least for the moment.

Cameron
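In concrete terms, with the batch sizes from the config at the bottom of the thread, the fix above would amount to something like the lines below. The 50000 value is only an illustration of the 5x ratio; the exact number Cameron used isn't given in the thread:

# transactionCapacity raised to a low multiple of the batch size (here 5x).
# 50000 is illustrative -- the thread doesn't say what value was actually used.
agent.sources.r1.batchSize = 10000
agent.sinks.k1.hdfs.batchSize = 10000
agent.channels.c1.transactionCapacity = 50000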
On Thu, Sep 26, 2013 at 12:22 PM, Cameron Wellock <[email protected]> wrote:

Hi Paul, thanks for your thoughts. The sink does not complain--at all--and there are no relevant errors in the logs on the datanodes. I haven't waited to see if flume recovers after the other write stops, as I took the error messages at face value and restarted flume. I will try that today, time permitting, and I'll let you know what happens.

Thanks again,
Cameron

On Thu, Sep 26, 2013 at 12:07 PM, Paul Chavez <[email protected]> wrote:

Is the HDFS sink reporting any issues writing to the cluster? If you leave it alone, or wait until the other application stops writing, will flume recover?

SpoolDir is a good source if the write performance to HDFS is variable, as the files in the spool directory will just sit and wait until the flume channel has space again. Another option may be to add another HDFS sink or two pulling from the same channel, but from what you are saying this may not increase performance.

Hope that helps,
Paul Chavez
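Paul's extra-sink suggestion, sketched against the agent and channel names from the config at the bottom of the thread (sink k2 and its filePrefix are hypothetical additions, not something posted here), might look like:

# Second HDFS sink draining the same channel. Each event is taken by exactly
# one sink, so this adds write parallelism rather than duplicating data.
agent.sinks = k1 k2
agent.sinks.k2.channel = c1
agent.sinks.k2.type = hdfs
agent.sinks.k2.hdfs.path = hdfs://a.host.in.the.world.com/events/raw/%Y-%m-%d
agent.sinks.k2.hdfs.filePrefix = %{host}-k2
# (remaining hdfs.* settings duplicated from k1; the distinct filePrefix
# keeps the two sinks from contending for the same files)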
From: Cameron Wellock [mailto:[email protected]]
Sent: Thursday, September 26, 2013 8:37 AM
To: [email protected]
Subject: Unable to put batch on required channel

Hello world,

I've been trying to set up a test instance of flume and have been stymied by recurring failures. I'm trying to use a single flume agent moving about 200G of data from a spooldir into a very small hadoop cluster (3 nodes). If flume is the only thing writing to HDFS, everything works fine, but as soon as another application starts writing data into the cluster, HDFS slows down and flume barfs with an "unable to put batch on required channel" exception.

I have tried all kinds of configuration changes, to no avail. I have tried memory channels, file channels, small batch sizes (down to 50), large batch sizes (up to 20000), increasing timeouts, increasing channel capacity (up to 150 million), you name it. Sooner or later (usually 5-10 minutes after restart) flume comes to a halt. This is especially vexing considering that it's copying from a file to a file--there are no realtime requirements that might reasonably lead to a full channel in other circumstances. Anybody have any advice? Insights? Wild guesses? Outright lies?

Below are two exceptions from the log, one from a memory channel configuration and one from a file channel configuration, and below that is the most recent configuration file used. Absolutely any suggestions would be appreciated.

Thanks,
Cameron


25 Sep 2013 21:05:12,262 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195) - FATAL: Spool Directory source r1: { spoolDir: /var/nrelate/flume-spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        ... 9 more


25 Sep 2013 22:18:37,672 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195) - FATAL: Spool Directory source r1: { spoolDir: /var/nrelate/flume-spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel c1 { dataDirs: [/var/lib/flume-ng/.flume/file-channel/data] }
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
Caused by: org.apache.flume.ChannelException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. [channel=c1]
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:468)
        at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
        at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
        ... 9 more


# define the pipeline parts --------------------------------------------------------------

agent.sources = r1
agent.sinks = k1
agent.channels = c1

agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

# the main source, a spooldir ------------------------------------------------------------

agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /var/intheworld/flume-spool
agent.sources.r1.batchSize = 10000
agent.sources.r1.deserializer.maxLineLength = 10000
agent.sources.r1.interceptors = i1 i2

# parse out the timestamp and add to header
agent.sources.r1.interceptors.i1.type = regex_extractor
agent.sources.r1.interceptors.i1.regex = ^.*\\"ts\\":(\\d+).*$
agent.sources.r1.interceptors.i1.serializers = s1
agent.sources.r1.interceptors.i1.serializers.s1.name = timestamp

# also set host (hostname doesn't work properly, so set explicitly)
agent.sources.r1.interceptors.i2.type = static
agent.sources.r1.interceptors.i2.key = host
agent.sources.r1.interceptors.i2.value = Ess003726

# the sink, HDFS -------------------------------------------------------------------------

agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://a.host.in.the.world.com/events/raw/%Y-%m-%d
agent.sinks.k1.hdfs.filePrefix = %{host}
agent.sinks.k1.hdfs.rollInterval = 0
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.hdfs.batchSize = 10000
agent.sinks.k1.hdfs.txnEventMax = 10000
agent.sinks.k1.hdfs.idleTimeout = 900
agent.sinks.k1.hdfs.callTimeout = 300000
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.writeFormat = Text

# the channel ----------------------------------------------------------------------------

agent.channels.c1.type = file
agent.channels.c1.capacity = 150000000
agent.channels.c1.transactionCapacity = 10000
agent.channels.c1.write-timeout = 360
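One aside on the file channel config above: the FileChannel stack trace shows data going to the default location under the flume user's home directory (/var/lib/flume-ng/.flume/file-channel/data), because checkpointDir and dataDirs are not set. When keeping the file channel, it is common to point these at explicit, and ideally separate, disks; the paths below are placeholders, not values from the thread:

# explicit file-channel directories (paths are hypothetical examples)
agent.channels.c1.checkpointDir = /var/lib/flume-ng/file-channel/checkpoint
agent.channels.c1.dataDirs = /var/lib/flume-ng/file-channel/data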
