Alan, I've seen other Apache projects just post design docs as attachments in JIRA and then comment and iterate there. Is that not the preferred way?
Also, I added a couple of Kafka papers/presentations to the Kafka wiki: https://cwiki.apache.org/confluence/display/KAFKA/Index

Jun

On Wed, Jul 20, 2011 at 9:15 AM, Alan D. Cabrera <[email protected]> wrote:

It would be good to move the content of the PDF files to the wiki so that the community can participate in the design. These PDF files need to be removed.

Regards,
Alan

On Jul 20, 2011, at 9:07 AM, Jun Rao wrote:

Olivier,

We have a design for replication (see the design doc and subtasks at https://issues.apache.org/jira/browse/KAFKA-50). We are currently wrapping up the compression support and will start working on replication soon.

Jun

On Tue, Jul 19, 2011 at 12:59 PM, Olivier Pomel <[email protected]> wrote:

Thanks, guys, this was a great thread. It may be worth pointing to it in the online docs, as it asks and answers a lot of interesting questions about the performance characteristics and tradeoffs made in Kafka.

How far out do you think built-in replication is?

Best,
O.

On Tue, Jul 19, 2011 at 3:23 PM, Jay Kreps <[email protected]> wrote:

Agreed, there is no reason the policy for handing out messages should not be configurable. We were hoping to make the whole question irrelevant with replication, since then the producer can choose the replication level it wants and fsync durability should be less of a concern.

I agree with your comment that a good implementation of streaming with acks could potentially be superior.

-Jay

On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <[email protected]> wrote:

Jay,

Ah - thanks for the clarification on the delay in the broker. It would be nice if that were a configuration option, so that the end user can choose to forward only messages that have been written to disk, or to have the data forwarded immediately. When you implement replication, data hitting the disk will matter less.

On the delay in the producer, I think it could best be resolved through measurement. In your paper you compare two different approaches, and I'm proposing a third:

1. Send and wait (single message, JMS style)
2. Batch, send, and wait (Kafka today)
3. Stream with ACKs

Removing any wait for a reply should increase throughput, not decrease it, so you're likely trading latency against potential CPU efficiency. And the CPU savings is a question best resolved by measurement.

I'd also encourage you to think about the WAN case. When you send-and-wait, you need to send a buffer that is >> the bandwidth-delay product to approach full line utilization, and the line will go idle for one RTT while you stop to wait for a reply. The bandwidth*delay product can get large (tens of megabytes), and end users will rarely understand the need to tune the batch size to increase throughput. They'll just say it's slow over long distances.
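To put rough numbers on that bandwidth-delay argument (the figures here are illustrative, not measurements from this thread): a 1 Gbit/s path with a 100 ms round-trip time holds

    1 Gbit/s x 0.1 s = 100 Mbit ~= 12.5 MB

in flight, so a send-and-wait producer needs batches well above that size (and matching socket buffers) before it can keep such a link busy.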
All that said - your use case doesn't require minimizing latency or WAN use, so I can really understand if this isn't a priority for you.

It's a well designed product that has had some real thought put into it. It's a really promising system; thanks for taking the time to respond to my comments.

Paul

On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <[email protected]> wrote:

Ah, I think what you are describing in zeromq is essentially the equivalent of group commit for the socket. Essentially you wait until the socket is no longer writable and then begin to queue data. This is an interesting idea. Of course it would only have a positive effect when you had already overflowed the socket buffer and were sending a very high throughput of small messages. That is basically a way to degrade an overloaded synchronous send into a batched send. It is not really the same as what we have done, which is to allow the ability to trade off latency for throughput in a configurable manner. The reason the latter is important is that we do not have a handful of producers sending at a rate that saturates the network I/O capacity of those servers (the case where group commit would help), but rather thousands of producers sending at a medium-low volume, so we would never hit that in our use case. The advantage of batching is fewer requests hitting the server, and larger packets. Where group commit would help is in the synchronous producer benchmarks, where you could potentially get much better throughput. This is something we should consider adding.

To be clear, though, we have not added latency in our layer, just a configurable way to trade off latency for throughput. This is unambiguously a good thing, I think.

With respect to mmap, I think you are misunderstanding where the latency comes from. We immediately write data to the filesystem with no delay whatsoever. This incurs the overhead of a system call, as you point out, which could be avoided by mmap, but that doesn't add much in the way of latency. The latency comes from the fact that we do not make the written data available to consumers until we fsync the file to "ensure" the durability of consumed messages. The frequency of the fsync is configurable: either immediate, or based on a time or message-count threshold. This again trades latency for throughput.

-Jay

On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <[email protected]> wrote:

*Producer latency* - I'm not familiar with zeromq internals, but my understanding is that they send the first message(s) immediately and, as TCP queues up the data, the send will eventually block as the send buffer fills. During this time messages can queue up, and the net-net is that on average the number of system calls is << the number of messages. The key is having a separate thread for network operations with very efficient thread coordination. Clearly Nagle is disabled (TCP_NODELAY), as Nagle is a blight against humanity.
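A minimal sketch of that pattern - a dedicated sender thread that drains whatever accumulated while the previous write was in flight, over a socket with Nagle disabled. This illustrates the idea only; it is not zeromq or Kafka code, and the class and queue here are made up:

    import java.io.OutputStream;
    import java.net.Socket;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative only: one sender thread drains whatever queued up while the
    // previous write was in flight, so the syscall count stays << the message count.
    // No framing or error recovery is shown.
    public class NodelaySender implements Runnable {
        private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
        private final Socket socket;

        public NodelaySender(Socket socket) throws Exception {
            socket.setTcpNoDelay(true);   // disable Nagle, as Paul suggests
            this.socket = socket;
        }

        public void send(byte[] msg) { queue.add(msg); }

        public void run() {
            try (OutputStream out = socket.getOutputStream()) {
                while (!Thread.currentThread().isInterrupted()) {
                    byte[] first = queue.take();           // block for the first message
                    out.write(first);                      // send it immediately
                    byte[] next;
                    while ((next = queue.poll()) != null)  // anything that queued up meanwhile
                        out.write(next);                   // goes out in the same burst
                    out.flush();
                }
            } catch (Exception e) {
                Thread.currentThread().interrupt();
            }
        }
    }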
Having any sort of delay adds latency. If every developer thinks it's OK to add a little latency in his layer, pretty soon you end up with 10-second end-to-end latency.

Having an "accumulated message count" is also bad for WAN performance. If your "window size" is a set of delayed messages, the only way to deal with a large bandwidth*delay product is to delay a lot of messages, then send them. You can fit a lot of data into a fiber. Imagine a gigabit link with a 100 ms round-trip time: you can have roughly 12 MB in flight in the fiber. And you need multiples of that for buffering if you need to do a retransmit.

*Broker latency* - With mmap, the memcpy() of the message should make the data available to a thread even in another process; the pages you have mapped are also in the buffer cache and available to a sendfile() call, or at least I think so. The flush to physical disk (or msync() in this case) would still be delayed, but without impacting end-to-end latency.

That said, in benchmarks I have done the fastest IO with the lowest CPU overhead is unbuffered (direct) IO (which is lower overhead than using the buffer cache with or without memory mapping), but then you'd have to manage your own buffer pool and run your broker in a single multithreaded process. But that's getting more extreme. Just getting rid of this buffer-write delay by using memory mapping will remove a big chunk of latency.

On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <[email protected]> wrote:

Hi Paul,

We are definitely interested in lowering latency--lower is always better--but that has not been a major concern for us so far (we were replacing a system with 1 hour latency), so we haven't focused on it yet. As you describe, latency in our setup at LinkedIn comes from batching on the frontend and batching on the Kafka servers due to very lenient flush settings.

I am interested in your comments on zeromq. Do they actually have a better approach for this problem even when using TCP? If so I would be interested to understand it. The way I see things, this is about trading throughput and latency. On the producer side you have only a few options: immediately write the data to the socket buffer for sending, or wait and see if the application writes more data. The OS will do this for you unless you set TCP_NODELAY, but the OS is relatively inflexible; it doesn't understand your data, so I think it just waits 200 ms or until the socket buffer is full.

The current approach in the async producer captures the same tradeoff, but a little more flexibly: it allows you to specify a max delay and a max accumulated message count, and data is written when either of those is hit.
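For concreteness, a rough sketch of that kind of batcher - flush when either a message-count or a time threshold is hit, whichever comes first. The class, the threshold values, and sendBatch() are illustrative, not the actual async producer implementation:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Illustrative batching loop: send when batchSize messages have accumulated
    // OR maxDelayMs has elapsed, whichever comes first.
    public class BatchingLoop {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        private final int batchSize = 200;      // illustrative "max accumulated message count"
        private final long maxDelayMs = 100;    // illustrative "max delay"

        public void offer(String msg) { queue.add(msg); }

        public void run() throws InterruptedException {
            List<String> batch = new ArrayList<String>();
            long deadline = System.currentTimeMillis() + maxDelayMs;
            while (true) {
                long wait = deadline - System.currentTimeMillis();
                String msg = wait > 0 ? queue.poll(wait, TimeUnit.MILLISECONDS) : null;
                if (msg != null) batch.add(msg);
                if (batch.size() >= batchSize || System.currentTimeMillis() >= deadline) {
                    if (!batch.isEmpty()) sendBatch(batch);   // one request/write for the whole batch
                    batch = new ArrayList<String>();
                    deadline = System.currentTimeMillis() + maxDelayMs;
                }
            }
        }

        private void sendBatch(List<String> batch) { /* write all messages in one request */ }
    }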
Is it possible to better capture this tradeoff? Basically I am not aware of any other trick here if you are using TCP, so I would be interested in what zeromq does if they are doing this better.

We do indeed write each message set to the filesystem as it arrives, but we distribute messages to consumers only after the write has been flushed to disk. Delaying (batching) that flush is the cause of the latency, but it also gives better use of IOPs by generating larger writes. Mmap would remove the system call (which would be good), but not the flush, I think. As you say, adding replication allows giving stronger guarantees without actually caring about durability on a particular server, which would make it possible to distribute messages to consumers after an ack from some number of other servers, irrespective of flushing to disk.

-Jay

On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <[email protected]> wrote:

Jun,

Thanks for your answers and the link to the paper - that helps a lot, especially the comment in the paper that 10-second end-to-end latency is good enough for your intended use case.

We're looking for much lower latencies, and the basic design of Kafka feels like it should support latencies in milliseconds with a few changes. We're either going to build our own system or help develop something that already exists, so please take my comments in the constructive way they're intended (I realize the changes I'm suggesting are outside your intended use case, but if you're interested we may be able to provide a very capable developer to help with the work, assuming we choose Kafka over the other zillion streaming systems that are coming out of the woodwork).

a. *Producer "queue.time"* - In my question 4 below, I was referring to the producer queue time. With a default value of 5 seconds, that accounts for half your end-to-end latency. A system like zeromq is optimized to write data immediately without delay, but in such a way as to minimize the number of system calls required at high message rates. Zeromq is no nirvana, but it has a number of nice properties.

b. *Broker "log.default.flush.interval.ms"* - The default value of 3 seconds appears to be another significant source of latency in the system, assuming that clients are unable to access data until it has been flushed. Since you have wisely chosen to take advantage of the buffer cache as part of your system design, it seems that you could remove this latency completely by memory mapping the partitions and memcpying each message as it arrives. With the right IPC mechanism, clients could have immediate access to new messages.
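A rough sketch of the mmap idea being proposed here (map a log segment, copy each message in on arrival, defer the flush to physical media); the class, the length-prefix framing, and the lack of bounds/rollover handling are all simplifications, and this is not how the Kafka broker is actually implemented:

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;

    // Illustrative only: append into a memory-mapped segment so the bytes land in the
    // page cache (and are visible to readers of the file) right away; the flush to
    // physical disk (msync / force) can be deferred without delaying readers.
    public class MappedSegment {
        private final MappedByteBuffer buf;

        public MappedSegment(String path, int capacity) throws Exception {
            RandomAccessFile raf = new RandomAccessFile(path, "rw");
            buf = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, capacity);
        }

        public void append(byte[] message) {
            buf.putInt(message.length);   // simple length prefix
            buf.put(message);             // "memcpy" into the mapped region
        }

        public void sync() {
            buf.force();                  // the deferred flush to physical media
        }
    }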
c. *Batching, sync vs async, replication, and auditing* - It's understandable that you've chosen a forensic approach to producer reliability (after-the-fact auditing), but when you implement replication it would be really nice to revise the producer protocol mechanisms. If you used a streaming mechanism with producer offsets and ACKs, you could ensure reliable delivery of producer streams to multiple brokers without the need to choose a "batch size" or "queue.time". This could also give you active/active failover of brokers. It may also help in the WAN case (my question 3 below), because you will be able to adaptively stuff more and more data through the fiber on high bandwidth*delay links without having to choose a large "batch size" or accept the additional latency that entails. Oh, and it will help you deal with CRC errors once you start checking for them.

d. *Performance measurements* - I'd like to make a suggestion for your performance measurements. Your benchmarks measure throughput, but a throughput number is meaningless without an associated "% CPU time". Ideally all measurements achieve wire speed (100 MB/sec) at 0% CPU (since, after all, this is plumbing, and we assume the cores in the system should have capacity set aside for useful work too). Obviously nobody ever achieves this, but by measuring it one can raise the bar in terms of optimization.

Paul

ps. Just for background, I am the cofounder at Quantcast, where we process 3.5PB of data per day. These questions are related to my new startup Quantbench, which will deal with financial market data where you don't want any latency at all. And WAN issues are a big deal too. Incidentally, I was also the founder of Orbital Data, which was a WAN optimization company, so I've done a lot of work with protocols over long distances.

On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <[email protected]> wrote:

Paul,

Excellent questions. See my answers below. Thanks,

On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <[email protected]> wrote:

> Kafka looks like an exciting project, thanks for opening it up.
>
> I have a few questions:
>
> 1. Are checksums end to end (i.e., created by the producer and checked by the consumer)? Or are they only used to confirm buffer-cache behavior on disk, as mentioned in the documentation? Bit errors occur vastly more often than most people assume, often because of device driver bugs. TCP's checksum misses roughly 1 error in 65536, so errors can flow through (if you like, I can send links to papers describing the need for checksums everywhere).

The checksum is generated at the producer and propagated to the broker and eventually the consumer. Currently, we only validate the checksum at the broker. We could further validate it at the consumer in the future.
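For illustration, end-to-end checksumming along the lines being asked about could look roughly like this: a CRC32 computed by the producer, carried with the message, and recomputed wherever you want to validate (the broker today, the consumer in the future). This is a sketch of the idea, not Kafka's actual wire format:

    import java.util.zip.CRC32;

    // Illustrative end-to-end check: the producer computes a CRC32 over the payload
    // and ships it with the message; the broker and/or consumer recompute and compare.
    public final class Checksums {
        public static long crcOf(byte[] payload) {
            CRC32 crc = new CRC32();
            crc.update(payload);
            return crc.getValue();
        }

        // Called at the broker on receipt, and again at the consumer after fetch.
        public static void verify(byte[] payload, long expectedCrc) {
            if (crcOf(payload) != expectedCrc) {
                throw new IllegalStateException("payload corrupted in transit or on disk");
            }
        }
    }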
> 2. The consumer has a pretty solid mechanism to ensure it hasn't missed any messages (I like the design, by the way), but how does the producer know that all of its messages have been stored? (There is no apparent message id on that side, since the message id isn't known until the message is written to the file.) I'm especially curious how failover/replication could be implemented, and I'm thinking that acks on the publisher side may help.

Producer-side auditing is not built in. At LinkedIn, we do it by generating an auditing event periodically in the event handler of the async producer. The auditing event contains the number of events produced in a configured window (e.g., 10 minutes) and is sent to a separate topic. The consumer can read the actual data and the auditing event and compare the counts. See our paper (http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf) for some more details.

> 3. Has the consumer's flow control been tested over high bandwidth*delay links? (What bandwidth can you get from a London consumer of an SF cluster?)

Yes, we actually replicate Kafka data across data centers, using an embedded consumer in a broker. Again, there is a bit more info on this in our paper.

> 4. What kind of performance do you get if you set the producer's message delay to zero? (I.e., is there a separate system call for each message, or do you manage to aggregate messages into a smaller number of system calls even with a delay of 0?)

I assume that you are referring to the flush interval. One can configure the broker to flush every message to disk. This will slow down the throughput significantly.
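For reference, the knobs being discussed trade flush latency against throughput roughly like this. Only log.default.flush.interval.ms is named earlier in this thread (default 3 seconds); the message-count property name used below is an assumption and should be checked against the broker configuration docs:

    import java.util.Properties;

    public class FlushSettingsExample {
        public static void main(String[] args) {
            // Illustrative only; "log.flush.interval" (a message-count threshold) is an
            // assumed property name, not confirmed by this thread.
            Properties lowLatency = new Properties();
            lowLatency.put("log.flush.interval", "1");                   // flush after every message: visible sooner, far lower throughput
            lowLatency.put("log.default.flush.interval.ms", "50");       // and at least every 50 ms

            Properties highThroughput = new Properties();
            highThroughput.put("log.flush.interval", "10000");           // let writes accumulate into larger, fewer IOs
            highThroughput.put("log.default.flush.interval.ms", "3000"); // the 3-second default discussed above
        }
    }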
> 5. Have you considered using a library like zeromq for the messaging layer instead of rolling your own? (Zeromq will handle #4 cleanly at millions of messages per second and has clients in 20 languages.)

No. Our proprietary format allows us to support things like compression in the future. However, we can definitely look into the zeromq format. Is their messaging layer easily extractable?

> 6. Do you have any plans to support intermediate processing elements the way Flume does?

For now, we are just focusing on getting the raw messaging layer solid. We have worked a bit on stream processing and will look into that again in the future.

> 7. The docs mention that new versions will only be released after they are in production at LinkedIn. Does that mean that the latest version of the source code is hidden at LinkedIn and contributors would have to throw patches over the wall and wait months to get the integrated product?

What we run at LinkedIn is the same version that is in open source, and there is no internal repository of Kafka at LinkedIn. We plan to keep it that way in the future.

> Thanks!
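To round out Jun's description above of the LinkedIn producer-side auditing, a rough sketch of the idea: count the events emitted per time window and periodically publish that count to a separate audit topic, so a downstream job can reconcile it against what was actually consumed. The window length mirrors the 10-minute example above; sendToAuditTopic() is a placeholder, not a real Kafka client call:

    // Illustrative sketch of the auditing approach described above; not LinkedIn's code.
    public class ProducerAudit {
        private long windowStartMs = System.currentTimeMillis();
        private long eventsInWindow = 0;
        private final long windowMs = 10 * 60 * 1000;   // e.g. a 10-minute window, as mentioned above

        public void onEventProduced() {
            eventsInWindow++;
            long now = System.currentTimeMillis();
            if (now - windowStartMs >= windowMs) {
                sendToAuditTopic(windowStartMs, eventsInWindow);  // placeholder call
                windowStartMs = now;
                eventsInWindow = 0;
            }
        }

        private void sendToAuditTopic(long windowStart, long count) {
            // publish {windowStart, count} to a dedicated audit topic for reconciliation
        }
    }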
