On Mon, Sep 19, 2011 at 9:38 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> Hi Taylor,
>
> I'm trying to understand why you need this proxy approach to send data to the brokers.
>
> > Therefore, to implement a proxy I would have no choice but to buffer responses from different sharded instances and re-order them into the client-issued order, or else the client would be confused.
>
> Today, the brokers do not send any ACK for produce requests. So, there are no responses to worry about, right?

I'm just talking about consumers in this case. For producers, you are right, no changes are required.

> > Our use case requires greater than 20k topics system-wide, thus I am looking to shard across multiple instances of Kafka by topic name, using a consistent hashing scheme to pick the correct physical instance.
>
> To scale the number of topics, there are a couple of things you could do. Firstly, all topics do not need to be stored on all the brokers. Today, the default behavior of Kafka is to scatter the data for a topic on ALL brokers in the cluster. That surely limits the number of topics you can host in one cluster. But in 0.6, we released the dynamic partitioning feature on the producer. Using a custom partitioner, you could easily host topics on some subset of brokers. Though we haven't run any performance tests that would use this configuration, I would think it will help scale the number of topics beyond 20K.

I will look into this.

> Secondly, you could use separate Kafka clusters for some of the topics, effectively sharding the data by topic across several Kafka clusters. From what you explained, it looks like you are using this approach and want the proxy to pick the right cluster for a particular topic? I guess you could also push that logic one level up, to the application using the Kafka producer?

That's what I decided to do - because I want a generic "pub/sub" API, the sharding logic will go there, and it will instantiate n Kafka connections, where n is the number of shards (each shard being a separate Kafka cluster).
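Roughly, the routing piece of that layer would look like the sketch below. This is only an illustration - the class and names are made up (nothing here is a real Kafka class), and in the real pub/sub API each cluster id would map to its own Kafka producer/consumer connection:

import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch of the topic -> cluster routing: a consistent-hash ring of cluster
// ids, one id per Kafka cluster (shard). All names are placeholders.
public class TopicShardRouter {

    private static final int VNODES_PER_CLUSTER = 100;
    private final TreeMap<Integer, String> ring = new TreeMap<Integer, String>();

    public TopicShardRouter(List<String> clusterIds) {
        // Place several virtual nodes per cluster on the ring so load spreads
        // evenly and adding/removing a cluster only moves a few topics.
        for (String cluster : clusterIds) {
            for (int i = 0; i < VNODES_PER_CLUSTER; i++) {
                ring.put(hash(cluster + "#" + i), cluster);
            }
        }
    }

    // A topic maps to the first virtual node at or after its hash,
    // wrapping around to the start of the ring if necessary.
    public String clusterFor(String topic) {
        SortedMap<Integer, String> tail = ring.tailMap(hash(topic));
        Integer key = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
        return ring.get(key);
    }

    private static int hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"));
            // first four bytes of the digest as a signed int
            return ((d[0] & 0xff) << 24) | ((d[1] & 0xff) << 16)
                 | ((d[2] & 0xff) << 8) | (d[3] & 0xff);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}

A plain hash-mod-n over the cluster list would also work; the ring just limits how many topics move when a shard is added or removed.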
> Lastly, I was curious about the performance tests you ran to conclude that a comfortable number of live topics is around 20K. I wonder how the performance degraded in terms of producer throughput? Also, in these tests, how many concurrent consumers were you running? How was the consumer throughput affected?

Just measured the amount of time it takes to create that many topics. The time began to increase non-linearly around that level - it was maybe 30k or 40k - and there were random errors that seemed to be happening at the filesystem level; for example, "file not found" errors would be generated. So we settled on 20k as a safe number. Of course, we had to increase the number of open files allowed at the kernel and process level, and so on, to get to 20k.

> Thanks,
> Neha
>
> On Thu, Sep 15, 2011 at 1:41 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Great news! Let us know how we can help you out as you go live!
> >
> > WRT adding a correlation id, this is an interesting idea. We discussed this early on, but the argument was that we wanted to add replication and give guarantees about ordering from the point of view of the client, which doesn't really make sense in the case where the client itself multiplexes requests (e.g. we want to guarantee that two sequential produce requests from a single producer on the same partition are delivered in that order). Without this guarantee, maintaining a log which contains record updates is hard, since two updates could be reordered and give the wrong result. This argument led us to a fundamentally blocking client. However, I could easily see there being a case where this is not what you want, and multiplexing would be an advantage then, if it didn't add too much complexity.
> >
> > -Jay
> >
> > On Thu, Sep 15, 2011 at 1:01 PM, Taylor Gautier <tgaut...@tagged.com> wrote:
> >
> > > Hi,
> > >
> > > I've been looking at Kafka for a while now, and am getting ready to start moving an implementation we have been working on into production.
> > >
> > > Previously, I asked about support for large numbers of topics per Kafka instance, and was not surprised to learn it would have a number of limitations, such as the filesystem itself and ZooKeeper. Through testing we've determined a comfortable number of live topics to be around 20k. In the future, I'd like to start digging into the Kafka internals to see if this number can't be increased… however, in the short term we will be going with an unmodified version of Kafka, around which we will build some additional infrastructure to support our needs. Thus I have the following questions regarding the protocol…
> > >
> > > Our use case requires greater than 20k topics system-wide, thus I am looking to shard across multiple instances of Kafka by topic name, using a consistent hashing scheme to pick the correct physical instance. This poses a challenge, however. The routing to each sharded Kafka instance has to be done somewhere: either in the client driver, above the client driver, or as a proxy in between.
> > >
> > > I've chosen to go with a proxy in between, as it centralizes the sharding knowledge into a single manageable tier. However, the design of the Kafka protocol makes this very difficult to implement - here's why.
> > >
> > > While the Kafka protocol supports pipelining, it doesn't support out-of-order responses, because the responses do not indicate which topic they are a response to.
> > >
> > > Therefore, to implement a proxy I would have no choice but to buffer responses from different sharded instances and re-order them into the client-issued order, or else the client would be confused.
> > >
> > > I propose a modification to the protocol to allow the client to receive responses out of order. In the proxy use case, this would eliminate the need to buffer and re-order responses and allow the proxy to be mostly a request/response I/O router.
> > >
> > > Also, on the Kafka side, this would allow the Kafka server to service requests out of order, which could, for some use cases, improve response times - although for that to actually be realized, I suspect the Kafka code might have to support storing topics on multiple disk mount points, and we would have to assume that Java NIO could handle true async disk requests to multiple disk subsystems. It probably does, but I've never pushed Java async disk I/O that far… maybe you have?
> > >
> > > So… the point of my long-winded post is: I propose a modification to the protocol to implement out-of-order responses. This could be implemented in one of two ways - either embed the topic name in the response header, or, maybe preferably, force the client to issue a 64-bit token for each request and have the response include that token (the latter proposal makes the parsing easier and shortens the response, without unduly burdening the implementations with too much complexity).
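To make the token option concrete, the framing I have in mind is roughly the following. This is only a sketch, not the actual Kafka wire format, and the field layout is invented for illustration:

import java.nio.ByteBuffer;

// Sketch of the 64-bit token option: the client prepends a token to every
// request, and the server (or proxy) echoes the same token in front of the
// matching response. Layout is made up; not the real Kafka protocol.
public class TokenFraming {

    // Request frame: <length:4><token:8><original request bytes>
    public static ByteBuffer frameRequest(long token, ByteBuffer request) {
        ByteBuffer framed = ByteBuffer.allocate(4 + 8 + request.remaining());
        framed.putInt(8 + request.remaining()); // length of token + payload
        framed.putLong(token);                  // echoed back in the response
        framed.put(request);
        framed.flip();
        return framed;
    }

    // Response frame: <length:4><token:8><original response bytes>
    // Because the token identifies the request it answers, responses can be
    // accepted and dispatched in any order.
    public static long readResponseToken(ByteBuffer response) {
        response.getInt();         // skip length
        return response.getLong(); // token copied from the request
    }
}

With that, the client (or the proxy) only needs a map from token to waiting caller to hand each response back, in whatever order the responses arrive.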
> > > As I said, we are moving forward with a proxy-based solution, and it will most likely take the existing protocol and modify it in this way. What do you think of this effort?
> > >
> > > Thanks!
> > >
> > > Taylor
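For comparison, here is roughly the buffering the proxy has to do with the protocol as it stands: number the requests as they arrive and only release responses in that original order. Again, just a sketch with made-up names:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of the per-client-connection buffering the proxy needs today:
// requests are numbered in the order the client sent them, responses come
// back from different shards in any order, but they may only be written to
// the client in the original order.
public class ResponseReorderBuffer {

    private long nextToAssign = 0;   // sequence number given to the next request
    private long nextToRelease = 0;  // sequence number the client expects next
    private final TreeMap<Long, ByteBuffer> pending = new TreeMap<Long, ByteBuffer>();

    // Call once per incoming client request, in arrival order.
    public synchronized long register() {
        return nextToAssign++;
    }

    // Call when a shard answers request `seq`. Returns the run of responses
    // (possibly empty) that may now be sent to the client, in order.
    public synchronized List<ByteBuffer> complete(long seq, ByteBuffer response) {
        pending.put(seq, response);
        List<ByteBuffer> releasable = new ArrayList<ByteBuffer>();
        while (pending.containsKey(nextToRelease)) {
            releasable.add(pending.remove(nextToRelease));
            nextToRelease++;
        }
        return releasable;
    }
}

This is the buffering and re-ordering I would like to avoid.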