In 0.8 there is no way to use zookeeper from the producer and no connection from the client. There isn't even a way to configure a zk connection. Are you sure you checked out the 0.8 branch?
Check the code you've got:

  jkreps-mn:kafka-0.8 jkreps$ svn info
  Path: .
  URL: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8
  Repository Root: https://svn.apache.org/repos/asf

The key is that it should have come from the URL kafka/branches/0.8.

-Jay

On Mon, Nov 19, 2012 at 3:30 PM, Jason Rosenberg <j...@squareup.com> wrote:

> Regarding the producer/zk connection: if I am using zk to discover the kafka cluster, doesn't the producer get updates if zk's knowledge of the cluster changes? Or does it only reconsult zk if the particular kafka node it was "getting metadata" from goes away? Should I not be using "zk.connect" but instead "broker.list" when using a producer (that would seem restrictive)? What I've noticed is that the instant the zk server is taken down, my producer immediately starts logging connection errors to zk, every second, and never stops this logging until zk comes back. So it certainly feels like the producer is attempting to maintain a direct connection to zk. I suppose I expected it to try for the connection timeout period (e.g. 6000ms), and then give up until the next send request, etc.
>
> Perhaps what it should do is make that initial zk connection to find the kafka broker list, then shut down the zk connection if it really doesn't need it after that, recreating it only if it can no longer make contact with the kafka cluster.
>
> For the async queueing behavior, I agree it's difficult to respond to a send request with an exception when the sending is done asynchronously, in a different thread. However, this is the behavior when the producer is started initially with no zk available (e.g. producer.send() gets an exception). So the api is inconsistent, in that it treats the unavailability of zk differently depending on whether it was unavailable at initial startup vs. a subsequent zk outage after previously having been available.
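Jason's suggested bootstrap-then-disconnect pattern (consult zk once for the broker list, then release the connection) could be sketched roughly like this. This is a hypothetical illustration in Python, not Kafka's actual client code; `DiscoveryConnection` and `Producer` are made-up stand-ins:

```python
# Sketch only: bootstrap the broker list once, then drop the discovery
# connection instead of holding it open. All names are hypothetical.

class DiscoveryConnection:
    """Stand-in for a zk session used only at startup."""
    def __init__(self, brokers):
        self._brokers = brokers
        self.open = True

    def fetch_brokers(self):
        return list(self._brokers)

    def close(self):
        self.open = False


class Producer:
    def __init__(self, discovery):
        # Consult discovery once, cache the result, then release the
        # connection; re-open only if the cached brokers stop working.
        self.brokers = discovery.fetch_brokers()
        discovery.close()


zk = DiscoveryConnection(["broker1:9092", "broker2:9092"])
p = Producer(zk)
print(p.brokers)  # cached broker list survives the disconnect
print(zk.open)    # False: no standing zk connection to log errors about
```

Under this scheme there would be nothing to log every second when zk goes away, since no session is held after bootstrap.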
> I am not too concerned about not having a 100% guarantee that if I successfully call producer.send(), I know it was actually delivered. But it would be nice to have some way to know the current health of the producer, perhaps some sort of "producerStatus()" method. If the async sending thread is having issues sending, it might be nice to expose that to the client. Also, if the current producerStatus() is not healthy, then I think it might be ok to not accept new messages to be sent (e.g. producer.send() could throw an Exception in that case).
>
> Returning a Future for each message sent seems a bit unscalable... I'm not sure clients want to be tying up resources waiting for Futures all the time either.
>
> I'm also seeing that if kafka goes down while zk stays up, subsequent calls to producer.send() fail immediately with an exception ("partition is null"). I think this makes sense, although in that case, what is the fate of previously buffered but unsent messages? Are they all lost?
>
> But I'd like it if zk goes down, and then kafka goes down, it would behave the same way as if only kafka went down. Instead, it continues happily buffering messages, with lots of zk connection errors logged, but no way for the client code to know that things are not hunky dory.
>
> In summary:
>
> 1. If the zk connection is not important for a producer, why continually log zk connection errors every second, while at the same time having the client behave as if nothing is wrong and keep accepting messages?
> 2. If the zk connection goes down, followed by kafka going down, it behaves no differently than if only zk went down, from the client's perspective (it keeps accepting new messages).
> 3. If the zk connection stays up, but kafka goes down, it then fails immediately with an exception in a call to producer.send (I think this makes sense).
> 4.
> we have no way of knowing if/when buffered messages are sent once zk and/or kafka come back online (although it appears all buffered messages are lost in any case where kafka goes offline).
> 5. I'm not clear on the difference between "queue.time" and "queue.size". If kafka is not available but "queue.time" has expired, what happens: do messages get dropped, or do they continue to be buffered until queue.size is exhausted?
> 6. What happens if I call producer.close() while there are buffered messages? Do the messages get sent before the producer shuts down?
>
> Jason
>
> On Mon, Nov 19, 2012 at 2:31 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Starting in 0.8 there is no direct connection from the producer to zk. The goal here is to make it easy to implement clients in non-java languages and to avoid painful zk upgrades. ZK is replaced by a "get_metadata" api (which, in terms of implementation, of course still just reads from zk--but now the read is done by the server).
> >
> > I think the intended behavior for the async producer is the following:
> >
> > 1. Messages are immediately added to a queue of messages to be sent.
> > 2. The number of messages that can be in the queue is bounded by queue.size. If the queue is full, the parameter queue.enqueueTimeout.ms determines how long the producer will wait for the queue length to decrease before dropping the message.
> > 3. Messages are sent by a background thread. If this thread falls behind due to throughput or because the kafka cluster is down, messages will pile up in the queue.
> > 4. If the send operation fails there are two options: (1) retry, (2) give up. If you retry you may get a duplicate (this is the semantics of any mutation RPC in any system--e.g. if you get a network error you do not know if the mutation has occurred or not). If you give up you will lose those messages. This decision is controlled by producer.num.retries.
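The bounded-queue behavior Jay describes in items 1-2 (enqueue immediately, wait up to a timeout when full, then drop) can be sketched with a small simulation. `AsyncBuffer` is a hypothetical stand-in written in Python, not the real producer; the queue.size and queue.enqueueTimeout.ms values are made up:

```python
# Sketch of the described enqueue semantics: a bounded queue where a
# full queue blocks send() for the enqueue timeout, then drops.
import queue

class AsyncBuffer:
    def __init__(self, queue_size, enqueue_timeout_ms):
        self._q = queue.Queue(maxsize=queue_size)       # ~ queue.size
        self._timeout = enqueue_timeout_ms / 1000.0     # ~ queue.enqueueTimeout.ms
        self.dropped = 0

    def send(self, msg):
        # Wait up to the enqueue timeout for room; if none frees up
        # (e.g. the sender thread is stuck because kafka is down), drop.
        try:
            self._q.put(msg, timeout=self._timeout)
        except queue.Full:
            self.dropped += 1


buf = AsyncBuffer(queue_size=2, enqueue_timeout_ms=10)
for m in ("a", "b", "c"):
    buf.send(m)        # no sender thread here, so "c" finds the queue full
print(buf.dropped)     # 1
```

With no background sender draining the queue (as when the cluster is down), the third message waits out the timeout and is silently dropped, which matches the lossy behavior discussed in the thread.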
> > It is desirable that the client not die if the zk connection is lost, if possible, since zk can sometimes have gc pauses or disk latency or whatever other transient issue.
> >
> > Because the send is async and does not have a 1-1 correspondence to the network communication, it is not possible to immediately throw an exception in the send() call.
> >
> > This is not ideal, since how do you know if your send succeeded? We think the fix is to have the send call return a future representing the result of the request that will eventually be made, as well as returning the offset of your message if you care. This is a bit of a large refactoring of the producer code (which could definitely use some refactoring), so our tentative plan was to address it in 0.9.
> >
> > I think what you are saying is slightly different, though, which is that the behavior between the various cases should be consistent. What do you think would be the right behavior?
> >
> > -Jay
> >
> > On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <j...@squareup.com> wrote:
> >
> > > I forgot to mention that I'm working with a recent version of the 0.8 code (Last changed rev: 1396425).
> > >
> > > Jason
> > >
> > > On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg <j...@squareup.com> wrote:
> > >
> > > > I've been doing some testing with an async producer.
> > > >
> > > > It seems, if I start up the producer with no zk cluster present, it does what I expect: it waits for a limited time looking for the zk cluster, gives up after the zk.connectiontimeout.ms setting (6000ms by default), and fails to send a message. However, if after starting up zk and having a good connection to zk and kafka, I then shut down the zk cluster, the producer never seems to stop accepting messages to send.
> > > > As long as kafka stays up and running, even without zk still available, my producer sends messages and my consumer can consume them.
> > > >
> > > > However, if I then stop kafka also, my producer happily keeps on accepting messages without failing in a call to producer.send(). It's clearly no longer able to send any messages at this point. So, I assume it eventually will just start dropping messages on the floor?
> > > >
> > > > I would have expected that once both zk/kafka are not available, things should revert to the initial startup case, where it tries for 6000ms and then throws an exception on send.
> > > >
> > > > Thoughts?
> > > >
> > > > What's the expected behavior for async producers when the async buffered messages can't be sent? I think it's fine if they are just lost, but should it be possible to block further accepting of messages once the system has detected a problem communicating with zk/kafka?
> > > >
> > > > Also, if I cleanly shut down an async producer (e.g. call producer.close()), should it make a best effort to send out any buffered messages before shutting down? Or will it quit immediately, dropping any buffered messages on the floor?
> > > >
> > > > Jason
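Jason's closing question — whether close() should flush buffered messages before shutting down — can be illustrated with a small sketch. `DrainingProducer` is hypothetical Python showing one way a best-effort drain could work; it is not Kafka's actual producer.close():

```python
# Sketch of a close() that drains buffered messages before stopping
# the background sender thread. Hypothetical; the "send" here just
# appends to a list in place of a network call.
import queue
import threading

class DrainingProducer:
    def __init__(self):
        self._q = queue.Queue()
        self.delivered = []
        self._stop = threading.Event()
        self._sender = threading.Thread(target=self._run, daemon=True)
        self._sender.start()

    def _run(self):
        # Keep draining until shutdown is requested AND the queue is empty,
        # so messages buffered before close() still go out.
        while not self._stop.is_set() or not self._q.empty():
            try:
                msg = self._q.get(timeout=0.05)
            except queue.Empty:
                continue
            self.delivered.append(msg)  # stand-in for the actual network send

    def send(self, msg):
        self._q.put(msg)

    def close(self):
        # Signal shutdown, then wait for the sender to flush the queue.
        self._stop.set()
        self._sender.join()


p = DrainingProducer()
for m in ("a", "b", "c"):
    p.send(m)
p.close()
print(p.delivered)  # ['a', 'b', 'c'] -- buffered messages flushed first
```

The design choice is in the sender loop's exit condition: checking both the stop flag and queue emptiness gives a best-effort flush, whereas checking the flag alone would drop whatever is still buffered, which is the immediate-quit alternative Jason asks about.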