Hi Steve,
Thanks for the rundown, lots of detail is better than not enough!
Response inline
On 09/06/2012 01:37 AM, Steve Johnson wrote:
Hi all, I wanted to share some of my test findings/concerns, etc..
First off, I apologize for this being so verbose, but I feel I need to
give a little bit of background on our setup and needs to show the
big picture. Please ignore this if you're not interested, you've been
warned...
But if you are, great, because I do have some valid questions to follow
and I'm really looking forward to any constructive comments.
Prior to a few weeks back, I had zero experience with Flume. I'd been
familiar with its existence for some time (about a year) but nothing
more than that.
My company generates about 8 billion log records per day, spread
across 5 datacenters, with about 200 servers in each location. So
about 1.6 billion per day in each cage. We're growing and shooting to
increase that to about 30 billion per day based on holiday traffic
growth and our company's growth. These log records are currently
hourly-rotated logback (slf4j) generated logs from our java
applications, containing tab-delimited ascii data of various widths.
There are probably 25 different log types we collect, but generally all
the same format; some average record lengths of 50-60 bytes, while
some others average 1k in width.
Right now, we collect them using a custom-built java scheduling
application. We have a machine dedicated to this at each DC. This
box fires off some hourly jobs (within minutes after log rotations)
that pull all the logs from the 200+ servers (some servers generate
up to 10 different log types per hour), uncompressed. We used to pull
directly to our central location, and would initiate compression on
the servers themselves, but this generated CPU/IO spikes every hour
that were causing performance issues. So we put a remote machine in
each node to handle local collection. They pull all the log files
locally first, then compress, then move into a queue. This happens
across all 5 DCs in parallel. We have another set of schedulers in
our central location that each collect from those remote nodes and
pull them locally; then we do some ETL work and load the raw log data
into our Greenplum warehouse for nightly aggregations and analysis.
This is obviously becoming very cumbersome to maintain, as we have,
right now, 10 different schedulers running over 6 locations. Also, to
guarantee we've fetched every log file, and also to guarantee we
haven't double-loaded any raw data (this data has only a logrec that's
maintained globally to guarantee uniqueness, so removing dupes is a
nightmare and we like to avoid that), we have to track every file
pickup, for each hop (currently tracked in a postgresql db), and then
use that for validation and also to make sure we don't pull a rotated
log again (logs stay archived on their original servers for 7 days).
A couple years back, when we had 1 or even 2 DCs with only about 30
servers in each, this wasn't so bad. But as you can imagine, we're
looking at over 80k files generated per day to track and manage. When
things run smoothly, it's great; when we have issues, it's a pain to
dig into.
So what are the requirements I'm looking at for a replacement of said
system?
1. Little or no custom configuration; it must be a drop-in-and-go
environment. Right now, as we add/remove servers, I have to edit a lot
of db records to tell the schedulers which servers have which types of
logs, and I also need to replicate it out, reload the configs and make
sure log sources have ssh keys installed, etc.
2. Must be able to compress data going between flume agents in remote
DCs and the flume agents in our central location. (Bandwidth for this
kind of data is not cheap; right now, by gzipping the hourly logs
locally before we transfer, we get between about a 6:1 and a 10:1
compression ratio depending on the log type.)
3. Must be able to handle the throughput.
4. Must be transactional and recoverable. Many of these logs
correlate directly to revenue; we must not lose data.
To have zero data loss you must use a reliable ingest system and a
lossless channel. The netcat source can't guarantee delivery (if a
channel can't fit the sent messages, for example, they will just get
dropped). The memory channel will lose data on a crash. There's a
sketch of what a reliable hop could look like just after your
requirements list below.
5. Scalable.
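To make that concrete, here's a minimal sketch of one reliable hop
(a remote DC agent forwarding to the central agent over Avro RPC, with
file channels on both ends so failed sends are retried rather than
dropped). The agent names, host, port and directory are invented for
illustration; treat it as a starting point, not a drop-in config:

# remote DC agent: collects events locally, forwards to central over Avro RPC
dc1agent.sources = localIn
dc1agent.channels = fileCh
dc1agent.sinks = toCentral
dc1agent.channels.fileCh.type = file
dc1agent.sinks.toCentral.type = avro
dc1agent.sinks.toCentral.channel = fileCh
dc1agent.sinks.toCentral.hostname = central.example.com
dc1agent.sinks.toCentral.port = 4545
dc1agent.sinks.toCentral.batch-size = 1000

# central agent: accepts the Avro RPC stream coming in from the remote DCs
central.sources = fromDCs
central.channels = fileCh
central.sinks = roll
central.sources.fromDCs.type = avro
central.sources.fromDCs.bind = 0.0.0.0
central.sources.fromDCs.port = 4545
central.sources.fromDCs.channels = fileCh
central.channels.fileCh.type = file
central.sinks.roll.type = FILE_ROLL
central.sinks.roll.channel = fileCh
central.sinks.roll.sink.directory = /data/flume/roll

On requirement 2: I don't believe the 1.2.0 Avro sink compresses on the
wire, so check what your version (or a newer one) offers before counting
on anything close to your current gzip ratios.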
From reading the docs I believe Flume is a possible solution.
Forward to today...
Flume Agent Config:
Version flume-ng 1.2.0:
JAVA_OPTS="-Xms1g -Xmx5g -Dcom.sun.management.jmxremote
-XX:MaxDirectMemorySize=5g"
This is running on a 16-core Intel Xeon 2.4GHz with 48GB RAM, and
local drives running raid5/xfs (not sure of the RPMs, but they're
pretty fast).
Testing/Flume Setup:
testagent.sources = netcatSource
testagent.channels = memChannel
testagent.sinks = fileSink
testagent.sources.netcatSource.type = netcat
testagent.sources.netcatSource.channels = memChannel
testagent.sinks.fileSink.type = FILE_ROLL
testagent.sinks.fileSink.channel = memChannel
testagent.channels.memChannel.type = memory
testagent.sources.netcatSource.bind = 0.0.0.0
testagent.sources.netcatSource.port = 6172
testagent.sources.netcatSource.max-line-length = 65536
testagent.channels.memChannel.capacity = 4294967296
This is huge. The memory channel uses a blocking queue of events, and I'm
pretty sure that it will misbehave beyond the limits of the integer
range. Seeing as it's signed, that would be around 2 billion (and with an
average event length of say 50 bytes, that would consume at least 100GB of
RAM)? FileChannel may or may not deal with huge capacities better. The
capacity designation is for event count, not bytes of data. Someone did
however recently post an issue about making physical size a setting in
some form; maybe you want to add your feedback to that (
https://issues.apache.org/jira/browse/FLUME-1535 )
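For comparison, here's a sketch of the same memory channel sized in
events rather than bytes (the numbers are made up; tune them to your heap
and average event size):

testagent.channels.memChannel.type = memory
# capacity is an event count, not bytes; 1M events of ~1KB is roughly 1GB of heap
testagent.channels.memChannel.capacity = 1000000
# max events moved per put/take transaction (keep >= your source/sink batch sizes)
testagent.channels.memChannel.transactionCapacity = 10000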
testagent.sinks.fileSink.sink.directory =
/opt/dotomi/flume-data/sink/file_roll
testagent.sinks.fileSink.sink.rollInterval = 0
I took one of our production servers' hourly logs, one of the largest
we produce (this one has about 1.2 million rows in it for that hour,
average record length about 700 bytes, some creeping up to 4k; keep
in mind, this is one server in one cage out of 50 total).
I wrote a Perl script that opens a socket to the netcat source port on
the agent, buffers about 10 log recs, and then sends them in
batches of 10. I originally tried line-by-line; this
was obviously super inefficient. I also attempted to buffer more (more
on that below) but started dropping too many events; I think it was
causing buffering issues on the agent. 10 seemed to be the magic
number for my setup. I also started with a FileChannel (recoverable)
and a simple file_roll sink so I could verify the output files.
As mentioned above, the netcat source is not a reliable ingest system as
it doesn't know about events that weren't committed. In the long run, for
lossless delivery, you will want to deliver data via either Avro or the
Scribe (Thrift) data format. However, if you just want to test
performance, try using ExecSource to tail the log files and fiddle with
the batching settings.
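If you go that route, something along these lines (the command and names
are placeholders, and exec/tail itself won't survive restarts, so this is
only for throughput testing):

testagent.sources = tailSource
testagent.sources.tailSource.type = exec
testagent.sources.tailSource.command = tail -F /var/log/app/events.log
testagent.sources.tailSource.channels = memChannel
# batchSize = lines put into the channel per transaction; check whether
# your Flume version exposes this setting before relying on it
testagent.sources.tailSource.batchSize = 100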
I ended up having some trouble getting the FileChannel started. I
eventually got it to start with some pretty narrow parameters, which
caused my flow to be very slow. When I tried to set higher numbers in
capacity, it would either not start up, or start but nothing would
flow. I moved to a memory channel just to get my proof of
concept moving, and to get a test of the framework first. Also, since
we're a java shop, we're not opposed to the idea of writing custom
sources/sinks/channels where need be, assuming the framework is sound.
In its current implementation FileChannel is lossless and thus causes a
disk flush (which generally writes two separate files) for every
commit (one for every batch of events). This is going to mean very slow
throughput if you have small batches. You can however improve this a lot
by putting the channel's data directories and checkpoint directory on
separate disks (not always feasible), or you can just make sure you're
batching more events at a time.
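Roughly like this (the directories are invented for the example;
double-check the property names against the file channel docs for your
version):

testagent.channels.fileCh.type = file
# keep the checkpoint and the data logs on separate spindles if you can
testagent.channels.fileCh.checkpointDir = /disk1/flume/checkpoint
testagent.channels.fileCh.dataDirs = /disk2/flume/data
testagent.channels.fileCh.capacity = 1000000
testagent.channels.fileCh.transactionCapacity = 10000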
After some heavy tuning, I was able to get something that worked well,
and performed very well. I was eventually able to get 200k per second
of these log events through.
To cut to the chase, here are the issues I had:
1. Data loss (this was brought up in another thread). A little under
half the time (about 40%) I would run the exact same test and it
would drop a very small number of events, 10 or fewer (out of about
500k events). Other times it would pass every event through without
issue.
I'm going to guess this is the (netcat) source not being able to send
messages to the channel. Since it doesn't inform the ingest system about
the failed delivery, the ingest also can't resend. I assume you're not
getting any exceptions like the other person who recently asked about
the netcat source?
2. With the tuning mentioned above, I was able to get about 60k per
second through a single flume instance. I decided to crank everything
up (double it, and even tried doubling that once more). This machine
has 48GB and is doing nothing but this. So logically, I figured I
could bump my OPTS to 10g instead of 5, and up my channel capacity to
8g, allowing me to buffer more and in theory double my throughput.
This wasn't at all the case. By attempting to throw more at it (either
by lowering my sleep times between batches, or by using the same sleep
times but doubling my batch size from 10 lines to 20), things started
flaking out. Basically, after about 200k lines went through, it just
stopped processing; no warnings, no errors, nothing.
Here's where it gets interesting though. I then set up four flume
agents on the same machine with all the same configurations and
startup params back at the 4g range, all listening on different ports.
I started all 4, and then in parallel (on another machine), ran my
test script to hit all four agents. That's when I was able to get
200k through. So by running four of them with lower tunables, I was
able to get the throughput I couldn't get running one with 4x the
tunables and startup options.
The channel capacity really shouldn't matter so long as it is large
enough to hold events in the interim until the sink drains them.
If you want to use one big agent, you may need to tune the sink
runners (which are single-threaded). E.g. if you have a lot of data
coming in and one avro sink that just can't keep up, you can set up
multiple avro sinks. The channel size should be set large enough to
hold whatever amount of data you expect to build up if a downstream
server/write location becomes unavailable.
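As a sketch (the hostnames/ports are invented), three avro sinks each get
their own single-threaded runner but drain the same channel in parallel:

agent.sinks = avroSink1 avroSink2 avroSink3
agent.sinks.avroSink1.type = avro
agent.sinks.avroSink1.channel = fileCh
agent.sinks.avroSink1.hostname = central.example.com
agent.sinks.avroSink1.port = 4545
# avroSink2 and avroSink3 are configured the same way, pointing at the
# same or different downstream hosts/ports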
Number 2 is something I can easily live with, but I would like to hear
some insight on maybe what's causing it. Obviously the disks can keep
up, because all of the file_roll paths for all 4 agents are using the
same drive. And obviously I have the ram to buffer accordingly. But
for some reason, one agent with 2x or even 4x the juice starts getting
flaky.
Number 1 is more concerning, this obviously will need to be solved.
The 2 rules I stick to for a lossless flow (your current system is
unfortunately breaking both):
1) An ingest system using an RPC delivery that is aware of failed sends
and responds by resending data (we have a python program that tails
files, sending to the Scribe source, keeping a position pointer, and
rewinding that pointer when the thrift RPC responds with failure).
2) A lossless channel (currently file or jdbc). This is generally only
an issue for restarts/failures.
In summary, I'm willing and ready to spend more time on this. But I
wanted to get some insight from the pros and developers here, and also
make sure I'm not crazy and maybe just trying to use this for more
than it was designed for.
Many thanks to anyone who stuck around to read this! :)
Thanks for your feedback!
Best of luck,
Juhani
Cheers
--
Steve Johnson
[email protected] <mailto:[email protected]>