Just as a follow-up: it doesn't seem that Flume recovers after the other writes stop. It just sits there, forever, doing nothing. No real CPU usage, even.
On Thu, Sep 26, 2013 at 12:22 PM, Cameron Wellock <[email protected]> wrote:

Hi Paul, thanks for your thoughts. The sink does not complain--at all--and there are no relevant errors in the logs on the datanodes. I haven't waited to see if flume recovers after the other write stops, as I took the error messages at face value and restarted flume. I will try that today, time permitting, and I'll let you know what happens.

Thanks again,
Cameron


On Thu, Sep 26, 2013 at 12:07 PM, Paul Chavez <[email protected]> wrote:

Is the HDFS sink reporting any issues writing to the cluster? If you leave it alone or wait until the other application stops writing, will flume recover?

SpoolDir is a good source if the write performance to HDFS is variable, as the files in the spool directory will just sit and wait until the flume channel has space again. Another option may be to add another HDFS sink or two pulling from the same channel, but from what you are saying this may not increase performance.

Hope that helps,
Paul Chavez
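For what it's worth, Paul's second suggestion (another HDFS sink or two pulling from the same channel) would look roughly like the lines below, layered onto the configuration quoted further down. This is only a sketch: the sink name k2 and the per-sink file prefixes are invented for the example, and the rest of k2's settings would mirror k1's.

# Sketch: declare a second HDFS sink, k2, draining the same channel c1.
agent.sinks = k1 k2

agent.sinks.k1.channel = c1
agent.sinks.k2.channel = c1

agent.sinks.k2.type = hdfs
agent.sinks.k2.hdfs.path = hdfs://a.host.in.the.world.com/events/raw/%Y-%m-%d
agent.sinks.k2.hdfs.batchSize = 10000
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k2.hdfs.writeFormat = Text

# Give each sink its own file prefix so they never try to open the same HDFS file.
agent.sinks.k1.hdfs.filePrefix = %{host}-k1
agent.sinks.k2.hdfs.filePrefix = %{host}-k2

Each sink takes events from c1 in its own transactions, so nothing is written twice; as Paul notes, whether this actually helps depends on whether the bottleneck is the single sink or the cluster itself.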
From: Cameron Wellock [mailto:[email protected]]
Sent: Thursday, September 26, 2013 8:37 AM
To: [email protected]
Subject: Unable to put batch on required channel

Hello world,

I've been trying to set up a test instance of flume and have been stymied by recurring failures. I'm trying to use a single flume agent moving about 200G of data from a spooldir into a very small hadoop cluster (3 nodes). If flume is the only thing writing to HDFS, everything works fine, but as soon as another application starts writing data into the cluster, HDFS slows down and flume barfs with an "unable to put batch on required channel" exception.

I have tried all kinds of configuration changes, to no avail. I have tried memory channels, file channels, small batch sizes (down to 50), large batch sizes (up to 20000), increasing timeouts, increasing channel capacity (up to 150 million), you name it. Sooner or later (usually 5-10 minutes after restart) flume comes to a halt. This is especially vexing considering that it's copying from a file to a file--there are no realtime requirements that might reasonably lead to a full channel in other circumstances. Anybody have any advice? Insights? Wild guesses? Outright lies?

Below are two exceptions from the log, one from a memory channel configuration and one from a file channel configuration, and below that is the most recent configuration file used. Absolutely any suggestions would be appreciated.

Thanks,
Cameron


25 Sep 2013 21:05:12,262 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195) - FATAL: Spool Directory source r1: { spoolDir: /var/nrelate/flume-spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    ... 9 more


25 Sep 2013 22:18:37,672 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195) - FATAL: Spool Directory source r1: { spoolDir: /var/nrelate/flume-spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel c1 { dataDirs: [/var/lib/flume-ng/.flume/file-channel/data] }
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
Caused by: org.apache.flume.ChannelException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. [channel=c1]
    at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:468)
    at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
    at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
    ... 9 more
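As an aside on the first exception: with a memory channel, the failing commit waits for the channel's keep-alive interval (default 3 seconds) for the sink to free space before throwing, so the settings involved are the ones below. The values here are examples only, not a recommendation.

# Memory-channel settings behind the "Space for commit to queue couldn't be acquired" error.
agent.channels.c1.type = memory
# maximum number of events the channel will buffer
agent.channels.c1.capacity = 1000000
# must be at least as large as the source and sink batch sizes
agent.channels.c1.transactionCapacity = 10000
# seconds a blocked commit will wait for free space (default is 3)
agent.channels.c1.keep-alive = 30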
# define the pipeline parts --------------------------------------------------------------

agent.sources = r1
agent.sinks = k1
agent.channels = c1

agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

# the main source, a spooldir ------------------------------------------------------------

agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /var/intheworld/flume-spool
agent.sources.r1.batchSize = 10000
agent.sources.r1.deserializer.maxLineLength = 10000
agent.sources.r1.interceptors = i1 i2

# parse out the timestamp and add to header
agent.sources.r1.interceptors.i1.type = regex_extractor
agent.sources.r1.interceptors.i1.regex = ^.*\\"ts\\":(\\d+).*$
agent.sources.r1.interceptors.i1.serializers = s1
agent.sources.r1.interceptors.i1.serializers.s1.name = timestamp

# also set host (hostname doesn't work properly, so set explicitly)
agent.sources.r1.interceptors.i2.type = static
agent.sources.r1.interceptors.i2.key = host
agent.sources.r1.interceptors.i2.value = Ess003726

# the sink, HDFS -------------------------------------------------------------------------

agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://a.host.in.the.world.com/events/raw/%Y-%m-%d
agent.sinks.k1.hdfs.filePrefix = %{host}
agent.sinks.k1.hdfs.rollInterval = 0
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.hdfs.batchSize = 10000
agent.sinks.k1.hdfs.txnEventMax = 10000
agent.sinks.k1.hdfs.idleTimeout = 900
agent.sinks.k1.hdfs.callTimeout = 300000
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.writeFormat = Text

# the channel ----------------------------------------------------------------------------

agent.channels.c1.type = file
agent.channels.c1.capacity = 150000000
agent.channels.c1.transactionCapacity = 10000
agent.channels.c1.write-timeout = 360
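Finally, on the question of whether the sink is still draining at all: Flume's built-in HTTP monitoring makes that visible without restarting anything. A rough sketch, assuming Flume 1.3 or later; the port, conf directory, and config file name below are placeholders:

# Start the agent with the JSON metrics server enabled (port chosen arbitrarily).
flume-ng agent -n agent -c /etc/flume-ng/conf -f flume.conf \
    -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

# While the agent appears hung, poll the counters:
curl http://localhost:34545/metrics
# If CHANNEL.c1 shows ChannelFillPercentage pinned near 100 while SINK.k1's
# EventDrainSuccessCount has stopped increasing, the HDFS sink is stalled
# rather than just slow.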
