Thanks, guys, this was a great thread. May be worth pointing to it in the online docs as it asks and answers a lot of interesting questions about the performance characteristics and tradeoffs made in Kafka.
How far out do you think built-in replication is?

Best,
O.

On Tue, Jul 19, 2011 at 3:23 PM, Jay Kreps <[email protected]> wrote:

Agreed, there is no reason the policy for handing out messages should not be configurable. We were hoping to make the whole question irrelevant with replication, since then the producer can choose the replication level it wants and fsync durability should be less of a concern.

I agree with your comment that a good implementation of streaming with acks could potentially be superior.

-Jay

On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <[email protected]> wrote:

Jay,

Ah - thanks for the clarification on the delay in the broker. It would be nice if that were a configuration option, so that the end user can choose to forward only messages that have been written to disk, or to have the data forwarded immediately. When you implement replication, data hitting the disk will matter less.

On the delay in the producer, I think it could best be resolved through measurement. In your paper you compare two different approaches, and I'm proposing a third:

1. Send and wait (single message, JMS style)
2. Batch, send, and wait (Kafka today)
3. Stream with ACKs

Removing any wait for a reply should increase throughput, not decrease it, so you're likely trading latency against potential CPU efficiency. And the CPU savings is a question best resolved by measurement.

I'd also encourage you to think about the WAN case. When you send-and-wait, you need to send a buffer that is >> the bandwidth*delay product to approach full line utilization, and the line will go idle for one RTT while you stop to wait for a reply. The bandwidth*delay product can get large (tens of megabytes), and end users will rarely understand the need to tune the batch size to increase throughput. They'll just say it's slow over long distances.

All that said - your use case doesn't require minimizing latency or WAN use, so I can really understand if this isn't a priority for you.

It's a well designed product that has had some real thought put into it. It's a really promising system; thanks for taking the time to respond to my comments.

Paul

On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <[email protected]> wrote:

Ah, I think what you are describing in zeromq is essentially the equivalent of group commit for the socket: you wait until the socket is no longer writable and then begin to queue data. This is an interesting idea. Of course it would only have a positive effect when you had already overflowed the socket buffer and were sending a very high throughput of small messages. That is basically a way to degrade an overloaded synchronous send into a batched send. It is not really the same as what we have done, which is to allow the ability to trade off latency for throughput in a configurable manner. The reason the latter is important is that we do not have a handful of producers sending at a rate that saturates the network I/O capacity of those servers (the case where group commit would help), but rather thousands of producers sending at a medium-low volume, so we would never hit that in our use case.
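(To make that configurable tradeoff concrete, here is a minimal sketch of a "dispatch on max delay or max count, whichever comes first" accumulator. The names and structure are hypothetical, not the actual async-producer code:)

    import java.util.ArrayList;
    import java.util.List;

    // Dispatch a batch when either the max delay or the max accumulated
    // message count is reached, whichever comes first.
    class BatchAccumulator {
        private final int maxCount;          // analogous to a batch-size setting
        private final long maxDelayMs;       // analogous to a queue-time setting
        private final List<byte[]> buffer = new ArrayList<byte[]>();
        private long oldestMs = -1;

        BatchAccumulator(int maxCount, long maxDelayMs) {
            this.maxCount = maxCount;
            this.maxDelayMs = maxDelayMs;
        }

        // Returns a batch to hand to the network layer, or null if neither
        // threshold has been reached yet.
        synchronized List<byte[]> add(byte[] message, long nowMs) {
            if (buffer.isEmpty()) oldestMs = nowMs;
            buffer.add(message);
            if (buffer.size() < maxCount && nowMs - oldestMs < maxDelayMs)
                return null;
            List<byte[]> batch = new ArrayList<byte[]>(buffer);
            buffer.clear();
            return batch;
        }
    }

In a sketch like this, setting the count to 1 or the delay to 0 degenerates to per-message sends; larger values trade latency for fewer, larger requests.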
The advantage of batching is fewer requests hitting the server, and larger packets. Where group commit would help is the synchronous producer benchmarks, where you could potentially get much better throughput. This is something we should consider adding.

To be clear, though, we have not added latency in our layer, just a configurable way to trade off latency for throughput. This is unambiguously a good thing, I think.

With respect to mmap, I think you are misunderstanding where the latency comes from. We immediately write data to the filesystem with no delay whatsoever. This incurs the overhead of a system call, as you point out, which could be avoided by mmap, but that doesn't add much in the way of latency. The latency comes from the fact that we do not make the written data available to consumers until we fsync the file to "ensure" the durability of consumed messages. The frequency of that fsync is configurable: it can be immediate, or based on a time or message-count threshold. This again trades latency for throughput.

-Jay

On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <[email protected]> wrote:

*Producer latency* - I'm not familiar with zeromq internals, but my understanding is that they send the first message(s) immediately, and as TCP queues up the data it will eventually block as the send buffer fills; during this time messages can queue up, and the net-net is that on average the number of system calls is << the number of messages. The key is having a separate thread for network operations with very efficient thread coordination. Clearly Nagle is disabled (TCP_NODELAY), as Nagle is a blight against humanity.

Having any sort of delay adds latency. If every developer thinks it's OK to add a little latency in his layer, pretty soon you end up with 10 second end-to-end latency.

Having an "accumulated message count" is also bad for WAN performance. If your "window size" is a set of delayed messages, the only way to deal with a large bandwidth*delay product is to delay a lot of messages, then send them. You can fit a lot of data into a fiber: on a gigabit link with a 100ms round-trip time, roughly 12MB is in flight at any moment. And you need multiples of that for buffering if you need to do a retransmit.

*Broker latency* - With mmap, the memcpy() of the message should make the data available to a thread even in another process; the pages you have mapped are also in the buffer cache and available to a sendfile() call, or at least I think so. The flush to physical disk (or msync() in this case) would still be delayed, but without impacting end-to-end latency.

That said, in benchmarks I have done the fastest IO with the lowest CPU overhead is unbuffered (direct) IO (which is lower overhead than using the buffer cache, with or without memory mapping), but then you'd have to manage your own buffer pool and run your broker in a single multithreaded process. That's getting more extreme, though.
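(As a rough illustration of the mmap path in Java -- a sketch assuming a single pre-sized mapped segment, not Kafka's actual log code -- appends become plain memory writes that are immediately visible through the page cache, while the expensive force()/msync is deferred to the existing flush policy:)

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;

    // Append via a memory-mapped segment: the bytes land in the page cache and
    // are visible to readers of the file right away, with no write() system call.
    class MappedLog {
        private final MappedByteBuffer map;

        MappedLog(String path, int capacityBytes) throws Exception {
            RandomAccessFile file = new RandomAccessFile(path, "rw");
            map = file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, capacityBytes);
        }

        synchronized void append(byte[] message) {
            map.put(message);                 // plain memory copy into the mapping
        }

        // Run on a timer or message-count threshold, like today's flush interval.
        synchronized void flush() {
            map.force();                      // msync: push dirty pages to disk
        }
    }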
Just getting rid of this buffer-write delay by using memory mapping will remove a big chunk of latency.

On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <[email protected]> wrote:

Hi Paul,

We are definitely interested in lowering latency--lower is always better--but that has not been a major concern for us so far (we were replacing a system with 1 hour latency), so we haven't focused on it yet. As you describe, latency in our setup at LinkedIn comes from batching on the frontend and batching on the Kafka servers due to very lenient flush settings.

I am interested in your comments on zeromq. Do they actually have a better approach to this problem even when using TCP? If so I would be interested to understand it. The way I see it, this is about trading throughput against latency. On the producer side you have only a few options: immediately write the data to the socket buffer for sending, or wait and see if the application writes more data. The OS will do the latter for you unless you set TCP_NODELAY, but the OS is relatively inflexible; it doesn't understand your data, so I think it just waits 200ms or until the socket buffer is full.

The current approach in the async producer captures the same tradeoff, but a little more flexibly: it allows you to specify a max delay and a max accumulated message count, and data is written when either of those is hit.

Is it possible to better capture this tradeoff? Basically I am not aware of any other trick here if you are using TCP, so I would be interested in what zeromq does if they are doing this better.

We do indeed write each message set to the filesystem as it arrives, but we distribute messages to consumers only after the write has been flushed to disk. Delaying (batching) that flush is the cause of the latency, but it also gives better use of IOPS by generating larger writes. Mmap would remove the system call (which would be good), but not the flush, I think. As you say, adding replication allows giving stronger guarantees without actually caring about durability on a particular server, which would make it possible to distribute messages to consumers after an ack from some number of other servers irrespective of flushing to disk.

-Jay

On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <[email protected]> wrote:

Jun,

Thanks for your answers and the link to the paper - that helps a lot, especially the comment in the paper that 10 second end-to-end latency is good enough for your intended use case.

We're looking for much lower latencies, and the basic design of Kafka feels like it should support latencies in milliseconds with a few changes.
We're either going to build our own system or help develop something that already exists, so please take my comments in the constructive way they're intended. (I realize the changes I'm suggesting are outside your intended use case, but if you're interested we may be able to provide a very capable developer to help with the work, assuming we choose Kafka over the other zillion streaming systems coming out of the woodwork.)

a. *Producer "queue.time"* - In my question 4 below, I was referring to the producer queue time. With a default value of 5 seconds, that accounts for half your end-to-end latency. A system like zeromq is optimized to write data immediately, without delay, but in such a way as to minimize the number of system calls required during high-throughput messaging. Zeromq is no nirvana, but it has a number of nice properties.

b. *Broker "log.default.flush.interval.ms"* - The default value of 3 seconds appears to be another significant source of latency in the system, assuming that clients are unable to access data until it has been flushed. Since you have wisely chosen to take advantage of the buffer cache as part of your system design, it seems you could remove this latency completely by memory-mapping the partitions and memcpying each message as it arrives. With the right IPC mechanism, clients could have immediate access to new messages.

c. *Batching, sync vs async, replication, and auditing* - It's understandable that you've chosen a forensic approach to producer reliability (after-the-fact auditing), but when you implement replication it would be really nice to revise the producer protocol mechanisms. If you used a streaming mechanism with producer offsets and ACKs, you could ensure reliable delivery of producer streams to multiple brokers without the need to choose a "batch size" or "queue.time". This could also give you active/active failover of brokers. It may also help in the WAN case (my question 3 below), because you will be able to adaptively stuff more and more data through the fiber on high bandwidth*delay links without having to choose a large "batch size" or incur the additional latency that entails. Oh, and it will help you deal with CRC errors once you start checking for them.

d. *Performance measurements* - I'd like to make a suggestion for your performance measurements. Your benchmarks measure throughput, but a throughput number is meaningless without an associated "% CPU time".
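(For example, something along these lines reports both numbers from the same run. This is only a sketch: it relies on the HotSpot-specific com.sun.management bean, and runBenchmark() is a placeholder for the existing produce/consume loop:)

    import java.lang.management.ManagementFactory;
    import com.sun.management.OperatingSystemMXBean;

    class ThroughputWithCpu {
        public static void main(String[] args) {
            // HotSpot-specific bean exposing process CPU time in nanoseconds.
            OperatingSystemMXBean os =
                    (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
            long cpuStart  = os.getProcessCpuTime();
            long wallStart = System.nanoTime();

            long bytesMoved = runBenchmark();   // placeholder for the real test loop

            double cpuSec  = (os.getProcessCpuTime() - cpuStart) / 1e9;
            double wallSec = (System.nanoTime() - wallStart) / 1e9;
            System.out.printf("%.1f MB/sec at %.1f%% of one core%n",
                    bytesMoved / wallSec / 1e6, 100.0 * cpuSec / wallSec);
        }

        static long runBenchmark() { return 0L; }   // hypothetical hook
    }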
Ideally, all measurements would achieve wire speed (100MB/sec) at 0% CPU (since, after all, this is plumbing, and we assume the cores in the system should have capacity set aside for useful work too). Obviously nobody ever achieves this, but by measuring it one can raise the bar in terms of optimization.

Paul

ps. Just for background, I am the cofounder of Quantcast, where we process 3.5PB of data per day. These questions are related to my new startup, Quantbench, which will deal with financial market data, where you don't want any latency at all. And WAN issues are a big deal too. Incidentally, I was also founder of Orbital Data, which was a WAN optimization company, so I've done a lot of work with protocols over long distances.

On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <[email protected]> wrote:

Paul,

Excellent questions. See my answers below. Thanks,

On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <[email protected]> wrote:

> Kafka looks like an exciting project, thanks for opening it up. I have a few questions:
>
> 1. Are checksums end to end (i.e., created by the producer and checked by the consumer)? Or are they only used to confirm buffer-cache behavior on disk, as mentioned in the documentation? Bit errors occur vastly more often than most people assume, often because of device driver bugs, and TCP's checksum misses roughly 1 error in 65,536, so errors can flow through (if you like I can send links to papers describing the need for checksums everywhere).

The checksum is generated at the producer and propagated to the broker and eventually the consumer. Currently, we only validate the checksum at the broker. We could further validate it at the consumer in the future.

> 2. The consumer has a pretty solid mechanism to ensure it hasn't missed any messages (I like the design, by the way), but how does the producer know that all of its messages have been stored? (There is no apparent message id on that side, since the message id isn't known until the message is written to the file.) I'm especially curious how failover/replication could be implemented, and I'm thinking that acks on the publisher side may help.

The producer-side auditing is not built in. At LinkedIn, we do that by generating an auditing event periodically in the event handler of the async producer.
The auditing event contains the number of events produced in a configured window (e.g., 10 minutes) and is sent to a separate topic. The consumer can read the actual data and the auditing events and compare the counts. See our paper (http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf) for some more details.

> 3. Has the consumer's flow control been tested over high bandwidth*delay links? (What bandwidth can you get from a London consumer of an SF cluster?)

Yes, we actually replicate Kafka data across data centers, using an embedded consumer in a broker. Again, there is a bit more info on this in our paper.

> 4. What kind of performance do you get if you set the producer's message delay to zero? (i.e., is there a separate system call for each message, or do you manage to aggregate messages into a smaller number of system calls even with a delay of 0?)

I assume you are referring to the flush interval. One can configure the broker to flush every message to disk. This will slow down throughput significantly.

> 5. Have you considered using a library like zeromq for the messaging layer instead of rolling your own? (zeromq will handle #4 cleanly at millions of messages per second and has clients in 20 languages.)

No. Our proprietary format allows us to support things like compression in the future. However, we can definitely look into the zeromq format. Is their messaging layer easily extractable?

> 6. Do you have any plans to support intermediate processing elements the way Flume does?

For now, we are just focusing on getting the raw messaging layer solid. We have worked a bit on stream processing and will look into that again in the future.

> 7. The docs mention that new versions will only be released after they are in production at LinkedIn. Does that mean that the latest version of the source code is hidden at LinkedIn, and contributors would have to throw patches over the wall and wait months to get the integrated product?

What we run at LinkedIn is the same version as the open source one; there is no internal repository of Kafka at LinkedIn. We plan to maintain that in the future.
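(For illustration, the windowed counting behind those audit events might look roughly like the following. This is a sketch, not the actual LinkedIn audit code:)

    // The producer's event handler calls record() for every message sent; when a
    // window closes, the returned count is published as an audit event on a
    // separate topic so a consumer can compare it against what it actually saw.
    class AuditCounter {
        private final long windowMs;   // e.g. 10 minutes
        private long windowStart = -1;
        private long count = 0;

        AuditCounter(long windowMs) {
            this.windowMs = windowMs;
        }

        // Returns the finished window's count when the window rolls over,
        // or -1 if the current window is still open.
        synchronized long record(long nowMs) {
            if (windowStart < 0) windowStart = nowMs;
            long finished = -1;
            if (nowMs - windowStart >= windowMs) {
                finished = count;
                windowStart = nowMs;
                count = 0;
            }
            count++;
            return finished;
        }
    }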
> Thanks!
