Hi Taylor, I'm trying to understand why you need this proxy approach to send data to the brokers.
>> Therefore, to implement a proxy I would have no choice but to buffer responses from different sharded instances and re-order them into the client-issued order, or else the client would be confused.

Today, the brokers do not send any ACK for produce requests. So, there are no responses to worry about, right?

>> Our use case requires greater than 20k topics system-wide, thus I am looking to run multiple sharded instances of Kafka, sharding by topic name and using a consistent hashing scheme to pick the correct physical instance.

To scale the number of topics, there are a couple of things you could do.

Firstly, all topics do not need to be stored on all the brokers. Today, the default behavior of Kafka is to scatter the data for a topic on ALL brokers in the cluster. That surely limits the number of topics you can host in one cluster. But in 0.6, we released the dynamic partitioning feature on the producer. Using a custom partitioner, you could easily host topics on some subset of brokers. Though we haven't run any performance tests with this configuration, I would think it would help scale the number of topics beyond 20k.

Secondly, you could use separate Kafka clusters for some of the topics, effectively sharding the data by topic across several Kafka clusters. From what you explained, it looks like you are using this approach and want the proxy to pick the right cluster for a particular topic? I guess you could also push that logic one level up, into the application using the Kafka producer?

Lastly, I was curious about the performance tests you ran to conclude that a comfortable number of live topics is around 20k. I wonder how the performance degraded in terms of producer throughput? Also, in these tests, how many concurrent consumers were you running? How was the consumer throughput affected?

Thanks,
Neha

On Thu, Sep 15, 2011 at 1:41 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Great news! Let us know how we can help you out as you go live!
>
> WRT adding a correlation id, this is an interesting idea. We discussed this early on, but the argument was that we wanted to add replication and give guarantees about ordering from the point of view of the client, which doesn't really make sense in the case where the client itself multiplexes requests (e.g. we want to guarantee that two sequential produce requests from a single producer on the same partition are delivered in that order). Without this guarantee, maintaining a log which contains record updates is hard, since two updates could be reordered and give the wrong result. This argument led us to a fundamentally blocking client. However, I could easily see there being a case where this is not what you want, and multiplexing would be an advantage then, if it didn't add too much complexity.
>
> -Jay
>
> On Thu, Sep 15, 2011 at 1:01 PM, Taylor Gautier <tgaut...@tagged.com> wrote:
>
> > Hi,
> >
> > I've been looking at Kafka for a while now, and am getting ready to start moving an implementation we have been working on into production.
> >
> > Previously, I asked about support for large numbers of topics per Kafka instance, and was not surprised to learn it would have a number of limitations such as the filesystem itself and ZooKeeper. Through testing we've determined that a comfortable number of live topics is around 20k.
> > In the future, I'd like to start digging into the Kafka internals to see if this number can't be increased…however in the short term we will be going with an unmodified version of Kafka, around which we will build some additional infrastructure to support our needs. Thus I have the following questions regarding the protocol…
> >
> > Our use case requires greater than 20k topics system-wide, thus I am looking to run multiple sharded instances of Kafka, sharding by topic name and using a consistent hashing scheme to pick the correct physical instance. This poses a challenge, however. The routing to each sharded Kafka instance has to be done somewhere: either in the client driver, above the client driver, or as a proxy in between.
> >
> > I've chosen to go with a proxy in between, as it centralizes the sharding knowledge into a single manageable tier. However, the design of the Kafka protocol makes this very difficult to implement; here's why.
> >
> > While the Kafka protocol supports pipelining, it doesn't support out-of-order responses, because the responses do not indicate which topic they are a response to.
> >
> > Therefore, to implement a proxy I would have no choice but to buffer responses from different sharded instances and re-order them into the client-issued order, or else the client would be confused.
> >
> > I propose a modification to the protocol to allow the client to receive responses out of order. In the proxy use case, this would eliminate the need to buffer and re-order responses and allow the proxy to be mostly a request/response I/O router.
> >
> > Also, on the Kafka side, this would allow the Kafka server to service requests out of order, which could, for some use cases, improve response times. Although for that to actually be realized, I suspect the Kafka code might have to support storing topics on multiple disk mount points, and we would have to assume that Java NIO could handle true async disk requests to multiple disk subsystems. It probably does, but I've never pushed Java async disk I/O that far…maybe you have?
> >
> > So…the point of my long-winded post is this: I propose a modification to the protocol to implement out-of-order responses. This could be implemented in one of two ways: either embed the topic name in the response header, or, maybe preferably, have the client issue a 64-bit token for each request, and have the response include that token (the latter proposal makes the parsing easier and shortens the response without unduly burdening the implementations).
> >
> > As I said, we are moving forward with a proxy-based solution and it will most likely take the existing protocol and modify it in this way. What do you think of this effort?
> >
> > Thanks!
> >
> > Taylor
>
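To make the cluster-sharding idea discussed in the thread concrete, here is a minimal sketch of how a proxy (or a thin layer above the producer, as Neha suggests) could map a topic name to one of several Kafka clusters with consistent hashing. This is not part of Kafka; the class name, the endpoint strings, and the MD5-based ring are illustrative assumptions only.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Sketch only: a consistent-hash router that maps a topic name to one of
 * several Kafka cluster endpoints. Adding or removing a cluster remaps only
 * the topics whose hashes fall on that cluster's ring positions.
 */
public class TopicClusterRouter {

    // Ring position -> cluster endpoint (e.g. "kafka-cluster-3:9092").
    private final SortedMap<Long, String> ring = new TreeMap<>();

    public TopicClusterRouter(List<String> clusterEndpoints, int virtualNodesPerCluster) {
        for (String endpoint : clusterEndpoints) {
            for (int i = 0; i < virtualNodesPerCluster; i++) {
                // Several positions per cluster smooth out the distribution.
                ring.put(hash(endpoint + "#" + i), endpoint);
            }
        }
    }

    /** Returns the cluster endpoint responsible for the given topic. */
    public String clusterFor(String topic) {
        long h = hash(topic);
        // First ring position at or after the topic's hash, wrapping around.
        SortedMap<Long, String> tail = ring.tailMap(h);
        Long key = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
        return ring.get(key);
    }

    private static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            // Fold the first 8 bytes of the digest into a long ring position.
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xffL);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With something along these lines, the proxy or the application would call clusterFor(topic) for each produce request and hand the message to the producer connected to that cluster; since the brokers in this version send no ACK for produce requests, that path needs no response re-ordering at all.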