Trunk does not have the latest 0.8 code yet. We plan to merge 0.8 back into trunk soon, but that hasn't happened yet.
Typically, the number of producers connecting to a production Kafka cluster is very large, which means a large number of connections to zookeeper. If there is a slight blip on the zookeeper cluster due to a network error, disk latency, or GC, this can cause a lot of churn, as zookeeper will then try to expire tens of thousands of zk sessions. Basically, you want zookeeper on the producer to do just one thing: notify it of changes in the liveness of brokers in the Kafka cluster.

In 0.8, brokers are not the entity to worry about. What we care about are the replicas for the partitions that the producer is sending data to, in particular just the leader replica (since only the leader can accept writes for a partition).

The producer keeps a cache of (topic, partition) -> leader-replica. If that cache is either empty or stale (due to changes on the Kafka cluster), the next produce request will get an ACK with the error code NotLeaderForPartition. That's when the producer fires the getMetadata request that refreshes its cache. Assuming you've configured your producer to retry (producer.num.retries) more than once, it will succeed in sending the data the next time.

In other words, instead of zookeeper 'notifying' us of changes on the Kafka cluster, we let the producer lazily update its cache by invoking a special API on any of the Kafka brokers. That way, we have far fewer connections to zk, zk upgrades are easier, as are upgrades to the producer, and we still achieve the goal of replica discovery.

As for the asymmetry between producers and consumers, we have a proposal and some initial code written to address that in a future release:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design

Thanks,
Neha

On Tue, Nov 20, 2012 at 7:57 AM, Jason Rosenberg <j...@squareup.com> wrote:

> I checked out trunk. I guess I assumed that included the latest 0.8. Is
> that not right? Am I just looking at 0.7.x+?
>
> Honestly, I don't think it would be a positive thing not to be able to rely
> on zookeeper in producer code. How does that affect the discovery of a
> kafka cluster under dynamic conditions? We'd expect to have a much higher
> SLA for the zookeeper cluster than for kafka. We'd like to be able to
> freely do rolling restarts of the kafka cluster, etc.
>
> Also, it seems a bit asymmetric to use zk for the kafka brokers and
> consumers, but not the producers.
>
> Jason
>
> On Mon, Nov 19, 2012 at 8:50 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
>> In 0.8 there is no way to use zookeeper from the producer and no connection
>> from the client. There isn't even a way to configure a zk connection. Are
>> you sure you checked out the 0.8 branch?
>>
>> Check the code you've got:
>> *jkreps-mn:kafka-0.8 jkreps$ svn info*
>> *Path: .*
>> *URL: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8*
>> *Repository Root: https://svn.apache.org/repos/asf*
>>
>> The key is that it should have come from the URL kafka/branches/0.8.
>>
>> -Jay
>>
>>
>> On Mon, Nov 19, 2012 at 3:30 PM, Jason Rosenberg <j...@squareup.com> wrote:
>>
>> > Regarding the producer/zk connection: if I am using zk to discover the
>> > kafka cluster, doesn't the producer get updates if zk's knowledge of the
>> > cluster changes? Or does it only reconsult zk if the particular kafka node
>> > it was "getting metadata" from goes away? Should I not be using a
>> > "zk.connect" but instead a "broker.list" when using a producer (that would
>> > seem restrictive)? What I've noticed is that the instant the zk server is
>> > taken down, my producer immediately starts logging connection errors to zk,
>> > every second, and never stops this logging until zk comes back. So it
>> > certainly feels like the producer is attempting to maintain a direct
>> > connection to zk. I suppose I expected it to try for the connection
>> > timeout period (e.g.
>> > 6000ms), and then give up, until the next send request, etc.
>> >
>> > Perhaps what it should do is make that initial zk connection to find the
>> > kafka broker list, then shutdown the zk connection if it really doesn't
>> > need it after that, until possibly recreating it if needed if it can no
>> > longer make contact with the kafka cluster.
>> >
>> > For the async queuing behavior, I agree, it's difficult to respond to a
>> > send request with an exception, when the sending is done asynchronously, in
>> > a different thread. However, this is the behavior when the producer is
>> > started initially, with no zk available (e.g. producer.send() gets an
>> > exception). So, the api is inconsistent, in that it treats the
>> > unavailability of zk differently, depending on whether it was unavailable
>> > at the initial startup, vs. a subsequent zk outage after previously having
>> > been available.
>> >
>> > I am not too concerned about not having 100% guarantee that if I
>> > successfully call producer.send(), that I know it was actually delivered.
>> > But it would be nice to have some way to know the current health of the
>> > producer, perhaps some sort of "producerStatus()" method. If the async
>> > sending thread is having issues sending, it might be nice to expose that to
>> > the client. Also, if the current producerStatus() is not healthy, then I
>> > think it might be ok to not accept new messages to be sent (e.g.
>> > producer.send() could throw an Exception in that case).
>> >
>> > Returning a Future for each message sent seems a bit unscalable.....I'm not
>> > sure clients want to be tying up resources waiting for Futures all the time
>> > either.
>> >
>> > I'm also seeing that if kafka goes down, while zk stays up, subsequent
>> > calls to producer.send() fail immediately with an exception ("partition is
>> > null").
>> > I think this makes sense, although, in that case, what is the fate
>> > of previously buffered but unsent messages? Are they all lost?
>> >
>> > But I'd like it if zk goes down, and then kafka goes down, it would behave
>> > the same way as if only kafka went down. Instead, it continues happily
>> > buffering messages, with lots of zk connection errors logged, but no way
>> > for the client code to know that things are not hunky dory.
>> >
>> > In summary:
>> >
>> > 1. If zk connection is not important for a producer, why continually log zk
>> > connection errors every second, while at the same time have the client
>> > behave as if nothing is wrong and just keep accepting messages.
>> > 2. if zk connection goes down, followed by kafka going down, it behaves no
>> > differently than if only zk went down, from the client's perspective (it
>> > keeps accepting new messages).
>> > 3. if zk connection stays up, but kafka goes down, it then fails
>> > immediately with an exception in a call to producer.send (I think this
>> > makes sense).
>> > 4. we have no way of knowing if/when buffered messages are sent, once zk
>> > and/or kafka come back online (although it appears all buffered messages
>> > are lost in any case where kafka goes offline).
>> > 5. I'm not clear on the difference between "queue.time" and "queue.size".
>> > If kafka is not available, but "queue.time" has expired, what happens, do
>> > messages get dropped, or do they continue to be buffered until queue.size
>> > is exhausted?
>> > 6. What happens if I call producer.close(), while there are buffered
>> > messages. Do the messages get sent before the producer shuts down?
>> >
>> > Jason
>> >
>> >
>> > On Mon, Nov 19, 2012 at 2:31 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> >
>> > > Starting in 0.8 there is no direct connection from the producer to zk. The
>> > > goal here is to make it easy to implement clients in non-java languages and
>> > > avoid painful zk upgrades.
>> > > ZK is replaced by a "get_metadata" api (which in
>> > > terms of implementation, of course still just reads from zk--but now the
>> > > read is done by the server).
>> > >
>> > > I think the intended behavior for the async producer is the following:
>> > > 1. Messages are immediately added to a queue of messages to be sent
>> > > 2. The number of messages that can be in the queue is bounded
>> > > by queue.size. If the queue is full the parameter
>> > > queue.enqueueTimeout.ms determines how long the producer will wait for
>> > > the queue length to decrease before dropping the message
>> > > 3. Messages are sent by a background thread. If this thread falls behind
>> > > due to throughput or because the kafka cluster is down messages will pile
>> > > up in the queue.
>> > > 4. If the send operation fails there are two options (1) retry, (2) give
>> > > up. If you retry you may get a duplicate (this is the semantics of any
>> > > mutation RPC in any system--e.g. if you get a network error you do not know
>> > > if the mutation has occurred or not). If you give up you will lose those
>> > > messages. This decision is controlled by producer.num.retries.
>> > >
>> > > It is desirable that the client not die if the zk connection is lost, if
>> > > possible, since zk sometimes can have gc pauses or disk latency or whatever
>> > > other transient issue.
>> > >
>> > > Because the send is async and does not have a 1-1
>> > > correspondence to the network communication it is not possible to
>> > > immediately throw an exception in the send() call.
>> > >
>> > > This is not ideal since how do you know if your send succeeded? We think
>> > > the fix is to have the send call return a future representing the result of
>> > > the request that will eventually be made as well as returning the offset of
>> > > your message if you care.
>> > > This is a bit of a large refactoring of the
>> > > producer code (which could definitely use some refactoring) so our
>> > > tentative plan was to address it in 0.9.
>> > >
>> > > I think what you are saying is slightly different, though, which is that
>> > > the behavior between the various cases should be consistent. What do you
>> > > think would be the right behavior?
>> > >
>> > > -Jay
>> > >
>> > >
>> > > On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <j...@squareup.com> wrote:
>> > >
>> > > > I forgot to mention, that I'm working with a recent version of the 0.8
>> > > > code (Last changed rev: 1396425).
>> > > >
>> > > > Jason
>> > > >
>> > > > On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg <j...@squareup.com> wrote:
>> > > >
>> > > > > I've been doing some testing, with an async producer.
>> > > > >
>> > > > > It seems, if I start up the producer, with no zk cluster present, it does
>> > > > > what I expect, that is it waits for a limited time looking for the zk
>> > > > > cluster, and then gives up after the zk.connectiontimeout.ms setting
>> > > > > (6000ms, by default), and fails to send a message. However, if after
>> > > > > starting up zk and having a good connection to zk and kafka, I then
>> > > > > shutdown the zk cluster, the producer never seems to stop accepting
>> > > > > messages to send.
>> > > > >
>> > > > > As long as kafka stays up and running, even without zk still available,
>> > > > > my producer sends messages and my consumer can consume them.
>> > > > >
>> > > > > However, if I then stop kafka also, my producer happily keeps on accepting
>> > > > > messages without failing in a call to producer.send(). It's clearly no
>> > > > > longer able to send any messages at this point. So, I assume it eventually
>> > > > > will just start dropping messages on the floor?
>> > > > >
>> > > > > I would have expected that once both zk/kafka are not available, things
>> > > > > should revert to the initial startup case, where it tries for 6000ms and
>> > > > > then throws an exception on send.
>> > > > >
>> > > > > Thoughts?
>> > > > >
>> > > > > What's the expected behavior for async producers, when the async buffered
>> > > > > messages can't be sent. I think it's fine if they are just lost, but
>> > > > > should it be possible to block further accepting of messages once the
>> > > > > system has detected a problem communicating with zk/kafka?
>> > > > >
>> > > > > Also, if I cleanly shutdown an async producer (e.g. call
>> > > > > producer.close()), should it make a best effort to send out any buffered
>> > > > > messages before shutting down? Or will it quit immediately dropping any
>> > > > > buffered messages on the floor?
>> > > > >
>> > > > > Jason
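
As a rough illustration of the lazy cache refresh described at the top of this thread, here is a minimal sketch in Python. It is not the Kafka producer implementation: FakeCluster, LazyProducer, and all of their methods are hypothetical names invented for the example, and filling an empty cache by asking for metadata directly is a simplifying assumption. Only the error name NotLeaderForPartition, the (topic, partition) -> leader cache, and the idea of a retry count (producer.num.retries) come from the discussion.

```python
NOT_LEADER_FOR_PARTITION = "NotLeaderForPartition"  # error name used in the thread


class FakeCluster:
    """Hypothetical stand-in for a Kafka cluster (not a real client API)."""

    def __init__(self, leaders):
        self.leaders = dict(leaders)   # (topic, partition) -> current leader broker
        self.metadata_requests = 0     # how often the producer asked for metadata
        self.log = []                  # messages accepted by a leader

    def get_metadata(self, topic, partition):
        # In the real system any broker can answer this; here we just look it up.
        self.metadata_requests += 1
        return self.leaders[(topic, partition)]

    def produce(self, broker, topic, partition, message):
        # Only the leader replica accepts writes for a partition.
        if broker != self.leaders[(topic, partition)]:
            return NOT_LEADER_FOR_PARTITION
        self.log.append((broker, topic, partition, message))
        return None  # no error


class LazyProducer:
    """Hypothetical producer that refreshes its leader cache only on failure."""

    def __init__(self, cluster, num_retries=3):
        self.cluster = cluster
        self.num_retries = num_retries  # plays the role of producer.num.retries
        self.leader_cache = {}          # (topic, partition) -> leader broker

    def send(self, topic, partition, message):
        key = (topic, partition)
        for _ in range(1 + self.num_retries):
            leader = self.leader_cache.get(key)
            if leader is not None:
                error = self.cluster.produce(leader, topic, partition, message)
                if error is None:
                    return leader  # delivered to the current leader
            # Cache was empty or stale: refresh it, then try again.
            self.leader_cache[key] = self.cluster.get_metadata(topic, partition)
        raise RuntimeError("could not deliver message after retries")


cluster = FakeCluster({("events", 0): "broker-1"})
producer = LazyProducer(cluster)
producer.send("events", 0, "first")            # fills the empty cache, then sends
cluster.leaders[("events", 0)] = "broker-2"    # leadership moves; cache is now stale
producer.send("events", 0, "second")           # one failed attempt, refresh, retry
```

Note that with num_retries=0 the second send above would raise, which matches the point that the producer must be configured to retry for the refresh to help. No zookeeper connection appears anywhere in the producer; the cluster is only consulted when a send fails or the cache is empty.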