I would start by logging performance metrics from your Flume agent, which will 
let you determine which component is falling behind. It's likely a sink, so the 
first thing to try is adding an extra sink where you think it's lagging. You 
can always add additional sinks pointing at the same channel; each will take 
events as fast as it can, and since takes are transactional they will never 
attempt to send the same data (barring transaction failures).

I'm not sure whether adding another Avro sink pointing at the same host is a 
viable way to improve performance; I would think just bumping up the 
transaction capacity would help more there. That said, there's a recent thread 
with some research suggesting the fileChannel is the actual bottleneck, so it 
may work.
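For instance, the transaction capacity bump is a one-line change on the channel 
(a sketch against your config below; the value 1000 is just an example, tune it 
to your sink batch sizes):

agent1.channels.file1.transactionCapacity = 1000

The transaction capacity caps how many events a sink can take from the channel 
per transaction, so it should generally be at least as large as the sink's 
batch size.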

In my case, I just added another HDFS sink to each channel on my last hop, and 
that was enough to clear the bottleneck.
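For reference, the two-sink layout on the last hop looks roughly like this (a 
sketch; the agent name, channel name, and HDFS path are placeholders, not from 
my actual config):

collector.channels = file1
collector.sinks = hdfs1 hdfs2
collector.sinks.hdfs1.type = hdfs
collector.sinks.hdfs1.channel = file1
collector.sinks.hdfs1.hdfs.path = /flume/events/%Y-%m-%d
collector.sinks.hdfs2.type = hdfs
collector.sinks.hdfs2.channel = file1
collector.sinks.hdfs2.hdfs.path = /flume/events/%Y-%m-%d

Both sinks take from file1 independently, and the channel's transactions keep 
any one event from being delivered by both.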

Good luck,
Paul Chavez

From: Wang, Yongkun | Yongkun | BDD [mailto:[email protected]]
Sent: Thursday, August 22, 2013 10:27 PM
To: <[email protected]>
Subject: Re: sleep() in script doesn't work when called by exec Source

If it happened at the last hop in your test, it could possibly happen at the 
first hop as well.
Maybe the network is slow in my test. I got "ChannelException: The channel 
has reached it's capacity." either on the agent side (first hop) or on the 
collector side (last hop sinking to Hadoop).

My configuration of agent:

agent1.sources = spd1
agent1.sources.spd1.type = spooldir
agent1.sources.spd1.spoolDir = /log/flume-ng/agent1/spooldir/spd1
agent1.sources.spd1.deserializer.maxLineLength = 8192
agent1.sources.spd1.channels = file1

agent1.channels = file1
agent1.channels.file1.type = file
agent1.channels.file1.checkpointDir = /log/flume-ng/agent1/checkpoint
agent1.channels.file1.dataDirs = /log/flume-ng/agent1/data
agent1.channels.file1.capacity = 2000000
agent1.channels.file1.transactionCapacity = 100

agent1.sinks = avro1
agent1.sinks.avro1.type = avro
agent1.sinks.avro1.channel = file1
agent1.sinks.avro1.hostname = remote_host
agent1.sinks.avro1.port = 33333

Thanks.

Best Regards,
Yongkun Wang

On 2013/08/21, at 1:15, Paul Chavez wrote:


Yes, I am curious what you mean as well. When testing, I dropped a few 15 GB 
files in the spoolDir, and while they processed slowly, they did complete. In 
fact, my only issue with that test was that the last-hop HDFS sinks couldn't 
keep up, and I had to add a couple more to keep the upstream channels from 
filling up.

Thanks,
Paul


From: Brock Noland [mailto:[email protected]]
Sent: Tuesday, August 20, 2013 7:59 AM
To: [email protected]
Subject: Re: sleep() in script doesn't work when called by exec Source

Hi,

Can you share the details of this?  It shouldn't die with large files.

On Tue, Aug 20, 2013 at 3:43 AM, Wang, Yongkun | Yongkun | BDD 
<[email protected]> wrote:
Thanks Brock.

I tried the spooling directory source; if the file dropped in the spoolDir was 
too large, Flume also died. There seems to be some blocking.
I will start a standalone script process to drop small files instead.

Best Regards,
Yongkun Wang

On 2013/08/19, at 22:08, Brock Noland wrote:



In your case I would look at the spooling directory source.

On Sun, Aug 18, 2013 at 9:29 PM, Wang, Yongkun | Yongkun | BDD 
<[email protected]> wrote:
Hi,

I am testing with apache-flume-1.4.0-bin.
I made a naive Python script for the exec source to do throttling by calling 
the sleep() function.
But sleep() doesn't work when the script is called by the exec source.
Any ideas about this, or do you have a simple solution for throttling other 
than a custom source?

Flume config:

agent.sources = src1
agent.sources.src1.type = exec
agent.sources.src1.command = read-file-throttle.py

read-file-throttle.py:

#!/usr/bin/python
import sys
import time

count = 0
pre_time = time.time()
with open("apache.log") as infile:
    for line in infile:
        line = line.strip()
        print line
        count += 1
        if count % 50000 == 0:
            sys.stdout.flush()  # push buffered output through to the exec source
            now_time = time.time()
            diff = now_time - pre_time
            if diff < 10:
                # sleep out the rest of the 10-second window, not the elapsed time
                time.sleep(10 - diff)
            pre_time = time.time()  # reset the window even when no sleep was needed
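The throttle logic I am aiming for can also be written as a standalone 
function (a sketch; throttle_delay is my own name, and the 50,000-line batch 
and 10-second window are the same numbers as in the script above):

```python
import time

# Sketch of the intended throttle: at most `batch` lines per `window` seconds.
# Returns how long to sleep after `count` total lines have been emitted,
# given the time the current window started.
def throttle_delay(count, window_start, batch=50000, window=10.0):
    if count % batch != 0:
        return 0.0                     # not at a batch boundary: no pause
    elapsed = time.time() - window_start
    return max(0.0, window - elapsed)  # sleep only the remainder of the window
```

This pauses longest when lines are emitted fastest, and not at all when a 
batch already took the full window to produce.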


Thank you very much.

Best Regards,
Yongkun Wang



--
Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org




