Hi Paul, thanks for your thoughts. The sink does not complain--at all--and
there are no relevant errors in the logs on the datanodes. I haven't waited
to see if flume recovers after the other write stops, as I took the error
messages at face value and restarted flume. I will try that today, time
permitting, and I'll let you know what happens.

Thanks again,
Cameron


On Thu, Sep 26, 2013 at 12:07 PM, Paul Chavez <[email protected]> wrote:

> Is the HDFS sink reporting any issues writing to the cluster? If you leave
> it alone, or wait until the other application stops writing, will flume
> recover?
>
> SpoolDir is a good source if the write performance to HDFS is variable, as
> the files in the spool directory will just sit and wait until the flume
> channel has space again. Another option may be to add another HDFS sink or
> two pulling from the same channel, but from what you are saying this may
> not increase performance.
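>
> For reference, a fan-out like that might look roughly as follows (untested
> sketch; k2 is a placeholder name, and it would need the same hdfs.* settings
> as your existing sink, ideally with a distinct filePrefix or path so the two
> sinks don't step on each other's files):
>
> agent.sinks = k1 k2
> agent.sinks.k2.channel = c1
> agent.sinks.k2.type = hdfs
> agent.sinks.k2.hdfs.filePrefix = %{host}-2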
>
> Hope that helps,
> Paul Chavez
>
> *From:* Cameron Wellock [mailto:[email protected]]
> *Sent:* Thursday, September 26, 2013 8:37 AM
> *To:* [email protected]
> *Subject:* Unable to put batch on required channel
>
> Hello world,
>
> I've been trying to set up a test instance of flume and have been stymied
> by recurring failures. I'm trying to use a single flume agent moving about
> 200G of data from a spooldir into a very small hadoop cluster (3 nodes). If
> flume is the only thing writing to HDFS, everything works fine, but as soon
> as another application starts writing data into the cluster, HDFS slows down
> and flume barfs with an "unable to put batch on required channel" exception.
>
> I have tried all kinds of configuration changes, to no avail. I have tried
> memory channels, file channels, small batch sizes (down to 50), large batch
> sizes (up to 20000), increasing timeouts, increasing channel capacity (up
> to 150 million), you name it. Sooner or later (usually 5-10 minutes after
> restart) flume comes to a halt. This is especially vexing considering that
> it's copying from a file to a file--there are no realtime requirements that
> might reasonably lead to a full channel in other circumstances. Anybody
> have any advice? Insights? Wild guesses? Outright lies?
>
> Below are two exceptions from the log, one from a memory channel
> configuration, one from a file channel configuration, and below that is the
> most recent configuration file used. Absolutely any suggestions would be
> appreciated.
>
> Thanks,
> Cameron
>
> 25 Sep 2013 21:05:12,262 ERROR [pool-5-thread-1]
> (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195)
> - FATAL: Spool Directory source r1: { spoolDir: /var/nrelate/flume-spool }:
> Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure
> Flume to continue processing.
> org.apache.flume.ChannelException: Unable to put batch on required channel:
> org.apache.flume.channel.MemoryChannel{name: c1}
>     at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>     at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue
> couldn't be acquired Sinks are likely not keeping up with sources, or the
> buffer size is too tight
>     at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
>     at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>     at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>     ... 9 more
>
> 25 Sep 2013 22:18:37,672 ERROR [pool-5-thread-1]
> (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195)
> - FATAL: Spool Directory source r1: { spoolDir: /var/nrelate/flume-spool }:
> Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure
> Flume to continue processing.
> org.apache.flume.ChannelException: Unable to put batch on required channel:
> FileChannel c1 { dataDirs: [/var/lib/flume-ng/.flume/file-channel/data] }
>     at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>     at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
> Caused by: org.apache.flume.ChannelException: The channel has reached it's
> capacity. This might be the result of a sink on the channel having too low
> of batch size, a downstream system running slower than normal, or that the
> channel capacity is just too low. [channel=c1]
>     at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:468)
>     at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
>     at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
>     at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
>     ... 9 more
>
> # define the pipeline parts --------------------------------------------------
>
> agent.sources = r1
> agent.sinks = k1
> agent.channels = c1
>
> agent.sources.r1.channels = c1
> agent.sinks.k1.channel = c1
>
> # the main source, a spooldir ------------------------------------------------
>
> agent.sources.r1.type = spooldir
> agent.sources.r1.spoolDir = /var/intheworld/flume-spool
> agent.sources.r1.batchSize = 10000
> agent.sources.r1.deserializer.maxLineLength = 10000
> agent.sources.r1.interceptors = i1 i2
>
> # parse out the timestamp and add to header
> agent.sources.r1.interceptors.i1.type = regex_extractor
> agent.sources.r1.interceptors.i1.regex = ^.*\\"ts\\":(\\d+).*$
> agent.sources.r1.interceptors.i1.serializers = s1
> agent.sources.r1.interceptors.i1.serializers.s1.name = timestamp
>
> # also set host (hostname doesn't work properly, so set explicitly)
> agent.sources.r1.interceptors.i2.type = static
> agent.sources.r1.interceptors.i2.key = host
> agent.sources.r1.interceptors.i2.value = Ess003726
>
> # the sink, HDFS ---------------------------------------------------------------
>
> agent.sinks.k1.type = hdfs
> agent.sinks.k1.hdfs.path = hdfs://a.host.in.the.world.com/events/raw/%Y-%m-%d
> agent.sinks.k1.hdfs.filePrefix = %{host}
> agent.sinks.k1.hdfs.rollInterval = 0
> agent.sinks.k1.hdfs.rollSize = 0
> agent.sinks.k1.hdfs.rollCount = 0
> agent.sinks.k1.hdfs.batchSize = 10000
> agent.sinks.k1.hdfs.txnEventMax = 10000
> agent.sinks.k1.hdfs.idleTimeout = 900
> agent.sinks.k1.hdfs.callTimeout = 300000
> agent.sinks.k1.hdfs.fileType = DataStream
> agent.sinks.k1.hdfs.writeFormat = Text
>
> # the channel ------------------------------------------------------------------
>
> agent.channels.c1.type = file
> agent.channels.c1.capacity = 150000000
> agent.channels.c1.transactionCapacity = 10000
> agent.channels.c1.write-timeout = 360
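>
> (For reference, the memory channel variant I tried was along these lines;
> this is a sketch rather than the exact file, and the keep-alive value is
> just one of the timeout settings I experimented with:)
>
> agent.channels.c1.type = memory
> agent.channels.c1.capacity = 150000000
> agent.channels.c1.transactionCapacity = 10000
> agent.channels.c1.keep-alive = 30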
>
