>> So the VIP is only for getting meta-data? After that, under the covers, the producers will make direct connections to individual kafka hosts that they learned about from connecting through the VIP

That's right. Thanks for your questions!

On Tue, Nov 20, 2012 at 10:28 AM, Jason Rosenberg <j...@squareup.com> wrote:

Ok,

I think I understand (so I'll need to change some things in our setup to work with 0.8).

So the VIP is only for getting meta-data? After that, under the covers, the producers will make direct connections to individual kafka hosts that they learned about from connecting through the VIP?

Jason

On Tue, Nov 20, 2012 at 10:20 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

I think the confusion is that we are answering a slightly different question than the one you are asking. If I understand correctly, you are asking: "do I need to put ALL the kafka broker urls into the config for the client, and will this need to be updated if I add machines to the cluster?"

The answer to both questions is no. The broker list configuration will work exactly as your zookeeper configuration worked. Namely, you must have the URL of at least one operational broker in the cluster, and the producer will use this/these urls to fetch a complete topology of the cluster (all nodes, and which partitions they have). If you add kafka brokers or migrate partitions from one broker to another, clients will automatically discover this and adjust appropriately, with no need for config changes. The broker list you give is only used when fetching metadata; all producer requests go directly to the appropriate broker. As a result you can use a VIP for the broker list if you like, without having any of the actual data you send go through that VIP.

As Neha and Jun mentioned, there were a couple of reasons for this change:
1. If you use kafka heavily, everything ends up connecting to zk, and any operational change to zk or upgrade becomes immensely difficult.
2. Zk support outside java is spotty at best.
3. In effect we were using zk for what it is good at--discovery--but discovery is asynchronous.
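Concretely, the bootstrap Jay describes might look like the following producer config sketch. The property names are the ones used in this thread (exact names may differ between 0.8 builds), the hostname is a placeholder, and the values are illustrative:

```properties
# Bootstrap/metadata endpoint only (can be a VIP); actual produce requests
# go directly to the brokers learned from the metadata response.
broker.list=kafka-vip.example.com:9092

# Async-producer settings discussed in this thread (illustrative values):
producer.type=async
queue.size=10000
queue.enqueueTimeout.ms=500
producer.num.retries=3
```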
That is, if you try to send to the wrong broker, we need to give you an error right away and have you update your metadata, and this will likely happen before the zk watcher fires. Plus, once you handle this case you don't need the watcher. As a result zk is just being used as a key-value store.

-Jay

On Tue, Nov 20, 2012 at 9:44 AM, Jason Rosenberg <j...@squareup.com> wrote:

Ok,

So, I'm still wrapping my mind around this. I liked being able to use zk for all clients, since it made it very easy to think about how to update the kafka cluster: e.g. how to add new brokers, how to move them all to new hosts entirely, etc., without having to redeploy all the clients. The new brokers will simply advertise their new location via zk, and all clients will pick it up.

Requiring a configured broker.list for each client means that 1000s of deployed apps need to be updated any time the kafka cluster changes, no? (Or am I not understanding?)

You mention that auto-discovery of new brokers will still work; is that dependent on the existing configured broker.list set still being available also?

I can see, though, how this will greatly reduce the load on zookeeper.

Jason

On Tue, Nov 20, 2012 at 9:03 AM, Jun Rao <jun...@gmail.com> wrote:

Jason,

Auto discovery of new brokers and rolling restart of brokers are still supported in 0.8. It's just that most of the ZK-related logic has moved to the broker.

There are 2 reasons why we want to remove zkclient from the client:

1. If the client pauses for GC, it can cause zk session expiration, causing churn in the client and extra load on the zk server.
2. It simplifies the client code and makes the implementation of non-java clients easier.
In 0.8, we removed the zk dependency from the producer. Post 0.8, we plan to see if we can do the same thing for the consumer (though that is more involved). This shouldn't reduce any existing functionality of the client, though. Feel free to let us know if you still have concerns.

Thanks,

Jun

On Tue, Nov 20, 2012 at 7:57 AM, Jason Rosenberg <j...@squareup.com> wrote:

I checked out trunk. I guess I assumed that included the latest 0.8. Is that not right? Am I just looking at 0.7.x+?

Honestly, I don't think it would be a positive thing not to be able to rely on zookeeper in producer code. How does that affect the discovery of a kafka cluster under dynamic conditions? We'd expect to have a much higher SLA for the zookeeper cluster than for kafka. We'd like to be able to freely do rolling restarts of the kafka cluster, etc.

Also, it seems a bit asymmetric to use zk for the kafka brokers and consumers, but not the producers.

Jason

On Mon, Nov 19, 2012 at 8:50 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

In 0.8 there is no way to use zookeeper from the producer and no connection from the client. There isn't even a way to configure a zk connection. Are you sure you checked out the 0.8 branch?

Check the code you've got:

jkreps-mn:kafka-0.8 jkreps$ svn info
Path: .
URL: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8
Repository Root: https://svn.apache.org/repos/asf

The key is that it should have come from the URL kafka/branches/0.8.
>> > > > > >> > > > > -Jay >> > > > > >> > > > > >> > > > > On Mon, Nov 19, 2012 at 3:30 PM, Jason Rosenberg <j...@squareup.com >> > >> > > > wrote: >> > > > > >> > > > > > Regarding the poducer/zk connection: if I am using zk to >> discover >> > > the >> > > > > > kafka cluster, doesn't the producer get updates if zk's knowledge >> > of >> > > > the >> > > > > > cluster changes? Or does it only reconsult zk if the particular >> > > kafka >> > > > > node >> > > > > > it was "getting metadata" from goes away? Should I not be using >> a >> > > > > > "zk.connect" but instead a "broker.list" when using a producer >> > (that >> > > > > would >> > > > > > seem restrictive)? What I've noticed is that the instant the zk >> > > server >> > > > > is >> > > > > > taken down, my producer immediately starts logging connection >> > errors >> > > to >> > > > > zk, >> > > > > > every second, and never stops this logging until zk comes back. >> So >> > > it >> > > > > > certainly feels like the producer is attempting to maintain a >> > direct >> > > > > > connection to zk. I suppose I expected it to try for the >> > connection >> > > > > > timeout period (e.g. 6000ms), and then give up, until the next >> send >> > > > > > request, etc. >> > > > > > >> > > > > > Perhaps what it should do is make that initial zk connection to >> > find >> > > > the >> > > > > > kafka broker list, then shutdown the zk connection if it really >> > > doesn't >> > > > > > need it after that, until possibly recreating it if needed if it >> > can >> > > no >> > > > > > longer make contact with the kafka cluster. >> > > > > > >> > > > > > For the async queuing behavior, I agree, it's difficult to >> respond >> > > to a >> > > > > > send request with an exception, when the sending is done >> > > > asynchronously, >> > > > > in >> > > > > > a different thread. However, this is the behavior when the >> > producer >> > > is >> > > > > > started initially, with no zk available (e.g. 
producer.send() gets an exception). So the api is inconsistent, in that it treats the unavailability of zk differently depending on whether it was unavailable at the initial startup, vs. a subsequent zk outage after previously having been available.

I am not too concerned about not having a 100% guarantee that if I successfully call producer.send(), I know the message was actually delivered. But it would be nice to have some way to know the current health of the producer, perhaps some sort of "producerStatus()" method. If the async sending thread is having issues sending, it might be nice to expose that to the client. Also, if the current producerStatus() is not healthy, then I think it might be ok to not accept new messages to be sent (e.g. producer.send() could throw an Exception in that case).

Returning a Future for each message sent seems a bit unscalable... I'm not sure clients want to be tying up resources waiting for Futures all the time either.

I'm also seeing that if kafka goes down while zk stays up, subsequent calls to producer.send() fail immediately with an exception ("partition is null"). I think this makes sense, although, in that case, what is the fate of previously buffered but unsent messages? Are they all lost?

But I'd like it if, when zk goes down and then kafka goes down, it would behave the same way as if only kafka went down.
Instead, it continues happily buffering messages, with lots of zk connection errors logged, but no way for the client code to know that things are not hunky dory.

In summary:

1. If the zk connection is not important for a producer, why continually log zk connection errors every second, while at the same time having the client behave as if nothing is wrong and just keep accepting messages?
2. If the zk connection goes down, followed by kafka going down, it behaves no differently than if only zk went down, from the client's perspective (it keeps accepting new messages).
3. If the zk connection stays up, but kafka goes down, it then fails immediately with an exception in a call to producer.send (I think this makes sense).
4. We have no way of knowing if/when buffered messages are sent once zk and/or kafka come back online (although it appears all buffered messages are lost in any case where kafka goes offline).
5. I'm not clear on the difference between "queue.time" and "queue.size". If kafka is not available but "queue.time" has expired, what happens: do messages get dropped, or do they continue to be buffered until queue.size is exhausted?
6. What happens if I call producer.close() while there are buffered messages? Do the messages get sent before the producer shuts down?
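The "producerStatus()" idea suggested above can be sketched with a small stdlib-only toy: a wrapper that tracks the health of the background sender and fails fast on send() when unhealthy. This is purely illustrative; `StatusAwareProducer`, `producerStatus()`, and the report methods are hypothetical names, not part of any Kafka API.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class StatusAwareProducer {
    public enum Status { HEALTHY, UNHEALTHY }

    private final Queue<String> buffer = new ConcurrentLinkedQueue<>();
    private volatile Status status = Status.HEALTHY;

    // In a real producer, the background sender thread would flip these
    // when sends start failing or succeed again.
    public void reportSendFailure() { status = Status.UNHEALTHY; }
    public void reportSendSuccess() { status = Status.HEALTHY; }

    public Status producerStatus() { return status; }

    // Fail fast instead of silently buffering when the sender is known down.
    public void send(String message) {
        if (status == Status.UNHEALTHY) {
            throw new IllegalStateException("producer unhealthy: refusing new messages");
        }
        buffer.add(message);
    }

    public static void main(String[] args) {
        StatusAwareProducer p = new StatusAwareProducer();
        p.send("ok while healthy");
        p.reportSendFailure(); // pretend the sender thread hit a broker outage
        try {
            p.send("should be rejected");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```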
Jason

On Mon, Nov 19, 2012 at 2:31 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Starting in 0.8 there is no direct connection from the producer to zk. The goal here is to make it easy to implement clients in non-java languages and avoid painful zk upgrades. ZK is replaced by a "get_metadata" api (which, in terms of implementation, of course still just reads from zk--but now the read is done by the server).

I think the intended behavior for the async producer is the following:

1. Messages are immediately added to a queue of messages to be sent.
2. The number of messages that can be in the queue is bounded by queue.size. If the queue is full, the parameter queue.enqueueTimeout.ms determines how long the producer will wait for the queue length to decrease before dropping the message.
3. Messages are sent by a background thread. If this thread falls behind, due to throughput or because the kafka cluster is down, messages will pile up in the queue.
4. If the send operation fails there are two options: (1) retry, (2) give up. If you retry you may get a duplicate (this is the semantics of any mutation RPC in any system--e.g. if you get a network error you do not know whether the mutation has occurred or not). If you give up you will lose those messages. This decision is controlled by producer.num.retries.
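The four points above can be sketched as a stdlib-only toy async producer: a bounded queue (queue.size), a timed offer() standing in for queue.enqueueTimeout.ms, and a background sender thread that also drains on close. This simulates the described semantics, with an in-memory list standing in for the network send; `ToyAsyncProducer` and its names are hypothetical, not Kafka's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ToyAsyncProducer {
    private final BlockingQueue<String> queue;   // point 1: sends are queued
    private final long enqueueTimeoutMs;         // point 2: wait, then drop
    private final Thread sender;                 // point 3: background sender
    private volatile boolean closed = false;
    final List<String> delivered = new ArrayList<>(); // stand-in for the broker

    ToyAsyncProducer(int queueSize, long enqueueTimeoutMs) {
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.enqueueTimeoutMs = enqueueTimeoutMs;
        this.sender = new Thread(() -> {
            while (!closed || !queue.isEmpty()) {
                try {
                    String msg = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        // A real sender would do the network I/O (and retry
                        // per producer.num.retries, point 4) here.
                        synchronized (delivered) { delivered.add(msg); }
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        sender.start();
    }

    /** Returns false if the message was dropped because the queue stayed full. */
    boolean send(String message) throws InterruptedException {
        return queue.offer(message, enqueueTimeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Drains any buffered messages before stopping the sender thread. */
    void close() throws InterruptedException {
        closed = true;
        sender.join();
    }
}
```

The boolean return from send() is one way to surface the "dropped after enqueue timeout" case to the caller without blocking on a per-message Future.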
It is desirable that the client not die if the zk connection is lost, if possible, since zk sometimes can have gc pauses or disk latency or whatever other transient issue.

Because the send is async and does not have a 1-1 correspondence to the network communication, it is not possible to immediately throw an exception in the send() call.

This is not ideal, since how do you know if your send succeeded? We think the fix is to have the send call return a future representing the result of the request that will eventually be made, as well as returning the offset of your message if you care. This is a bit of a large refactoring of the producer code (which could definitely use some refactoring), so our tentative plan is to address it in 0.9.

I think what you are saying is slightly different, though, which is that the behavior between the various cases should be consistent. What do you think would be the right behavior?

-Jay

On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <j...@squareup.com> wrote:

I forgot to mention that I'm working with a recent version of the 0.8 code (Last changed rev: 1396425).
Jason

On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg <j...@squareup.com> wrote:

I've been doing some testing with an async producer.

It seems, if I start up the producer with no zk cluster present, it does what I expect: it waits for a limited time looking for the zk cluster, gives up after the zk.connectiontimeout.ms setting (6000ms, by default), and fails to send a message. However, if after starting up zk and having a good connection to zk and kafka, I then shut down the zk cluster, the producer never seems to stop accepting messages to send.

As long as kafka stays up and running, even without zk still available, my producer sends messages and my consumer can consume them.

However, if I then stop kafka also, my producer happily keeps on accepting messages without failing in a call to producer.send(). It's clearly no longer able to send any messages at this point. So, I assume it eventually will just start dropping messages on the floor?

I would have expected that once both zk/kafka are not available, things should revert to the initial startup case, where it tries for 6000ms and then throws an exception on send.

Thoughts?
What's the expected behavior for async producers when the buffered messages can't be sent? I think it's fine if they are just lost, but should it be possible to block further accepting of messages once the system has detected a problem communicating with zk/kafka?

Also, if I cleanly shut down an async producer (e.g. call producer.close()), should it make a best effort to send out any buffered messages before shutting down? Or will it quit immediately, dropping any buffered messages on the floor?

Jason