That's right. The VIP is only used for getting metadata. All producer send requests go through direct RPC to each broker.
Thanks,

Jun

On Tue, Nov 20, 2012 at 10:28 AM, Jason Rosenberg <j...@squareup.com> wrote:

Ok,

I think I understand (so I'll need to change some things in our setup to work with 0.8).

So the VIP is only for getting metadata? After that, under the covers, the producers will make direct connections to individual kafka hosts that they learned about from connecting through the VIP?

Jason

On Tue, Nov 20, 2012 at 10:20 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

I think the confusion is that we are answering a slightly different question than the one you are asking. If I understand correctly, you are asking, "do I need to put ALL the kafka broker urls into the config for the client, and will this need to be updated if I add machines to the cluster?"

The answer to both questions is no. The broker list configuration will work exactly as your zookeeper configuration worked. Namely, you must have the URL of at least one operational broker in the cluster, and the producer will use this/these urls to fetch a complete topology of the cluster (all nodes, and what partitions they have). If you add kafka brokers or migrate partitions from one broker to another, clients will automatically discover this and adjust appropriately with no need for config changes. The broker list you give is only used when fetching metadata; all producer requests go directly to the appropriate broker. As a result you can use a VIP for the broker list if you like, without having any of the actual data you send go through that VIP.

As Neha and Jun mentioned, there were a couple of reasons for this change:
1. If you use kafka heavily, everything ends up connecting to zk, and any operational change to zk, or any upgrade, becomes immensely difficult.
2. Zk support outside java is spotty at best.
3. In effect we were using zk for what it is good at--discovery--but discovery through zk watchers is asynchronous. That is, if you try to send to the wrong broker we need to give you an error right away and have you update your metadata, and this will likely happen before the zk watcher fires. Plus, once you handle this case you don't need the watcher. As a result zk is just being used as a key-value store.

-Jay
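To make the bootstrap-only role of the broker list concrete, here is a rough sketch of a producer set up this way, assuming the 0.8 Java producer API roughly as it was released (kafka.javaapi.producer.Producer, ProducerConfig, KeyedMessage). The class name, hostname, and topic below are illustrative; the property was called broker.list in the pre-release builds discussed here, and the released 0.8 renamed it metadata.broker.list, so treat the exact names as version-dependent.

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class BrokerListExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Used only to bootstrap cluster metadata; a VIP (or any one live
            // broker) is enough, and the actual sends bypass whatever is named here.
            props.put("broker.list", "kafka-vip.example.com:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

            // The send goes directly to the broker that leads the target
            // partition, as learned from the metadata fetch above.
            producer.send(new KeyedMessage<String, String>("my-topic", "hello"));
            producer.close();
        }
    }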
On Tue, Nov 20, 2012 at 9:44 AM, Jason Rosenberg <j...@squareup.com> wrote:

Ok,

So, I'm still wrapping my mind around this. I liked being able to use zk for all clients, since it made it very easy to think about how to update the kafka cluster. E.g. how to add new brokers, how to move them all to new hosts entirely, etc., without having to redeploy all the clients. The new brokers will simply advertise their new location via zk, and all clients will pick it up.

By requiring use of a configured broker.list for each client, 1000's of deployed apps need to be updated any time the kafka cluster changes, no? (Or am I not understanding?)

You mention that auto-discovery of new brokers will still work; is that dependent on the existing configured broker.list set still being available also?

I can see though how this will greatly reduce the load on zookeeper.

Jason

On Tue, Nov 20, 2012 at 9:03 AM, Jun Rao <jun...@gmail.com> wrote:

Jason,

Auto discovery of new brokers and rolling restart of brokers are still supported in 0.8. It's just that most of the ZK related logic is moved to the broker.

There are 2 reasons why we want to remove zkclient from the client:
1. If the client goes into GC, it can cause zk session expiration and cause churn in the client and extra load on the zk server.
2. This simplifies the client code and makes the implementation of non-java clients easier.

In 0.8, we removed the zk dependency from the producer. Post 0.8, we plan to see if we can do the same thing for the consumer (though that is more involved). This shouldn't reduce any existing functionality of the client though. Feel free to let us know if you still have concerns.

Thanks,

Jun

On Tue, Nov 20, 2012 at 7:57 AM, Jason Rosenberg <j...@squareup.com> wrote:

I checked out trunk. I guess I assumed that included the latest 0.8. Is that not right? Am I just looking at 0.7.x+?

Honestly, I don't think it would be a positive thing not to be able to rely on zookeeper in producer code. How does that affect the discovery of a kafka cluster under dynamic conditions? We'd expect to have a much higher SLA for the zookeeper cluster than for kafka. We'd like to be able to freely do rolling restarts of the kafka cluster, etc.

Also, it seems a bit asymmetric to use zk for the kafka brokers and consumers, but not the producers.

Jason

On Mon, Nov 19, 2012 at 8:50 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

In 0.8 there is no way to use zookeeper from the producer and no connection from the client. There isn't even a way to configure a zk connection. Are you sure you checked out the 0.8 branch?

Check the code you've got:

    jkreps-mn:kafka-0.8 jkreps$ svn info
    Path: .
    URL: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8
    Repository Root: https://svn.apache.org/repos/asf

The key is that it should have come from the URL kafka/branches/0.8.

-Jay

On Mon, Nov 19, 2012 at 3:30 PM, Jason Rosenberg <j...@squareup.com> wrote:

Regarding the producer/zk connection: if I am using zk to discover the kafka cluster, doesn't the producer get updates if zk's knowledge of the cluster changes? Or does it only reconsult zk if the particular kafka node it was "getting metadata" from goes away? Should I not be using a "zk.connect" but instead a "broker.list" when using a producer (that would seem restrictive)?
What I've noticed is that the instant the zk server is taken down, my producer immediately starts logging connection errors to zk, every second, and never stops this logging until zk comes back. So it certainly feels like the producer is attempting to maintain a direct connection to zk. I suppose I expected it to try for the connection timeout period (e.g. 6000ms), and then give up until the next send request, etc.

Perhaps what it should do is make that initial zk connection to find the kafka broker list, then shut down the zk connection if it really doesn't need it after that, possibly recreating it later if it can no longer make contact with the kafka cluster.

For the async queuing behavior, I agree, it's difficult to respond to a send request with an exception when the sending is done asynchronously, in a different thread. However, this is the behavior when the producer is started initially with no zk available (e.g. producer.send() gets an exception). So the api is inconsistent, in that it treats the unavailability of zk differently depending on whether it was unavailable at the initial startup vs. a subsequent zk outage after previously having been available.

I am not too concerned about not having a 100% guarantee that if I successfully call producer.send(), it was actually delivered. But it would be nice to have some way to know the current health of the producer, perhaps some sort of "producerStatus()" method. If the async sending thread is having issues sending, it might be nice to expose that to the client. Also, if the current producerStatus() is not healthy, then I think it might be ok to not accept new messages to be sent (e.g. producer.send() could throw an Exception in that case).

Returning a Future for each message sent seems a bit unscalable... I'm not sure clients want to be tying up resources waiting for Futures all the time either.

I'm also seeing that if kafka goes down while zk stays up, subsequent calls to producer.send() fail immediately with an exception ("partition is null"). I think this makes sense, although, in that case, what is the fate of previously buffered but unsent messages? Are they all lost?

But I'd like it so that if zk goes down, and then kafka goes down, it behaves the same way as if only kafka went down. Instead, it continues happily buffering messages, with lots of zk connection errors logged, but no way for the client code to know that things are not hunky dory.

In summary:

1. If the zk connection is not important for a producer, why continually log zk connection errors every second, while at the same time having the client behave as if nothing is wrong and just keep accepting messages?
2. If the zk connection goes down, followed by kafka going down, it behaves no differently than if only zk went down, from the client's perspective (it keeps accepting new messages).
3. If the zk connection stays up, but kafka goes down, it then fails immediately with an exception in a call to producer.send (I think this makes sense).
4. We have no way of knowing if/when buffered messages are sent once zk and/or kafka come back online (although it appears all buffered messages are lost in any case where kafka goes offline).
5. I'm not clear on the difference between "queue.time" and "queue.size". If kafka is not available but "queue.time" has expired, what happens: do messages get dropped, or do they continue to be buffered until queue.size is exhausted?
6. What happens if I call producer.close() while there are buffered messages? Do the messages get sent before the producer shuts down?

Jason
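The producerStatus() idea above is easy to picture as a thin client-side wrapper. The sketch below is purely hypothetical (no such API exists in the producer, and the class name MonitoredProducer is made up for illustration); it can only observe failures that surface synchronously from send(), such as the "partition is null" case, not errors inside the async send thread.

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;

    // Hypothetical wrapper sketching the "producerStatus()" idea from the
    // message above; nothing like this exists in the Kafka client itself.
    public class MonitoredProducer<K, V> {
        private final Producer<K, V> delegate;
        private volatile boolean healthy = true;

        public MonitoredProducer(Producer<K, V> delegate) {
            this.delegate = delegate;
        }

        public void send(KeyedMessage<K, V> message) {
            if (!healthy) {
                // Refuse new messages once a problem has been observed,
                // as suggested in the thread.
                throw new IllegalStateException("producer unhealthy; not accepting messages");
            }
            try {
                delegate.send(message);
            } catch (RuntimeException e) {
                // Only synchronous failures from send() are visible here.
                healthy = false;
                throw e;
            }
        }

        public boolean producerStatus() {
            return healthy;
        }
    }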
On Mon, Nov 19, 2012 at 2:31 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Starting in 0.8 there is no direct connection from the producer to zk. The goal here is to make it easy to implement clients in non-java languages and avoid painful zk upgrades. ZK is replaced by a "get_metadata" api (which, in terms of implementation, of course still just reads from zk--but now the read is done by the server).

I think the intended behavior for the async producer is the following:
1. Messages are immediately added to a queue of messages to be sent.
2. The number of messages that can be in the queue is bounded by queue.size. If the queue is full, the parameter queue.enqueueTimeout.ms determines how long the producer will wait for the queue length to decrease before dropping the message.
3. Messages are sent by a background thread. If this thread falls behind, due to throughput or because the kafka cluster is down, messages will pile up in the queue.
4. If the send operation fails there are two options: (1) retry, (2) give up. If you retry you may get a duplicate (this is the semantics of any mutation RPC in any system--e.g. if you get a network error you do not know whether the mutation has occurred or not). If you give up you will lose those messages. This decision is controlled by producer.num.retries.

It is desirable that the client not die if the zk connection is lost, if possible, since zk sometimes can have gc pauses or disk latency or whatever other transient issue.

Because the send is async and does not have a 1-1 correspondence to the network communication, it is not possible to immediately throw an exception in the send() call.

This is not ideal, since how do you know if your send succeeded? We think the fix is to have the send call return a future representing the result of the request that will eventually be made, as well as returning the offset of your message if you care. This is a bit of a large refactoring of the producer code (which could definitely use some refactoring), so our tentative plan was to address it in 0.9.

I think what you are saying is slightly different, though, which is that the behavior between the various cases should be consistent. What do you think would be the right behavior?

-Jay
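To make the queueing knobs Jay lists above concrete, here is a sketch of an async configuration using the property names as they appear in this thread (queue.size, queue.enqueueTimeout.ms, queue.time, producer.num.retries). Exact names and defaults shifted between 0.7 and the 0.8 builds, so this is illustrative only; the class name and broker host are made up.

    import java.util.Properties;

    // Illustrative async-producer settings using the property names used in
    // this thread; check the config classes of the version actually in use.
    public class AsyncProducerSettings {
        public static Properties asyncProps() {
            Properties props = new Properties();
            props.put("broker.list", "broker1.example.com:9092"); // metadata bootstrap only
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "async");          // send from a background thread (point 3)
            props.put("queue.size", "10000");             // bound on buffered messages (point 2)
            props.put("queue.enqueueTimeout.ms", "500");  // wait on a full queue before dropping (point 2)
            props.put("queue.time", "5000");              // max time messages are buffered before the background thread sends them (question 5 above)
            props.put("producer.num.retries", "3");       // retry vs. give up on a failed send (point 4)
            return props;
        }

        public static void main(String[] args) {
            // These properties would be handed to kafka.producer.ProducerConfig.
            System.out.println(asyncProps());
        }
    }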
On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <j...@squareup.com> wrote:

I forgot to mention that I'm working with a recent version of the 0.8 code (Last changed rev: 1396425).

Jason

On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg <j...@squareup.com> wrote:

I've been doing some testing with an async producer.

It seems, if I start up the producer with no zk cluster present, it does what I expect: it waits for a limited time looking for the zk cluster, then gives up after the zk.connectiontimeout.ms setting (6000ms, by default), and fails to send a message. However, if, after starting up with a good connection to zk and kafka, I then shut down the zk cluster, the producer never seems to stop accepting messages to send.

As long as kafka stays up and running, even without zk still available, my producer sends messages and my consumer can consume them.

However, if I then stop kafka also, my producer happily keeps on accepting messages without failing in a call to producer.send(). It's clearly no longer able to send any messages at this point.
So, I assume it eventually will just start dropping messages on the floor?

I would have expected that once both zk and kafka are not available, things should revert to the initial startup case, where it tries for 6000ms and then throws an exception on send.

Thoughts?

What's the expected behavior for async producers when the buffered messages can't be sent? I think it's fine if they are just lost, but should it be possible to block further accepting of messages once the system has detected a problem communicating with zk/kafka?

Also, if I cleanly shut down an async producer (e.g. call producer.close()), should it make a best effort to send out any buffered messages before shutting down? Or will it quit immediately, dropping any buffered messages on the floor?

Jason
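On the last question, the thread leaves it open whether producer.close() drains the queue. A defensive (and purely hypothetical) pattern is to stop submitting messages and then wait out roughly one queue.time interval before closing, giving the background send thread a chance to flush; the helper below is illustrative only and not part of the Kafka client.

    import kafka.javaapi.producer.Producer;

    // Hypothetical shutdown helper. It assumes the caller has already stopped
    // submitting messages and simply waits out drainMillis (e.g. queue.time
    // plus some slack) so the async send thread can flush buffered messages
    // before close() is called.
    public final class ProducerShutdown {
        private ProducerShutdown() {}

        public static <K, V> void drainAndClose(Producer<K, V> producer, long drainMillis) {
            try {
                Thread.sleep(drainMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            producer.close();
        }
    }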