Regarding the producer/zk connection: if I am using zk to discover the kafka cluster, doesn't the producer get updates if zk's knowledge of the cluster changes? Or does it only reconsult zk if the particular kafka node it was "getting metadata" from goes away? Should I not be using "zk.connect" but instead "broker.list" when configuring a producer (that would seem restrictive)? What I've noticed is that the instant the zk server is taken down, my producer immediately starts logging connection errors to zk, every second, and never stops until zk comes back. So it certainly feels like the producer is attempting to maintain a direct connection to zk. I suppose I expected it to try for the connection timeout period (e.g. 6000ms), then give up until the next send request, etc.
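Concretely, the two config styles I'm asking about look roughly like this -- a sketch only, with property names as I understand them and made-up hostnames:

    import java.util.Properties;

    // zk-based discovery (what I'm using now):
    Properties props = new Properties();
    props.put("zk.connect", "zk1:2181,zk2:2181,zk3:2181");
    props.put("zk.connectiontimeout.ms", "6000");   // the 6000ms default I mentioned

    // vs. a static broker list (which is what seems restrictive):
    // props.put("broker.list", "kafka1:9092,kafka2:9092");   // exact value format is my guess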
Perhaps what it should do is make that initial zk connection to find the kafka broker list, then shut down the zk connection if it really doesn't need it after that, possibly recreating it later if it can no longer make contact with the kafka cluster.

For the async queuing behavior, I agree it's difficult to respond to a send request with an exception when the sending is done asynchronously, in a different thread. However, this is the behavior when the producer is started initially with no zk available (producer.send() gets an exception). So the api is inconsistent, in that it treats the unavailability of zk differently depending on whether zk was unavailable at initial startup vs. a subsequent zk outage after previously having been available.

I am not too concerned about not having a 100% guarantee that a message was actually delivered after a successful call to producer.send(). But it would be nice to have some way to know the current health of the producer, perhaps some sort of "producerStatus()" method. If the async sending thread is having issues sending, it might be nice to expose that to the client. Also, if the current producerStatus() is not healthy, I think it might be ok to stop accepting new messages (e.g. producer.send() could throw an Exception in that case). Returning a Future for each message sent seems a bit unscalable... I'm not sure clients want to be tying up resources waiting for Futures all the time either.

I'm also seeing that if kafka goes down while zk stays up, subsequent calls to producer.send() fail immediately with an exception ("partition is null"). I think this makes sense, although in that case, what is the fate of previously buffered but unsent messages? Are they all lost? I'd like it if zk goes down, and then kafka goes down, for it to behave the same way as if only kafka went down. Instead, it continues happily buffering messages, with lots of zk connection errors logged, but no way for the client code to know that things are not hunky dory.

In summary:

1. If the zk connection is not important for a producer, why continually log zk connection errors every second, while at the same time having the client behave as if nothing is wrong and keep accepting messages?
2. If the zk connection goes down, followed by kafka going down, it behaves no differently than if only zk went down, from the client's perspective (it keeps accepting new messages).
3. If the zk connection stays up but kafka goes down, producer.send() then fails immediately with an exception (I think this makes sense).
4. We have no way of knowing if/when buffered messages are sent once zk and/or kafka come back online (although it appears all buffered messages are lost in any case where kafka goes offline).
5. I'm not clear on the difference between "queue.time" and "queue.size" (see the config sketch below). If kafka is not available but "queue.time" has expired, what happens -- do messages get dropped, or do they continue to be buffered until queue.size is exhausted?
6. What happens if I call producer.close() while there are buffered messages? Do the messages get sent before the producer shuts down?
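To make items 5 and 6 concrete, here's roughly how I'm setting up the producer in my tests -- a sketch only, with property names as I understand them from the 0.8 config code (the values and hostnames are just examples):

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.ProducerConfig;

    Properties props = new Properties();
    props.put("zk.connect", "zk1:2181");            // or broker.list, per my question above
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("producer.type", "async");            // buffered sends via a background thread
    props.put("queue.size", "10000");               // max buffered messages (item 5)
    props.put("queue.time", "5000");                // flush interval in ms? (item 5)
    props.put("queue.enqueueTimeout.ms", "500");    // how long to wait when the queue is full (per Jay's reply below)
    props.put("producer.num.retries", "3");         // retry vs. give up on a failed send (per Jay's reply below)

    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
    // ... producer.send(...) from application threads ...
    producer.close();   // item 6: does this flush whatever is still buffered, or drop it?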
Jason

On Mon, Nov 19, 2012 at 2:31 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Starting in 0.8 there is no direct connection from the producer to zk. The goal here is to make it easy to implement clients in non-java languages and avoid painful zk upgrades. ZK is replaced by a "get_metadata" api (which in terms of implementation, of course, still just reads from zk -- but now the read is done by the server).
>
> I think the intended behavior for the async producer is the following:
>
> 1. Messages are immediately added to a queue of messages to be sent.
> 2. The number of messages that can be in the queue is bounded by queue.size. If the queue is full, the parameter queue.enqueueTimeout.ms determines how long the producer will wait for the queue length to decrease before dropping the message.
> 3. Messages are sent by a background thread. If this thread falls behind, due to throughput or because the kafka cluster is down, messages will pile up in the queue.
> 4. If the send operation fails there are two options: (1) retry, (2) give up. If you retry you may get a duplicate (this is the semantics of any mutation RPC in any system -- e.g. if you get a network error you do not know if the mutation has occurred or not). If you give up you will lose those messages. This decision is controlled by producer.num.retries.
>
> It is desirable that the client not die if the zk connection is lost, if possible, since zk sometimes can have gc pauses or disk latency or whatever other transient issue.
>
> Because the send is async and does not have a 1-1 correspondence to the network communication, it is not possible to immediately throw an exception in the send() call.
>
> This is not ideal, since how do you know if your send succeeded? We think the fix is to have the send call return a future representing the result of the request that will eventually be made, as well as returning the offset of your message if you care. This is a bit of a large refactoring of the producer code (which could definitely use some refactoring), so our tentative plan was to address it in 0.9.
>
> I think what you are saying is slightly different, though, which is that the behavior between the various cases should be consistent. What do you think would be the right behavior?
>
> -Jay
>
> On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <j...@squareup.com> wrote:
>
> > I forgot to mention that I'm working with a recent version of the 0.8 code (Last changed rev: 1396425).
> >
> > Jason
> >
> > On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg <j...@squareup.com> wrote:
> >
> > > I've been doing some testing, with an async producer.
> > >
> > > It seems, if I start up the producer with no zk cluster present, it does what I expect: it waits for a limited time looking for the zk cluster, gives up after the zk.connectiontimeout.ms setting (6000ms by default), and fails to send a message. However, if after starting up with a good connection to zk and kafka, I then shut down the zk cluster, the producer never seems to stop accepting messages to send.
> > >
> > > As long as kafka stays up and running, even without zk still available, my producer sends messages and my consumer can consume them.
> > >
> > > However, if I then stop kafka also, my producer happily keeps on accepting messages without failing in a call to producer.send(). It's clearly no longer able to send any messages at this point. So, I assume it will eventually just start dropping messages on the floor?
> > > I would have expected that once both zk/kafka are not available, things should revert to the initial startup case, where it tries for 6000ms and then throws an exception on send.
> > >
> > > Thoughts?
> > >
> > > What's the expected behavior for async producers when the async buffered messages can't be sent? I think it's fine if they are just lost, but should it be possible to block further accepting of messages once the system has detected a problem communicating with zk/kafka?
> > >
> > > Also, if I cleanly shut down an async producer (e.g. call producer.close()), should it make a best effort to send out any buffered messages before shutting down? Or will it quit immediately, dropping any buffered messages on the floor?
> > >
> > > Jason