> Requiring use of a configured broker.list for each client means that
> 1000's of deployed apps need to be updated any time the kafka cluster
> changes, no? (Or am I not understanding?)

The advantage is that you can configure broker.list to point to a VIP, so
you can transparently change the brokers behind the VIP without having to
re-configure the producers. On the other hand, if you ever had to make a
similar change to your zookeeper cluster, it would be very operationally
heavy, since you would have to make a config change on each of your
producers.
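As a concrete illustration of the VIP approach Neha describes, a producer
can be pointed at one stable address so broker hosts can change behind it.
This is only a sketch: the property is named broker.list on the 0.8 branch
discussed here (released builds renamed it metadata.broker.list), the class
names are from the eventual 0.8 javaapi, and kafka-vip.example.com is a
placeholder.

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class VipProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Point the bootstrap list at a VIP, not at individual broker
            // hosts, so brokers can be swapped behind the VIP without
            // touching the config of thousands of deployed producers.
            // (Renamed metadata.broker.list in the released 0.8.)
            props.put("broker.list", "kafka-vip.example.com:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // Note what is absent: no zk.connect -- the 0.8 producer has no
            // zookeeper connection at all.
            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("test-topic", "hello"));
            producer.close();
        }
    }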
> You mention that auto-discovery of new brokers will still work, is that
> dependent on the existing configured broker.list set still being available
> also?

It depends on at least one broker in the cluster being alive. If none of
them are alive, you have a much bigger problem to worry about.

Thanks,
Neha

On Tue, Nov 20, 2012 at 9:44 AM, Jason Rosenberg <j...@squareup.com> wrote:

> Ok,
>
> So, I'm still wrapping my mind around this. I liked being able to use zk
> for all clients, since it made it very easy to think about how to update
> the kafka cluster. E.g. how to add new brokers, how to move them all to
> new hosts entirely, etc., without having to redeploy all the clients. The
> new brokers will simply advertise their new location via zk, and all
> clients will pick it up.
>
> Requiring use of a configured broker.list for each client means that
> 1000's of deployed apps need to be updated any time the kafka cluster
> changes, no? (Or am I not understanding?)
>
> You mention that auto-discovery of new brokers will still work, is that
> dependent on the existing configured broker.list set still being available
> also?
>
> I can see though how this will greatly reduce the load on zookeeper.
>
> Jason
>
> On Tue, Nov 20, 2012 at 9:03 AM, Jun Rao <jun...@gmail.com> wrote:
>
>> Jason,
>>
>> Auto discovery of new brokers and rolling restart of brokers are still
>> supported in 0.8. It's just that most of the ZK-related logic has moved
>> to the broker.
>>
>> There are 2 reasons why we want to remove zkclient from the client:
>>
>> 1. If the client pauses for GC, it can cause zk session expiration,
>> which causes churn in the client and extra load on the zk server.
>> 2. It simplifies the client code and makes the implementation of
>> non-java clients easier.
>>
>> In 0.8, we removed the zk dependency from the producer. Post 0.8, we
>> plan to see if we can do the same thing for the consumer (though that is
>> more involved). This shouldn't reduce any existing functionality of the
>> client, though. Feel free to let us know if you still have concerns.
>>
>> Thanks,
>>
>> Jun
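To make the discovery model Jun describes concrete: in 0.8 the producer
learns the current broker set by sending a metadata request to any live
broker from its configured list (the request reads zk on the server side,
as Jay explains further down the thread). The following is only an
illustrative sketch -- fetchMetadataFrom and ClusterMetadata are
hypothetical stand-ins, not actual Kafka classes:

    import java.util.List;

    public class BootstrapSketch {
        // Hypothetical holder for what a metadata response conveys: the
        // full, current broker set, regardless of what the bootstrap list
        // contained.
        static class ClusterMetadata {
            List<String> liveBrokers;
        }

        // Try each configured bootstrap broker in turn. Auto-discovery of
        // new brokers therefore only requires that at least one broker in
        // the configured list is alive -- exactly Neha's point above.
        static ClusterMetadata discover(List<String> brokerList) {
            for (String hostPort : brokerList) {
                try {
                    return fetchMetadataFrom(hostPort);
                } catch (Exception e) {
                    // unreachable broker: fall through, try the next one
                }
            }
            throw new IllegalStateException("no bootstrap broker reachable");
        }

        // Stand-in for the wire-level "get_metadata" request described
        // later in this thread; the real logic lives inside the producer.
        static ClusterMetadata fetchMetadataFrom(String hostPort) throws Exception {
            throw new UnsupportedOperationException("illustrative stub");
        }
    }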
>>
>> On Tue, Nov 20, 2012 at 7:57 AM, Jason Rosenberg <j...@squareup.com> wrote:
>>
>> > I checked out trunk. I guess I assumed that included the latest 0.8.
>> > Is that not right? Am I just looking at 0.7.x+?
>> >
>> > Honestly, I don't think it would be a positive thing not to be able to
>> > rely on zookeeper in producer code. How does that affect the discovery
>> > of a kafka cluster under dynamic conditions? We'd expect to have a
>> > much higher SLA for the zookeeper cluster than for kafka. We'd like to
>> > be able to freely do rolling restarts of the kafka cluster, etc.
>> >
>> > Also, it seems a bit asymmetric to use zk for the kafka brokers and
>> > consumers, but not the producers.
>> >
>> > Jason
>> >
>> > On Mon, Nov 19, 2012 at 8:50 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> >
>> > > In 0.8 there is no way to use zookeeper from the producer and no
>> > > connection from the client. There isn't even a way to configure a zk
>> > > connection. Are you sure you checked out the 0.8 branch?
>> > >
>> > > Check the code you've got:
>> > > jkreps-mn:kafka-0.8 jkreps$ svn info
>> > > Path: .
>> > > URL: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8
>> > > Repository Root: https://svn.apache.org/repos/asf
>> > >
>> > > The key is that it should have come from the URL kafka/branches/0.8.
>> > >
>> > > -Jay
>> > >
>> > > On Mon, Nov 19, 2012 at 3:30 PM, Jason Rosenberg <j...@squareup.com> wrote:
>> > >
>> > > > Regarding the producer/zk connection: if I am using zk to discover
>> > > > the kafka cluster, doesn't the producer get updates if zk's
>> > > > knowledge of the cluster changes? Or does it only re-consult zk if
>> > > > the particular kafka node it was "getting metadata" from goes away?
>> > > > Should I not be using a "zk.connect" but instead a "broker.list"
>> > > > when using a producer (that would seem restrictive)? What I've
>> > > > noticed is that the instant the zk server is taken down, my
>> > > > producer immediately starts logging connection errors to zk, every
>> > > > second, and never stops this logging until zk comes back. So it
>> > > > certainly feels like the producer is attempting to maintain a
>> > > > direct connection to zk. I suppose I expected it to try for the
>> > > > connection timeout period (e.g. 6000ms), and then give up, until
>> > > > the next send request, etc.
>> > > >
>> > > > Perhaps what it should do is make that initial zk connection to
>> > > > find the kafka broker list, then shut down the zk connection if it
>> > > > really doesn't need it after that, until possibly recreating it if
>> > > > it can no longer make contact with the kafka cluster.
>> > > >
>> > > > For the async queuing behavior, I agree, it's difficult to respond
>> > > > to a send request with an exception when the sending is done
>> > > > asynchronously, in a different thread. However, this is the
>> > > > behavior when the producer is started initially with no zk
>> > > > available (e.g. producer.send() gets an exception). So the api is
>> > > > inconsistent, in that it treats the unavailability of zk
>> > > > differently depending on whether it was unavailable at the initial
>> > > > startup, vs. a subsequent zk outage after previously having been
>> > > > available.
>> > > >
>> > > > I am not too concerned about not having a 100% guarantee that if I
>> > > > successfully call producer.send(), I know it was actually
>> > > > delivered. But it would be nice to have some way to know the
>> > > > current health of the producer, perhaps some sort of
>> > > > "producerStatus()" method. If the async sending thread is having
>> > > > issues sending, it might be nice to expose that to the client.
>> > > > Also, if the current producerStatus() is not healthy, then I think
>> > > > it might be ok to not accept new messages to be sent (e.g.
>> > > > producer.send() could throw an Exception in that case).
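A hypothetical sketch of the producerStatus() idea above -- none of these
names exist in Kafka; it only illustrates the proposed contract of exposing
sender health and refusing new messages while unhealthy:

    public class GuardedProducer<T> {
        // Flag flipped by the background sender thread as sends fail or
        // recover.
        private volatile boolean healthy = true;

        // The health probe Jason proposes.
        public boolean producerStatus() {
            return healthy;
        }

        void markUnhealthy() { healthy = false; }
        void markHealthy()   { healthy = true;  }

        // Refuse new messages while the async sender is known to be
        // failing, instead of silently buffering them toward eventual loss.
        public void send(T message) {
            if (!healthy) {
                throw new IllegalStateException("producer unhealthy; rejecting message");
            }
            // ... hand the message off to the real async queue here ...
        }
    }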
>> > > >
>> > > > Returning a Future for each message sent seems a bit unscalable...
>> > > > I'm not sure clients want to be tying up resources waiting for
>> > > > Futures all the time either.
>> > > >
>> > > > I'm also seeing that if kafka goes down, while zk stays up,
>> > > > subsequent calls to producer.send() fail immediately with an
>> > > > exception ("partition is null"). I think this makes sense,
>> > > > although, in that case, what is the fate of previously buffered
>> > > > but unsent messages? Are they all lost?
>> > > >
>> > > > But I'd like it if zk goes down, and then kafka goes down, for it
>> > > > to behave the same way as if only kafka went down. Instead, it
>> > > > continues happily buffering messages, with lots of zk connection
>> > > > errors logged, but no way for the client code to know that things
>> > > > are not hunky dory.
>> > > >
>> > > > In summary:
>> > > >
>> > > > 1. If the zk connection is not important for a producer, why
>> > > > continually log zk connection errors every second, while at the
>> > > > same time having the client behave as if nothing is wrong and just
>> > > > keep accepting messages?
>> > > > 2. If the zk connection goes down, followed by kafka going down,
>> > > > it behaves no differently than if only zk went down, from the
>> > > > client's perspective (it keeps accepting new messages).
>> > > > 3. If the zk connection stays up, but kafka goes down, it then
>> > > > fails immediately with an exception in a call to producer.send()
>> > > > (I think this makes sense).
>> > > > 4. We have no way of knowing if/when buffered messages are sent,
>> > > > once zk and/or kafka come back online (although it appears all
>> > > > buffered messages are lost in any case where kafka goes offline).
>> > > > 5. I'm not clear on the difference between "queue.time" and
>> > > > "queue.size". If kafka is not available, but "queue.time" has
>> > > > expired, what happens: do messages get dropped, or do they
>> > > > continue to be buffered until queue.size is exhausted?
>> > > > 6. What happens if I call producer.close() while there are
>> > > > buffered messages? Do the messages get sent before the producer
>> > > > shuts down?
>> > > >
>> > > > Jason
>> > > >
>> > > > On Mon, Nov 19, 2012 at 2:31 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > >
>> > > > > Starting in 0.8 there is no direct connection from the producer
>> > > > > to zk. The goal here is to make it easy to implement clients in
>> > > > > non-java languages and avoid painful zk upgrades. ZK is replaced
>> > > > > by a "get_metadata" api (which, in terms of implementation, of
>> > > > > course still just reads from zk--but now the read is done by the
>> > > > > server).
>> > > > >
>> > > > > I think the intended behavior for the async producer is the
>> > > > > following:
>> > > > > 1. Messages are immediately added to a queue of messages to be
>> > > > > sent.
>> > > > > 2. The number of messages that can be in the queue is bounded by
>> > > > > queue.size. If the queue is full, the parameter
>> > > > > queue.enqueueTimeout.ms determines how long the producer will
>> > > > > wait for the queue length to decrease before dropping the
>> > > > > message.
>> > > > > 3. Messages are sent by a background thread. If this thread
>> > > > > falls behind due to throughput, or because the kafka cluster is
>> > > > > down, messages will pile up in the queue.
>> > > > > 4. If the send operation fails there are two options: (1) retry,
>> > > > > or (2) give up. If you retry you may get a duplicate (this is
>> > > > > the semantics of any mutation RPC in any system--e.g. if you get
>> > > > > a network error you do not know if the mutation has occurred or
>> > > > > not). If you give up you will lose those messages. This decision
>> > > > > is controlled by producer.num.retries.
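Gathered into one place, a sketch of the async knobs Jay just enumerated,
using the property names as they appear in this thread (several were
renamed in later builds); the values are arbitrary examples:

    import java.util.Properties;

    public class AsyncTuningSketch {
        static Properties asyncProducerProps() {
            Properties props = new Properties();
            props.put("producer.type", "async");
            // (2) bound on the in-memory queue of unsent messages
            props.put("queue.size", "10000");
            // (2) how long send() waits for space when the queue is full,
            //     before the message is dropped
            props.put("queue.enqueueTimeout.ms", "500");
            // max time messages sit buffered before the background thread
            // flushes them (the knob behind Jason's question 5 above)
            props.put("queue.time", "5000");
            // (4) retries before giving up on a failed send
            props.put("producer.num.retries", "3");
            return props;
        }
    }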
>> > > > >
>> > > > > It is desirable that the client not die if the zk connection is
>> > > > > lost, if possible, since zk can sometimes have gc pauses or disk
>> > > > > latency or whatever other transient issues.
>> > > > >
>> > > > > Because the send is async and does not have a 1-1 correspondence
>> > > > > to the network communication, it is not possible to immediately
>> > > > > throw an exception in the send() call.
>> > > > >
>> > > > > This is not ideal, since how do you know if your send succeeded?
>> > > > > We think the fix is to have the send call return a future
>> > > > > representing the result of the request that will eventually be
>> > > > > made, as well as returning the offset of your message if you
>> > > > > care. This is a bit of a large refactoring of the producer code
>> > > > > (which could definitely use some refactoring), so our tentative
>> > > > > plan was to address it in 0.9.
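The future-returning send Jay describes might look roughly like the sketch
below. These signatures are hypothetical -- the thread is explicit that
this API does not exist yet and is only tentatively planned for 0.9:

    import java.util.concurrent.Future;

    // Hypothetical shape of a future-returning send: the caller gets a
    // handle that completes when the background request actually runs,
    // carrying the offset assigned to the message, or the failure if the
    // send ultimately failed after its retries.
    public interface FutureProducer<K, V> {
        Future<Long> send(String topic, K key, V value);
    }

    // Usage sketch: callers that need confirmation block on the future;
    // fire-and-forget callers simply discard it (addressing Jason's
    // concern about tying up resources waiting on Futures).
    //
    //   Future<Long> f = producer.send("events", key, value);
    //   long offset = f.get();   // throws if the async send failed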
>> > > > >
>> > > > > I think what you are saying is slightly different, though, which
>> > > > > is that the behavior between the various cases should be
>> > > > > consistent. What do you think would be the right behavior?
>> > > > >
>> > > > > -Jay
>> > > > >
>> > > > > On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <j...@squareup.com> wrote:
>> > > > >
>> > > > > > I forgot to mention that I'm working with a recent version of
>> > > > > > the 0.8 code (Last changed rev: 1396425).
>> > > > > >
>> > > > > > Jason
>> > > > > >
>> > > > > > On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg <j...@squareup.com> wrote:
>> > > > > >
>> > > > > > > I've been doing some testing, with an async producer.
>> > > > > > >
>> > > > > > > It seems, if I start up the producer with no zk cluster
>> > > > > > > present, it does what I expect: that is, it waits for a
>> > > > > > > limited time looking for the zk cluster, then gives up after
>> > > > > > > the zk.connectiontimeout.ms setting (6000ms, by default),
>> > > > > > > and fails to send a message. However, if after starting up
>> > > > > > > zk and having a good connection to zk and kafka, I then shut
>> > > > > > > down the zk cluster, the producer never seems to stop
>> > > > > > > accepting messages to send.
>> > > > > > >
>> > > > > > > As long as kafka stays up and running, even without zk still
>> > > > > > > available, my producer sends messages and my consumer can
>> > > > > > > consume them.
>> > > > > > >
>> > > > > > > However, if I then stop kafka also, my producer happily
>> > > > > > > keeps on accepting messages without failing in a call to
>> > > > > > > producer.send(). It's clearly no longer able to send any
>> > > > > > > messages at this point. So, I assume it eventually will just
>> > > > > > > start dropping messages on the floor?
>> > > > > > >
>> > > > > > > I would have expected that once both zk and kafka are not
>> > > > > > > available, things should revert to the initial startup case,
>> > > > > > > where it tries for 6000ms and then throws an exception on
>> > > > > > > send.
>> > > > > > >
>> > > > > > > Thoughts?
>> > > > > > >
>> > > > > > > What's the expected behavior for async producers when the
>> > > > > > > async buffered messages can't be sent? I think it's fine if
>> > > > > > > they are just lost, but should it be possible to block
>> > > > > > > further accepting of messages once the system has detected a
>> > > > > > > problem communicating with zk/kafka?
>> > > > > > >
>> > > > > > > Also, if I cleanly shut down an async producer (e.g. call
>> > > > > > > producer.close()), should it make a best effort to send out
>> > > > > > > any buffered messages before shutting down? Or will it quit
>> > > > > > > immediately, dropping any buffered messages on the floor?
>> > > > > > >
>> > > > > > > Jason
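On the close() question, the thread leaves the flush semantics open. One
defensive shutdown pattern, under the assumption (unconfirmed here) that
queue.time is the background flush interval, is to stop submitting new
messages and wait out one interval before closing:

    public class DrainBeforeClose {
        // ASSUMPTION: queue.time is the async flush interval; the thread
        // does not confirm whether close() itself drains the buffer, so
        // we give the background sender one full interval to drain first.
        public static void shutdown(
                kafka.javaapi.producer.Producer<String, String> producer,
                long queueTimeMs) throws InterruptedException {
            // 1. the application stops calling send() before invoking this
            // 2. wait past one flush interval so buffered messages go out
            Thread.sleep(queueTimeMs + 1000L);
            // 3. close; if close() does drop buffered messages, the sleep
            //    above has minimized what is left to drop
            producer.close();
        }
    }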