OK, so I'm still wrapping my mind around this. I liked being able to use
zk for all clients, since it made it very easy to think about how to
update the kafka cluster -- e.g. how to add new brokers, how to move them
all to new hosts entirely, etc., without having to redeploy all the
clients. The new brokers will simply advertise their new location via zk,
and all clients will pick it up.
Requiring a configured broker.list for each client means that 1000's of
deployed apps need to be updated any time the kafka cluster changes, no?
(Or am I not understanding?) You mention that auto-discovery of new
brokers will still work -- is that dependent on the originally configured
broker.list set still being available? I can see, though, how this will
greatly reduce the load on zookeeper.

Jason

On Tue, Nov 20, 2012 at 9:03 AM, Jun Rao <jun...@gmail.com> wrote:

> Jason,
>
> Auto discovery of new brokers and rolling restart of brokers are still
> supported in 0.8. It's just that most of the ZK related logic is moved
> to the broker.
>
> There are 2 reasons why we want to remove zkclient from the client:
>
> 1. If the client goes into GC, it can cause zk session expiration,
> churn in the client, and extra load on the zk server.
> 2. It simplifies the client code and makes the implementation of
> non-java clients easier.
>
> In 0.8, we removed the zk dependency from the producer. Post 0.8, we
> plan to see if we can do the same thing for the consumer (though that
> is more involved). This shouldn't reduce any existing functionality of
> the client, though. Feel free to let us know if you still have
> concerns.
>
> Thanks,
>
> Jun
>
> On Tue, Nov 20, 2012 at 7:57 AM, Jason Rosenberg <j...@squareup.com> wrote:
>
> > I checked out trunk. I guess I assumed that included the latest 0.8.
> > Is that not right? Am I just looking at 0.7.x+?
> >
> > Honestly, I don't think it would be a positive thing not to be able
> > to rely on zookeeper in producer code. How does that affect the
> > discovery of a kafka cluster under dynamic conditions? We'd expect to
> > have a much higher SLA for the zookeeper cluster than for kafka. We'd
> > like to be able to freely do rolling restarts of the kafka cluster,
> > etc.
> >
> > Also, it seems a bit asymmetric to use zk for the kafka brokers and
> > consumers, but not the producers.
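If I'm reading Jun's point right, the configured broker list is only a bootstrap set for the initial metadata request, so it doesn't have to track every cluster change. A minimal sketch of such a zk-free producer config, assuming 0.8-era property names (broker.list, producer.type, serializer.class) that may differ by version:

```java
import java.util.Properties;

// Sketch of a 0.8-era producer configuration with no zk.connect at all.
// Property names follow the thread's discussion and are illustrative;
// check the docs for your exact Kafka version.
public class ProducerProps {
    public static Properties bootstrapConfig() {
        Properties props = new Properties();
        // A small, stable subset of brokers used only for the initial
        // metadata request; it need not list every broker in the cluster.
        props.put("broker.list", "broker1:9092,broker2:9092");
        props.put("producer.type", "async");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }
}
```

As long as at least one broker in the configured list is reachable, the producer can learn about the rest of the (possibly changed) cluster from it, which is how auto-discovery can still work without redeploying clients.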
> >
> > Jason
> >
> > On Mon, Nov 19, 2012 at 8:50 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > In 0.8 there is no way to use zookeeper from the producer and no
> > > connection from the client. There isn't even a way to configure a zk
> > > connection. Are you sure you checked out the 0.8 branch?
> > >
> > > Check the code you've got:
> > > *jkreps-mn:kafka-0.8 jkreps$ svn info*
> > > *Path: .*
> > > *URL: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8*
> > > *Repository Root: https://svn.apache.org/repos/asf*
> > >
> > > The key is that it should have come from the URL kafka/branches/0.8.
> > >
> > > -Jay
> > >
> > > On Mon, Nov 19, 2012 at 3:30 PM, Jason Rosenberg <j...@squareup.com> wrote:
> > >
> > > > Regarding the producer/zk connection: if I am using zk to discover
> > > > the kafka cluster, doesn't the producer get updates if zk's
> > > > knowledge of the cluster changes? Or does it only reconsult zk if
> > > > the particular kafka node it was "getting metadata" from goes away?
> > > > Should I not be using a "zk.connect" but instead a "broker.list"
> > > > when using a producer (that would seem restrictive)? What I've
> > > > noticed is that the instant the zk server is taken down, my
> > > > producer immediately starts logging connection errors to zk, every
> > > > second, and never stops this logging until zk comes back. So it
> > > > certainly feels like the producer is attempting to maintain a
> > > > direct connection to zk. I suppose I expected it to try for the
> > > > connection timeout period (e.g. 6000ms), and then give up, until
> > > > the next send request, etc.
> > > >
> > > > Perhaps what it should do is make that initial zk connection to
> > > > find the kafka broker list, then shut down the zk connection if it
> > > > really doesn't need it after that, possibly recreating it later if
> > > > it can no longer make contact with the kafka cluster.
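Both Jay's description (brokers serving a "get_metadata" api) and the suggestion just above come down to the same bootstrap-then-refresh pattern: use a static contact list only to find one live node, and take the current cluster view from whatever that node returns. A rough model, with all names invented for illustration rather than taken from Kafka's code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustrative model of bootstrap-list discovery. The static list is only
// used to reach one live broker; the current cluster view comes from the
// metadata that broker returns. Names here are invented, not Kafka's API.
public class MetadataBootstrap {
    // Ask each bootstrap broker in turn; the first that answers supplies
    // the full, current broker list (which may have no overlap with the
    // bootstrap list after the cluster has moved).
    public static List<String> discover(List<String> bootstrap,
                                        Function<String, List<String>> getMetadata) {
        for (String broker : bootstrap) {
            List<String> live = getMetadata.apply(broker); // null = unreachable
            if (live != null && !live.isEmpty()) {
                return live;
            }
        }
        throw new IllegalStateException("no bootstrap broker reachable");
    }
}
```

The same shape works whether the initial contact point is zk or a broker; the thread's argument is about which one the client should depend on.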
> > > >
> > > > For the async queuing behavior, I agree, it's difficult to respond
> > > > to a send request with an exception when the sending is done
> > > > asynchronously, in a different thread. However, this is the
> > > > behavior when the producer is started initially with no zk
> > > > available (e.g. producer.send() gets an exception). So the api is
> > > > inconsistent, in that it treats the unavailability of zk
> > > > differently depending on whether it was unavailable at the initial
> > > > startup, vs. a subsequent zk outage after previously having been
> > > > available.
> > > >
> > > > I am not too concerned about not having a 100% guarantee that if I
> > > > successfully call producer.send(), I know it was actually
> > > > delivered. But it would be nice to have some way to know the
> > > > current health of the producer, perhaps some sort of
> > > > "producerStatus()" method. If the async sending thread is having
> > > > issues sending, it might be nice to expose that to the client.
> > > > Also, if the current producerStatus() is not healthy, then I think
> > > > it might be ok to not accept new messages to be sent (e.g.
> > > > producer.send() could throw an Exception in that case).
> > > >
> > > > Returning a Future for each message sent seems a bit unscalable...
> > > > I'm not sure clients want to be tying up resources waiting for
> > > > Futures all the time either.
> > > >
> > > > I'm also seeing that if kafka goes down while zk stays up,
> > > > subsequent calls to producer.send() fail immediately with an
> > > > exception ("partition is null"). I think this makes sense,
> > > > although, in that case, what is the fate of previously buffered
> > > > but unsent messages? Are they all lost?
> > > >
> > > > But I'd like it if zk goes down, and then kafka goes down, it would
> > > > behave the same way as if only kafka went down.
> > > > Instead, it continues happily buffering messages, with lots of zk
> > > > connection errors logged, but no way for the client code to know
> > > > that things are not hunky dory.
> > > >
> > > > In summary:
> > > >
> > > > 1. If the zk connection is not important for a producer, why
> > > > continually log zk connection errors every second, while at the
> > > > same time having the client behave as if nothing is wrong and just
> > > > keep accepting messages?
> > > > 2. If the zk connection goes down, followed by kafka going down, it
> > > > behaves no differently than if only zk went down, from the client's
> > > > perspective (it keeps accepting new messages).
> > > > 3. If the zk connection stays up, but kafka goes down, it then
> > > > fails immediately with an exception in a call to producer.send()
> > > > (I think this makes sense).
> > > > 4. We have no way of knowing if/when buffered messages are sent
> > > > once zk and/or kafka come back online (although it appears all
> > > > buffered messages are lost in any case where kafka goes offline).
> > > > 5. I'm not clear on the difference between "queue.time" and
> > > > "queue.size". If kafka is not available, but "queue.time" has
> > > > expired, what happens: do messages get dropped, or do they continue
> > > > to be buffered until queue.size is exhausted?
> > > > 6. What happens if I call producer.close() while there are buffered
> > > > messages? Do the messages get sent before the producer shuts down?
> > > >
> > > > Jason
> > > >
> > > > On Mon, Nov 19, 2012 at 2:31 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > Starting in 0.8 there is no direct connection from the producer
> > > > > to zk. The goal here is to make it easy to implement clients in
> > > > > non-java languages and avoid painful zk upgrades.
> > > > > ZK is replaced by a "get_metadata" api (which, in terms of
> > > > > implementation, of course still just reads from zk--but now the
> > > > > read is done by the server).
> > > > >
> > > > > I think the intended behavior for the async producer is the
> > > > > following:
> > > > > 1. Messages are immediately added to a queue of messages to be
> > > > > sent.
> > > > > 2. The number of messages that can be in the queue is bounded by
> > > > > queue.size. If the queue is full, the parameter
> > > > > queue.enqueueTimeout.ms determines how long the producer will
> > > > > wait for the queue length to decrease before dropping the
> > > > > message.
> > > > > 3. Messages are sent by a background thread. If this thread falls
> > > > > behind due to throughput or because the kafka cluster is down,
> > > > > messages will pile up in the queue.
> > > > > 4. If the send operation fails there are two options: (1) retry,
> > > > > (2) give up. If you retry you may get a duplicate (this is the
> > > > > semantics of any mutation RPC in any system--e.g. if you get a
> > > > > network error you do not know if the mutation has occurred or
> > > > > not). If you give up you will lose those messages. This decision
> > > > > is controlled by producer.num.retries.
> > > > >
> > > > > It is desirable that the client not die if the zk connection is
> > > > > lost, if possible, since zk can sometimes have gc pauses or disk
> > > > > latency or whatever other transient issue.
> > > > >
> > > > > Because the send is async and does not have a 1-1 correspondence
> > > > > to the network communication, it is not possible to immediately
> > > > > throw an exception in the send() call.
> > > > >
> > > > > This is not ideal, since how do you know if your send succeeded?
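Steps 1-2 above can be sketched with a bounded blocking queue. The names mirror the queue.size / queue.enqueueTimeout.ms settings discussed in the thread, but this is a toy model of the intended semantics, not the producer's actual code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Rough model of the async producer buffer: a bounded queue (queue.size)
// whose enqueue blocks for up to the enqueue timeout before the message
// is dropped, while a background thread drains it.
public class AsyncBuffer {
    private final BlockingQueue<String> queue;
    private final long enqueueTimeoutMs;

    public AsyncBuffer(int queueSize, long enqueueTimeoutMs) {
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.enqueueTimeoutMs = enqueueTimeoutMs;
    }

    // Returns false when the queue stayed full for the whole timeout,
    // i.e. the message is dropped rather than the caller blocking forever.
    public boolean send(String message) {
        try {
            return queue.offer(message, enqueueTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // In the real producer a background sender thread drains the queue;
    // a non-blocking poll keeps this sketch simple.
    public String takeForSending() {
        return queue.poll();
    }
}
```

This also makes Jason's complaint concrete: when the sender thread can't reach the cluster, send() keeps returning success until the buffer fills, and the caller has no direct signal that anything is wrong.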
We > > > think > > > > > the fix is to have the send call return a future representing the > > > result > > > > of > > > > > the request that will eventually be made as well as returning the > > > offset > > > > of > > > > > your message if you care. This is a bit of a large refactoring of > the > > > > > producer code (which could definitely use some refactoring) so our > > > > > tentative plan was to address it in 0.9. > > > > > > > > > > I think what you are saying is slightly different, though, which is > > > that > > > > > the behavior between the various cases should be consistent. What > do > > > you > > > > > think would be the right behavior? > > > > > > > > > > -Jay > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <j...@squareup.com > > > > > > wrote: > > > > > > > > > > > I forgot to mention, that I'm working with a recent version of > the > > > 0.8 > > > > > code > > > > > > (Last chaned rev: 1396425). > > > > > > > > > > > > Jason > > > > > > > > > > > > On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg < > j...@squareup.com > > > > > > > > wrote: > > > > > > > > > > > > > I've been doing some testing, with an async producer. > > > > > > > > > > > > > > It seems, if I start up the producer, with no zk cluster > present, > > > it > > > > > does > > > > > > > what I expect, that is it waits for a limited time looking for > > the > > > zk > > > > > > > cluster, and then gives up after the > > zk.connectiontimeout.mssetting > > > > > > > (6000ms, by default), and fails to send a message. However, if > > > after > > > > > > > starting up zk and having a good connection to zk and kafka, I > > then > > > > > > > shutdown the zk cluster, the producer never seems to stop > > accepting > > > > > > > messages to send. 
> > > > > > >
> > > > > > > As long as kafka stays up and running, even without zk still
> > > > > > > available, my producer sends messages and my consumer can
> > > > > > > consume them.
> > > > > > >
> > > > > > > However, if I then stop kafka also, my producer happily keeps
> > > > > > > on accepting messages without failing in a call to
> > > > > > > producer.send(). It's clearly no longer able to send any
> > > > > > > messages at this point. So, I assume it eventually will just
> > > > > > > start dropping messages on the floor?
> > > > > > >
> > > > > > > I would have expected that once both zk/kafka are not
> > > > > > > available, things should revert to the initial startup case,
> > > > > > > where it tries for 6000ms and then throws an exception on
> > > > > > > send.
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > What's the expected behavior for async producers when the
> > > > > > > async buffered messages can't be sent? I think it's fine if
> > > > > > > they are just lost, but should it be possible to block
> > > > > > > further accepting of messages once the system has detected a
> > > > > > > problem communicating with zk/kafka?
> > > > > > >
> > > > > > > Also, if I cleanly shut down an async producer (e.g. call
> > > > > > > producer.close()), should it make a best effort to send out
> > > > > > > any buffered messages before shutting down? Or will it quit
> > > > > > > immediately, dropping any buffered messages on the floor?
> > > > > > >
> > > > > > > Jason
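For what it's worth, the future-returning send() that Jay floats upthread could look roughly like this. It is a sketch only, not the eventual 0.9 API; completing the future inline (with a local counter standing in for the broker's offset ack) just keeps it self-contained:

```java
import java.util.concurrent.CompletableFuture;

// Sketch of a future-returning send(): enqueueing is cheap, and the
// Future resolves to the message's offset once the background send
// completes (or fails exceptionally). Illustrative only.
public class FutureSendSketch {
    private long nextOffset = 0;

    // In a real producer the I/O thread would complete the future after
    // the broker acks; here we complete it inline as a stand-in.
    public CompletableFuture<Long> send(String message) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        result.complete(nextOffset++);
        return result;
    }
}
```

Notably, this shape addresses the scalability worry raised in the thread: a caller who doesn't care about delivery can simply ignore the returned future at no extra cost, while a caller who does care can join on it or attach a callback.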