> Requiring a configured broker.list for each client means that 1000s of
> deployed apps need to be updated any time the kafka cluster changes, no?
> (Or am I not understanding?)

The advantage is that you can configure broker.list to point to a VIP, so
you can transparently change the brokers behind the VIP without having to
re-configure the producers. On the other hand, if you ever had to make a
similar change to your zookeeper cluster, it would be very operationally
heavy, since you would have to make a config change on each of your
producers.
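To make the indirection concrete, here is a minimal sketch (the hostnames
and the resolve() helper are hypothetical, purely for illustration):
producers hold one stable VIP address in broker.list, and swapping the
brokers behind the VIP is an operator-side change only.

```python
# Sketch of VIP-based indirection for broker.list (hostnames are made up).
# Producers are configured once with the stable VIP; the set of real brokers
# behind the VIP can change without touching any producer config.

# Stable producer-side configuration -- never needs to change.
producer_config = {"broker.list": "kafka-vip.example.com:9092"}

# Operator-side view: the VIP forwards to whatever brokers are current.
vip_backends = {"kafka-vip.example.com": ["broker1:9092", "broker2:9092"]}

def resolve(address):
    """Simulate the VIP forwarding a connection to a live backend broker."""
    host, _port = address.rsplit(":", 1)
    return vip_backends[host][0]  # e.g. pick the first healthy backend

# Replacing the brokers is purely an operator-side change:
vip_backends["kafka-vip.example.com"] = ["broker3:9092", "broker4:9092"]
print(resolve(producer_config["broker.list"]))  # now reaches broker3:9092
```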

> You mention that auto-discovery of new brokers will still work, is that
> dependent on the existing configured broker.list set still being available
> also?

It depends on at least one broker in the cluster being alive. If none of
them are alive, you have a much bigger problem to worry about.

Thanks,
Neha

On Tue, Nov 20, 2012 at 9:44 AM, Jason Rosenberg <j...@squareup.com> wrote:
> Ok,
>
> So, I'm still wrapping my mind around this.  I liked being able to use zk
> for all clients, since it made it very easy to think about how to update
> the kafka cluster.  E.g. how to add new brokers, how to move them all to
> new hosts entirely, etc., without having to redeploy all the clients.  The
> new brokers will simply advertise their new location via zk, and all
> clients will pick it up.
>
> Requiring a configured broker.list for each client means that 1000s of
> deployed apps need to be updated any time the kafka cluster changes, no?
> (Or am I not understanding?)
>
> You mention that auto-discovery of new brokers will still work, is that
> dependent on the existing configured broker.list set still being available
> also?
>
> I can see though how this will greatly reduce the load on zookeeper.
>
> Jason
>
>
>
> On Tue, Nov 20, 2012 at 9:03 AM, Jun Rao <jun...@gmail.com> wrote:
>
>> Jason,
>>
>> Auto discovery of new brokers and rolling restart of brokers are still
>> supported in 0.8. It's just that most of the ZK related logic is moved to
>> the broker.
>>
>> There are 2 reasons why we want to remove zkclient from the client.
>>
>> 1. If the client pauses for GC, it can cause zk session expiration, causing
>> churn in the client and extra load on the zk server.
>> 2. This simplifies the client code and makes the implementation of non-java
>> clients easier.
>>
>> In 0.8, we removed the zk dependency from the producer. Post 0.8, we plan
>> to see if we can do the same thing for the consumer (though that is more
>> involved). This shouldn't reduce any existing functionality of the client,
>> though. Feel free to let us know if you still have concerns.
>>
>> Thanks,
>>
>> Jun
>>
>>
>>
>> On Tue, Nov 20, 2012 at 7:57 AM, Jason Rosenberg <j...@squareup.com> wrote:
>>
>> > I checked out trunk.  I guess I assumed that included the latest 0.8.
>> > Is that not right?  Am I just looking at 0.7.x+?
>> >
>> > Honestly, I don't think it would be a positive thing not to be able to
>> > rely on zookeeper in producer code.  How does that affect the discovery
>> > of a kafka cluster under dynamic conditions?  We'd expect to have a much
>> > higher SLA for the zookeeper cluster than for kafka.  We'd like to be
>> > able to freely do rolling restarts of the kafka cluster, etc.
>> >
>> > Also, it seems a bit asymmetric to use zk for the kafka brokers and
>> > consumers, but not the producers.
>> >
>> > Jason
>> >
>> > On Mon, Nov 19, 2012 at 8:50 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> >
>> > > In 0.8 there is no way to use zookeeper from the producer and no
>> > > connection from the client. There isn't even a way to configure a zk
>> > > connection. Are you sure you checked out the 0.8 branch?
>> > >
>> > > Check the code you've got:
>> > > jkreps-mn:kafka-0.8 jkreps$ svn info
>> > > Path: .
>> > > URL: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8
>> > > Repository Root: https://svn.apache.org/repos/asf
>> > >
>> > > The key is that it should have come from the URL kafka/branches/0.8.
>> > >
>> > > -Jay
>> > >
>> > >
>> > > On Mon, Nov 19, 2012 at 3:30 PM, Jason Rosenberg <j...@squareup.com>
>> > > wrote:
>> > >
>> > > > Regarding the producer/zk connection: if I am using zk to discover
>> > > > the kafka cluster, doesn't the producer get updates if zk's knowledge
>> > > > of the cluster changes?  Or does it only reconsult zk if the
>> > > > particular kafka node it was "getting metadata" from goes away?
>> > > > Should I not be using a "zk.connect" but instead a "broker.list" when
>> > > > using a producer (that would seem restrictive)?  What I've noticed is
>> > > > that the instant the zk server is taken down, my producer immediately
>> > > > starts logging connection errors to zk, every second, and never stops
>> > > > this logging until zk comes back.  So it certainly feels like the
>> > > > producer is attempting to maintain a direct connection to zk.  I
>> > > > suppose I expected it to try for the connection timeout period (e.g.
>> > > > 6000ms), and then give up, until the next send request, etc.
>> > > >
>> > > > Perhaps what it should do is make that initial zk connection to find
>> > > > the kafka broker list, then shut down the zk connection if it really
>> > > > doesn't need it after that, possibly recreating it later if it can no
>> > > > longer make contact with the kafka cluster.
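That bootstrap-then-disconnect suggestion could be sketched like this (all
of the callables here are injected stand-ins for illustration, not a real
ZooKeeper client API):

```python
def bootstrap_brokers(zk_connect, open_zk, close_zk, fetch_broker_list):
    """Connect to zk just long enough to discover the broker list, then
    disconnect, as suggested above -- so no long-lived zk connection is
    left around to spam the logs with reconnect errors."""
    session = open_zk(zk_connect)
    try:
        return fetch_broker_list(session)
    finally:
        close_zk(session)  # always drop the connection after bootstrap
```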
>> > > >
>> > > > For the async queuing behavior, I agree, it's difficult to respond
>> > > > to a send request with an exception, when the sending is done
>> > > > asynchronously, in a different thread.  However, this is the behavior
>> > > > when the producer is started initially, with no zk available (e.g.
>> > > > producer.send() gets an exception).  So, the api is inconsistent, in
>> > > > that it treats the unavailability of zk differently, depending on
>> > > > whether it was unavailable at the initial startup, vs. a subsequent
>> > > > zk outage after previously having been available.
>> > > >
>> > > > I am not too concerned about not having a 100% guarantee that if I
>> > > > successfully call producer.send(), I know it was actually delivered.
>> > > > But it would be nice to have some way to know the current health of
>> > > > the producer, perhaps some sort of "producerStatus()" method.  If the
>> > > > async sending thread is having issues sending, it might be nice to
>> > > > expose that to the client.  Also, if the current producerStatus() is
>> > > > not healthy, then I think it might be ok to not accept new messages
>> > > > to be sent (e.g. producer.send() could throw an Exception in that
>> > > > case).
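A sketch of the producerStatus() idea being proposed (a hypothetical API,
not something Kafka provides): the background sender records failures, and
send() rejects new messages while the producer is unhealthy.

```python
import queue
import threading

class AsyncProducerSketch:
    """Illustration of the proposed producerStatus() idea (hypothetical;
    not a real Kafka API). The background sender records its last outcome,
    and send() rejects new messages while the producer is unhealthy."""

    def __init__(self, deliver, queue_size=10000):
        self._queue = queue.Queue(maxsize=queue_size)
        self._deliver = deliver      # callable that does the real network send
        self._healthy = True
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        while True:
            msg = self._queue.get()
            try:
                self._deliver(msg)
                self._healthy = True
            except Exception:
                self._healthy = False  # surface the failure instead of hiding it

    def producer_status(self):
        """The proposed health probe: is the background sender succeeding?"""
        return self._healthy

    def send(self, msg):
        if not self._healthy:
            raise RuntimeError("producer unhealthy: background sends are failing")
        self._queue.put(msg, block=False)
```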
>> > > >
>> > > > Returning a Future for each message sent seems a bit unscalable...
>> > > > I'm not sure clients want to be tying up resources waiting for
>> > > > Futures all the time either.
>> > > >
>> > > > I'm also seeing that if kafka goes down, while zk stays up,
>> > > > subsequent calls to producer.send() fail immediately with an
>> > > > exception ("partition is null").  I think this makes sense, although,
>> > > > in that case, what is the fate of previously buffered but unsent
>> > > > messages?  Are they all lost?
>> > > >
>> > > > But if zk goes down, and then kafka goes down, I'd like it to behave
>> > > > the same way as if only kafka went down.  Instead, it continues
>> > > > happily buffering messages, with lots of zk connection errors logged,
>> > > > but no way for the client code to know that things are not hunky
>> > > > dory.
>> > > >
>> > > > In summary:
>> > > >
>> > > > 1. If the zk connection is not important for a producer, why
>> > > > continually log zk connection errors every second, while at the same
>> > > > time having the client behave as if nothing is wrong and just keep
>> > > > accepting messages?
>> > > > 2. If the zk connection goes down, followed by kafka going down, it
>> > > > behaves no differently than if only zk went down, from the client's
>> > > > perspective (it keeps accepting new messages).
>> > > > 3. If the zk connection stays up, but kafka goes down, it then fails
>> > > > immediately with an exception in a call to producer.send() (I think
>> > > > this makes sense).
>> > > > 4. We have no way of knowing if/when buffered messages are sent, once
>> > > > zk and/or kafka come back online (although it appears all buffered
>> > > > messages are lost in any case where kafka goes offline).
>> > > > 5. I'm not clear on the difference between "queue.time" and
>> > > > "queue.size".  If kafka is not available, but "queue.time" has
>> > > > expired, what happens: do messages get dropped, or do they continue
>> > > > to be buffered until queue.size is exhausted?
>> > > > 6. What happens if I call producer.close() while there are buffered
>> > > > messages?  Do the messages get sent before the producer shuts down?
>> > > >
>> > > > Jason
>> > > >
>> > > >
>> > > > On Mon, Nov 19, 2012 at 2:31 PM, Jay Kreps <jay.kr...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Starting in 0.8 there is no direct connection from the producer to
>> > > > > zk.  The goal here is to make it easy to implement clients in
>> > > > > non-java languages and avoid painful zk upgrades.  ZK is replaced
>> > > > > by a "get_metadata" api (which, in terms of implementation, of
>> > > > > course still just reads from zk--but now the read is done by the
>> > > > > server).
>> > > > >
>> > > > > I think the intended behavior for the async producer is the
>> > > > > following:
>> > > > > 1. Messages are immediately added to a queue of messages to be
>> > > > > sent.
>> > > > > 2. The number of messages that can be in the queue is bounded by
>> > > > > queue.size.  If the queue is full, the parameter
>> > > > > queue.enqueueTimeout.ms determines how long the producer will wait
>> > > > > for the queue length to decrease before dropping the message.
>> > > > > 3. Messages are sent by a background thread.  If this thread falls
>> > > > > behind, due to throughput or because the kafka cluster is down,
>> > > > > messages will pile up in the queue.
>> > > > > 4. If the send operation fails, there are two options: (1) retry,
>> > > > > (2) give up.  If you retry you may get a duplicate (this is the
>> > > > > semantics of any mutation RPC in any system--e.g. if you get a
>> > > > > network error you do not know if the mutation has occurred or not).
>> > > > > If you give up you will lose those messages.  This decision is
>> > > > > controlled by producer.num.retries.
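The four steps above can be sketched as a toy Python model (a sketch under
assumptions, not Kafka's actual implementation; the parameter names echo
queue.size, queue.enqueueTimeout.ms, and producer.num.retries from the
thread):

```python
import queue
import threading

class BoundedAsyncQueueSketch:
    """Toy model of the async-producer semantics described above: bounded
    queue, enqueue timeout, background sender with bounded retries."""

    def __init__(self, deliver, queue_size=100, enqueue_timeout=0.1,
                 num_retries=3):
        self._queue = queue.Queue(maxsize=queue_size)
        self._deliver = deliver          # callable doing the real network send
        self._enqueue_timeout = enqueue_timeout
        self._num_retries = num_retries
        self.dropped = 0                 # dropped because the queue stayed full
        self.lost = 0                    # given up on after exhausting retries
        threading.Thread(target=self._run, daemon=True).start()

    def send(self, msg):
        # Steps 1-2: enqueue, waiting up to enqueue_timeout if the queue is
        # full; if it is still full after that, the message is dropped.
        try:
            self._queue.put(msg, timeout=self._enqueue_timeout)
        except queue.Full:
            self.dropped += 1

    def _run(self):
        # Step 3: a background thread drains the queue.
        while True:
            msg = self._queue.get()
            # Step 4: retry a bounded number of times, then give up (lose it).
            for _ in range(1 + self._num_retries):
                try:
                    self._deliver(msg)
                    break
                except Exception:
                    continue
            else:
                self.lost += 1
```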
>> > > > >
>> > > > > It is desirable that the client not die if the zk connection is
>> > > > > lost, if possible, since zk sometimes can have gc pauses or disk
>> > > > > latency or whatever other transient issue.
>> > > > >
>> > > > > Because the send is async and does not have a 1-1 correspondence
>> > > > > to the network communication, it is not possible to immediately
>> > > > > throw an exception in the send() call.
>> > > > >
>> > > > > This is not ideal, since how do you know if your send succeeded?
>> > > > > We think the fix is to have the send call return a future
>> > > > > representing the result of the request that will eventually be
>> > > > > made, as well as returning the offset of your message if you care.
>> > > > > This is a bit of a large refactoring of the producer code (which
>> > > > > could definitely use some refactoring), so our tentative plan was
>> > > > > to address it in 0.9.
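A rough sketch of what such a future-returning send() might look like,
using Python's concurrent.futures purely for illustration (the "offset"
here is a local counter, not a real broker offset):

```python
from concurrent.futures import Future
import itertools
import queue
import threading

class FutureSendSketch:
    """Sketch of an async send() that returns a Future resolving to the
    message's offset; failures would surface through the same Future."""

    def __init__(self):
        self._queue = queue.Queue()
        self._offsets = itertools.count()  # stand-in for broker-assigned offsets
        threading.Thread(target=self._run, daemon=True).start()

    def send(self, msg):
        fut = Future()
        self._queue.put((msg, fut))
        return fut            # caller may wait on it, or ignore it entirely

    def _run(self):
        while True:
            msg, fut = self._queue.get()
            try:
                # ... the real network send would happen here ...
                fut.set_result(next(self._offsets))
            except Exception as e:
                fut.set_exception(e)  # send failures reach the caller lazily
```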
>> > > > >
>> > > > > I think what you are saying is slightly different, though, which
>> > > > > is that the behavior between the various cases should be
>> > > > > consistent.  What do you think would be the right behavior?
>> > > > >
>> > > > > -Jay
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg
>> > > > > <j...@squareup.com> wrote:
>> > > > >
>> > > > > > I forgot to mention that I'm working with a recent version of
>> > > > > > the 0.8 code (Last changed rev: 1396425).
>> > > > > >
>> > > > > > Jason
>> > > > > >
>> > > > > > On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg
>> > > > > > <j...@squareup.com> wrote:
>> > > > > >
>> > > > > > > I've been doing some testing, with an async producer.
>> > > > > > >
>> > > > > > > It seems, if I start up the producer with no zk cluster
>> > > > > > > present, it does what I expect: it waits for a limited time
>> > > > > > > looking for the zk cluster, gives up after the
>> > > > > > > zk.connectiontimeout.ms setting (6000ms, by default), and
>> > > > > > > fails to send a message.  However, if after starting up zk and
>> > > > > > > having a good connection to zk and kafka, I then shut down the
>> > > > > > > zk cluster, the producer never seems to stop accepting
>> > > > > > > messages to send.
>> > > > > > >
>> > > > > > > As long as kafka stays up and running, even without zk still
>> > > > > > > available, my producer sends messages and my consumer can
>> > > > > > > consume them.
>> > > > > > >
>> > > > > > > However, if I then stop kafka also, my producer happily keeps
>> > > > > > > on accepting messages without failing in a call to
>> > > > > > > producer.send().  It's clearly no longer able to send any
>> > > > > > > messages at this point.  So, I assume it eventually will just
>> > > > > > > start dropping messages on the floor?
>> > > > > > >
>> > > > > > > I would have expected that once both zk and kafka are not
>> > > > > > > available, things should revert to the initial startup case,
>> > > > > > > where it tries for 6000ms and then throws an exception on
>> > > > > > > send.
>> > > > > > >
>> > > > > > > Thoughts?
>> > > > > > >
>> > > > > > > What's the expected behavior for async producers, when the
>> > > > > > > async buffered messages can't be sent?  I think it's fine if
>> > > > > > > they are just lost, but should it be possible to block further
>> > > > > > > accepting of messages once the system has detected a problem
>> > > > > > > communicating with zk/kafka?
>> > > > > > >
>> > > > > > > Also, if I cleanly shut down an async producer (e.g. call
>> > > > > > > producer.close()), should it make a best effort to send out
>> > > > > > > any buffered messages before shutting down?  Or will it quit
>> > > > > > > immediately, dropping any buffered messages on the floor?
>> > > > > > >
>> > > > > > > Jason
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
