Starting in 0.8 there is no direct connection from the producer to zk. The goal here is to make it easy to implement clients in non-Java languages and to avoid painful zk upgrades. The zk connection is replaced by a "get_metadata" api (which, in terms of implementation, of course still just reads from zk, but now the read is done by the server).
I think the intended behavior for the async producer is the following:

1. Messages are immediately added to a queue of messages to be sent.
2. The number of messages that can be in the queue is bounded by queue.size. If the queue is full, the parameter queue.enqueueTimeout.ms determines how long the producer will wait for the queue length to decrease before dropping the message.
3. Messages are sent by a background thread. If this thread falls behind, due to throughput or because the kafka cluster is down, messages will pile up in the queue.
4. If the send operation fails there are two options: (1) retry, or (2) give up. If you retry you may get a duplicate (this is the semantics of any mutation RPC in any system; e.g. if you get a network error you do not know whether the mutation has occurred or not). If you give up you will lose those messages. This decision is controlled by producer.num.retries.

It is desirable that the client not die if the zk connection is lost, if possible, since zk can sometimes have gc pauses, disk latency, or some other transient issue.

Because the send is async and does not have a 1-1 correspondence to the network communication, it is not possible to immediately throw an exception in the send() call. This is not ideal, since how do you know if your send succeeded? We think the fix is to have the send call return a future representing the result of the request that will eventually be made, which would also return the offset of your message if you care. This is a bit of a large refactoring of the producer code (which could definitely use some refactoring), so our tentative plan was to address it in 0.9.

I think what you are saying is slightly different, though, which is that the behavior between the various cases should be consistent. What do you think would be the right behavior?
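To make the four steps and the future-returning send() idea concrete, here is a toy model in Python. It is only a sketch under stated assumptions: AsyncProducerSketch, FlakyTransport and the transport callable are hypothetical names invented for illustration, not part of the Kafka client, and the constructor arguments merely stand in for queue.size, queue.enqueueTimeout.ms and producer.num.retries. Draining the queue in close() is also a choice made for this sketch, not a statement about what the 0.8 producer does.

```python
import queue
import threading
from concurrent.futures import Future


class AsyncProducerSketch:
    """Toy model of the async producer behavior; NOT Kafka's actual code."""

    def __init__(self, transport, queue_size=4, enqueue_timeout_s=0.05, num_retries=2):
        # transport is a hypothetical callable: message -> offset, raising on failure.
        self._transport = transport
        self._queue = queue.Queue(maxsize=queue_size)  # stands in for queue.size
        self._enqueue_timeout_s = enqueue_timeout_s    # stands in for queue.enqueueTimeout.ms
        self._num_retries = num_retries                # stands in for producer.num.retries
        self._thread = threading.Thread(target=self._drain, daemon=True)
        self._thread.start()

    def send(self, message):
        """Step 1 and 2: enqueue only, and report the outcome through a Future."""
        future = Future()
        try:
            self._queue.put((message, future), timeout=self._enqueue_timeout_s)
        except queue.Full:
            # The queue stayed full for the whole timeout: the message is dropped.
            future.set_exception(RuntimeError("queue full, message dropped"))
        return future

    def _drain(self):
        """Step 3: a background thread performs the actual sends."""
        while True:
            item = self._queue.get()
            if item is None:  # shutdown marker placed by close()
                return
            message, future = item
            self._send_with_retries(message, future)

    def _send_with_retries(self, message, future):
        """Step 4: retry up to num_retries times, then give up."""
        last_error = None
        for _ in range(1 + self._num_retries):
            try:
                offset = self._transport(message)
            except Exception as exc:
                last_error = exc
                continue
            future.set_result(offset)
            return
        future.set_exception(last_error)

    def close(self):
        """Assumption of this sketch: flush what is queued, then stop."""
        self._queue.put(None)
        self._thread.join()


class FlakyTransport:
    """Hypothetical transport: fails the first `failures` calls, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.delivered = []

    def __call__(self, message):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("broker unavailable")
        self.delivered.append(message)
        return len(self.delivered) - 1  # offset of the message just appended


producer = AsyncProducerSketch(FlakyTransport(failures=1), num_retries=2)
future = producer.send("hello")
print(future.result(timeout=5))  # 0: the first attempt fails, the retry succeeds
producer.close()
```

The caller never sees an exception from send() itself; a dropped or failed message only shows up when the returned future is inspected. The sketch does not model the duplicate case, where a retry follows a send that actually reached the broker.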
-Jay

On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <j...@squareup.com> wrote:

> I forgot to mention, that I'm working with a recent version of the 0.8 code
> (Last changed rev: 1396425).
>
> Jason
>
> On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg <j...@squareup.com> wrote:
>
> > I've been doing some testing, with an async producer.
> >
> > It seems, if I start up the producer, with no zk cluster present, it does
> > what I expect, that is it waits for a limited time looking for the zk
> > cluster, and then gives up after the zk.connectiontimeout.ms setting
> > (6000ms, by default), and fails to send a message. However, if after
> > starting up zk and having a good connection to zk and kafka, I then
> > shutdown the zk cluster, the producer never seems to stop accepting
> > messages to send.
> >
> > As long as kafka stays up and running, even without zk still available,
> > my producer sends messages and my consumer can consume them.
> >
> > However, if I then stop kafka also, my producer happily keeps on
> > accepting messages without failing in a call to producer.send(). It's
> > clearly no longer able to send any messages at this point. So, I assume
> > it eventually will just start dropping messages on the floor?
> >
> > I would have expected that once both zk/kafka are not available, things
> > should revert to the initial startup case, where it tries for 6000ms and
> > then throws an exception on send.
> >
> > Thoughts?
> >
> > What's the expected behavior for async producers, when the async buffered
> > messages can't be sent. I think it's fine if they are just lost, but
> > should it be possible to block further accepting of messages once the
> > system has detected a problem communicating with zk/kafka?
> >
> > Also, if I cleanly shutdown an async producer (e.g. call
> > producer.close()), should it make a best effort to send out any buffered
> > messages before shutting down? Or will it quit immediately dropping any
> > buffered messages on the floor?
> >
> > Jason