Is there a configuration doc page for 0.8 (since apparently there are some
new settings)?

Jason

On Tue, Nov 20, 2012 at 10:39 AM, Jun Rao <jun...@gmail.com> wrote:

> That's right. VIP is only used for getting metadata. All producer send
> requests are through direct RPC to each broker.
>
> Thanks,
>
> Jun
>
> On Tue, Nov 20, 2012 at 10:28 AM, Jason Rosenberg <j...@squareup.com>
> wrote:
>
> > Ok,
> >
> > I think I understand (so I'll need to change some things in our set up to
> > work with 0.8).
> >
> > So the VIP is only for getting meta-data?  After that, under the covers,
> > the producers will make direct connections to individual kafka hosts that
> > they learned about from connecting through the VIP?
> >
> > Jason
> >
> > On Tue, Nov 20, 2012 at 10:20 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > I think the confusion is that we are answering a slightly different
> > > question than the one you are asking. If I understand correctly, you
> > > are asking, "do I need to put ALL the kafka broker urls into the config
> > > for the client, and will this need to be updated if I add machines to
> > > the cluster?".
> > >
> > > The answer to both these questions is no. The broker list configuration
> > > will work exactly as your zookeeper configuration worked. Namely, you
> > > must have the URL of at least one operational broker in the cluster, and
> > > the producer will use this/these urls to fetch a complete topology of
> > > the cluster (all nodes, and what partitions they have). If you add kafka
> > > brokers or migrate partitions from one broker to another, clients will
> > > automatically discover this and adjust appropriately with no need for
> > > config changes. The broker list you give is only used when fetching
> > > metadata; all producer requests go directly to the appropriate broker.
> > > As a result you can use a VIP for the broker list if you like, without
> > > having any of the actual data you send go through that VIP.
> > >
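A minimal Python sketch of the bootstrap idea described above, as a toy simulation rather than the Kafka client: the configured broker list is only used to fetch the topology, and each message then goes straight to the broker that leads its partition. The names `Broker`, `Cluster`, `Producer`, `bootstrap_ids` and `metadata_from` are hypothetical and chosen for illustration only.

```python
from dataclasses import dataclass, field


@dataclass
class Broker:
    """Toy stand-in for a broker (hypothetical, not a Kafka class)."""
    broker_id: int
    alive: bool = True
    received: list = field(default_factory=list)


class Cluster:
    """Simulated cluster that knows which broker leads each partition."""

    def __init__(self, brokers, leaders):
        self.brokers = {b.broker_id: b for b in brokers}
        self.leaders = dict(leaders)  # partition -> broker_id

    def metadata_from(self, broker_id):
        """Any live broker can describe the whole topology."""
        broker = self.brokers.get(broker_id)
        if broker is None or not broker.alive:
            raise ConnectionError(f"broker {broker_id} unreachable")
        return dict(self.leaders)


class Producer:
    def __init__(self, cluster, bootstrap_ids):
        self.cluster = cluster
        self.bootstrap_ids = list(bootstrap_ids)  # plays the role of the broker list
        self.leaders = {}

    def refresh_metadata(self):
        # Try each configured bootstrap broker until one answers.
        for broker_id in self.bootstrap_ids:
            try:
                self.leaders = self.cluster.metadata_from(broker_id)
                return
            except ConnectionError:
                continue
        raise ConnectionError("no bootstrap broker reachable")

    def send(self, partition, message):
        if partition not in self.leaders:
            self.refresh_metadata()
        # The data goes to the partition leader, not through the bootstrap entry.
        leader = self.cluster.brokers[self.leaders[partition]]
        leader.received.append(message)
        return leader.broker_id
```

In this sketch a producer configured with only broker 1 still delivers partition 0 to broker 2 if that is where the partition lives, which mirrors the point that a VIP in the broker list never carries the message data.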
> > > As Neha and Jun mentioned there were a couple of reasons for this
> > > change:
> > > 1. If you use kafka heavily everything ends up connecting to zk and any
> > > operational change to zk or upgrade becomes immensely difficult.
> > > 2. Zk support outside java is spotty at best.
> > > 3. In effect we were not using zk for what it is good at--discovery--
> > > because discovery is asynchronous. That is, if you try to send to the
> > > wrong broker we need to give you an error right away and have you update
> > > your metadata, and this will likely happen before the zk watcher fires.
> > > Plus once you handle this case you don't need the watcher. As a result
> > > zk is just being used as a key-value store.
> > >
> > > -Jay
> > >
> > >
> > >
> > > On Tue, Nov 20, 2012 at 9:44 AM, Jason Rosenberg <j...@squareup.com>
> > wrote:
> > >
> > > > Ok,
> > > >
> > > > So, I'm still wrapping my mind around this. I liked being able to use
> > > > zk for all clients, since it made it very easy to think about how to
> > > > update the kafka cluster. E.g. how to add new brokers, how to move
> > > > them all to new hosts entirely, etc., without having to redeploy all
> > > > the clients. The new brokers will simply advertise their new location
> > > > via zk, and all clients will pick it up.
> > > >
> > > > Requiring a configured broker.list for each client means that 1000's
> > > > of deployed apps need to be updated any time the kafka cluster
> > > > changes, no? (Or am I not understanding?)
> > > >
> > > > You mention that auto-discovery of new brokers will still work. Is
> > > > that dependent on the existing configured broker.list set still being
> > > > available also?
> > > >
> > > > I can see though how this will greatly reduce the load on zookeeper.
> > > >
> > > > Jason
> > > >
> > > >
> > > >
> > > > On Tue, Nov 20, 2012 at 9:03 AM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > Jason,
> > > > >
> > > > > Auto discovery of new brokers and rolling restart of brokers are
> > > > > still supported in 0.8. It's just that most of the ZK related logic
> > > > > is moved to the broker.
> > > > >
> > > > > There are 2 reasons why we want to remove zkclient from the client.
> > > > >
> > > > > 1. If the client goes into a GC pause, it can cause zk session
> > > > > expiration, which causes churn in the client and extra load on the
> > > > > zk server.
> > > > > 2. This simplifies the client code and makes the implementation of
> > > > > non-java clients easier.
> > > > >
> > > > > In 0.8, we removed the zk dependency from the producer. Post 0.8, we
> > > > > plan to see if we can do the same thing for the consumer (though
> > > > > that is more involved). This shouldn't reduce any existing
> > > > > functionality of the client though. Feel free to let us know if you
> > > > > still have concerns.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Nov 20, 2012 at 7:57 AM, Jason Rosenberg <j...@squareup.com>
> > > > > wrote:
> > > > >
> > > > > > I checked out trunk. I guess I assumed that included the latest
> > > > > > 0.8. Is that not right? Am I just looking at 0.7.x+?
> > > > > >
> > > > > > Honestly, I don't think it would be a positive thing not to be
> > > > > > able to rely on zookeeper in producer code. How does that affect
> > > > > > the discovery of a kafka cluster under dynamic conditions? We'd
> > > > > > expect to have a much higher SLA for the zookeeper cluster than
> > > > > > for kafka. We'd like to be able to freely do rolling restarts of
> > > > > > the kafka cluster, etc.
> > > > > >
> > > > > > Also, it seems a bit asymmetric to use zk for the kafka brokers
> > > > > > and consumers, but not the producers.
> > > > > >
> > > > > > Jason
> > > > > >
> > > > > > On Mon, Nov 19, 2012 at 8:50 PM, Jay Kreps <jay.kr...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > In 0.8 there is no way to use zookeeper from the producer and
> > > > > > > no connection from the client. There isn't even a way to
> > > > > > > configure a zk connection. Are you sure you checked out the 0.8
> > > > > > > branch?
> > > > > > >
> > > > > > > Check the code you've got:
> > > > > > > jkreps-mn:kafka-0.8 jkreps$ svn info
> > > > > > > Path: .
> > > > > > > URL: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8
> > > > > > > Repository Root: https://svn.apache.org/repos/asf
> > > > > > >
> > > > > > > The key is that it should have come from the URL
> > > > > > > kafka/branches/0.8.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Nov 19, 2012 at 3:30 PM, Jason Rosenberg <j...@squareup.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Regarding the producer/zk connection: if I am using zk to
> > > > > > > > discover the kafka cluster, doesn't the producer get updates
> > > > > > > > if zk's knowledge of the cluster changes? Or does it only
> > > > > > > > reconsult zk if the particular kafka node it was "getting
> > > > > > > > metadata" from goes away? Should I not be using a
> > > > > > > > "zk.connect" but instead a "broker.list" when using a
> > > > > > > > producer (that would seem restrictive)? What I've noticed is
> > > > > > > > that the instant the zk server is taken down, my producer
> > > > > > > > immediately starts logging connection errors to zk, every
> > > > > > > > second, and never stops this logging until zk comes back. So
> > > > > > > > it certainly feels like the producer is attempting to
> > > > > > > > maintain a direct connection to zk. I suppose I expected it
> > > > > > > > to try for the connection timeout period (e.g. 6000ms), and
> > > > > > > > then give up, until the next send request, etc.
> > > > > > > >
> > > > > > > > Perhaps what it should do is make that initial zk connection
> > > > > > > > to find the kafka broker list, then shut down the zk
> > > > > > > > connection if it really doesn't need it after that, until
> > > > > > > > possibly recreating it if it can no longer make contact with
> > > > > > > > the kafka cluster.
> > > > > > > >
> > > > > > > > For the async queuing behavior, I agree, it's difficult to
> > > > > > > > respond to a send request with an exception, when the
> > > > > > > > sending is done asynchronously, in a different thread.
> > > > > > > > However, this is the behavior when the producer is started
> > > > > > > > initially, with no zk available (e.g. producer.send() gets
> > > > > > > > an exception). So, the api is inconsistent, in that it
> > > > > > > > treats the unavailability of zk differently, depending on
> > > > > > > > whether it was unavailable at the initial startup, vs. a
> > > > > > > > subsequent zk outage after previously having been available.
> > > > > > > >
> > > > > > > > I am not too concerned about not having a 100% guarantee
> > > > > > > > that if I successfully call producer.send(), I know it was
> > > > > > > > actually delivered. But it would be nice to have some way to
> > > > > > > > know the current health of the producer, perhaps some sort
> > > > > > > > of "producerStatus()" method. If the async sending thread is
> > > > > > > > having issues sending, it might be nice to expose that to
> > > > > > > > the client. Also, if the current producerStatus() is not
> > > > > > > > healthy, then I think it might be ok to not accept new
> > > > > > > > messages to be sent (e.g. producer.send() could throw an
> > > > > > > > Exception in that case).
> > > > > > > >
> > > > > > > > Returning a Future for each message sent seems a bit
> > > > > > > > unscalable. I'm not sure clients want to be tying up
> > > > > > > > resources waiting for Futures all the time either.
> > > > > > > >
> > > > > > > > I'm also seeing that if kafka goes down, while zk stays up,
> > > > > > > > subsequent calls to producer.send() fail immediately with an
> > > > > > > > exception ("partition is null"). I think this makes sense,
> > > > > > > > although, in that case, what is the fate of previously
> > > > > > > > buffered but unsent messages? Are they all lost?
> > > > > > > >
> > > > > > > > But I'd like it if, when zk goes down and then kafka goes
> > > > > > > > down, it would behave the same way as if only kafka went
> > > > > > > > down. Instead, it continues happily buffering messages, with
> > > > > > > > lots of zk connection errors logged, but no way for the
> > > > > > > > client code to know that things are not hunky dory.
> > > > > > > >
> > > > > > > > In summary:
> > > > > > > >
> > > > > > > > 1. If the zk connection is not important for a producer, why
> > > > > > > > continually log zk connection errors every second, while at
> > > > > > > > the same time having the client behave as if nothing is
> > > > > > > > wrong and just keep accepting messages?
> > > > > > > > 2. If the zk connection goes down, followed by kafka going
> > > > > > > > down, it behaves no differently than if only zk went down,
> > > > > > > > from the client's perspective (it keeps accepting new
> > > > > > > > messages).
> > > > > > > > 3. If the zk connection stays up, but kafka goes down, it
> > > > > > > > then fails immediately with an exception in a call to
> > > > > > > > producer.send (I think this makes sense).
> > > > > > > > 4. We have no way of knowing if/when buffered messages are
> > > > > > > > sent, once zk and/or kafka come back online (although it
> > > > > > > > appears all buffered messages are lost in any case where
> > > > > > > > kafka goes offline).
> > > > > > > > 5. I'm not clear on the difference between "queue.time" and
> > > > > > > > "queue.size". If kafka is not available, but "queue.time"
> > > > > > > > has expired, what happens? Do messages get dropped, or do
> > > > > > > > they continue to be buffered until queue.size is exhausted?
> > > > > > > > 6. What happens if I call producer.close() while there are
> > > > > > > > buffered messages? Do the messages get sent before the
> > > > > > > > producer shuts down?
> > > > > > > >
> > > > > > > > Jason
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Nov 19, 2012 at 2:31 PM, Jay Kreps <jay.kr...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Starting in 0.8 there is no direct connection from the
> > > > > > > > > producer to zk. The goal here is to make it easy to
> > > > > > > > > implement clients in non-java languages and avoid painful
> > > > > > > > > zk upgrades. ZK is replaced by a "get_metadata" api (which
> > > > > > > > > in terms of implementation, of course, still just reads
> > > > > > > > > from zk--but now the read is done by the server).
> > > > > > > > >
> > > > > > > > > I think the intended behavior for the async producer is
> > > > > > > > > the following:
> > > > > > > > > 1. Messages are immediately added to a queue of messages
> > > > > > > > > to be sent.
> > > > > > > > > 2. The number of messages that can be in the queue is
> > > > > > > > > bounded by queue.size. If the queue is full, the parameter
> > > > > > > > > queue.enqueueTimeout.ms determines how long the producer
> > > > > > > > > will wait for the queue length to decrease before dropping
> > > > > > > > > the message.
> > > > > > > > > 3. Messages are sent by a background thread. If this
> > > > > > > > > thread falls behind due to throughput or because the kafka
> > > > > > > > > cluster is down, messages will pile up in the queue.
> > > > > > > > > 4. If the send operation fails there are two options: (1)
> > > > > > > > > retry, (2) give up. If you retry you may get a duplicate
> > > > > > > > > (this is the semantics of any mutation RPC in any
> > > > > > > > > system--e.g. if you get a network error you do not know if
> > > > > > > > > the mutation has occurred or not). If you give up you will
> > > > > > > > > lose those messages. This decision is controlled by
> > > > > > > > > producer.num.retries.
> > > > > > > > >
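The four rules above can be sketched as a small Python model. This is an illustration under stated assumptions, not the 0.8 producer code: the class name `AsyncProducerSketch`, the `send_fn` callback and the `drain_once` method are hypothetical, the constructor arguments only loosely mirror queue.size, queue.enqueueTimeout.ms and producer.num.retries, and the background thread is replaced by an explicit `drain_once()` call so the behavior stays deterministic.

```python
import queue


class AsyncProducerSketch:
    """Toy model of the queueing rules; not Kafka's implementation."""

    def __init__(self, send_fn, queue_size=3, enqueue_timeout_s=0.0, num_retries=1):
        self.send_fn = send_fn  # hypothetical callable; raises ConnectionError on failure
        self.buffer = queue.Queue(maxsize=queue_size)
        self.enqueue_timeout_s = enqueue_timeout_s
        self.num_retries = num_retries
        self.dropped = []

    def send(self, message):
        """Rules 1 and 2: enqueue, waiting up to the timeout if the queue is full."""
        try:
            if self.enqueue_timeout_s > 0:
                self.buffer.put(message, timeout=self.enqueue_timeout_s)
            else:
                self.buffer.put_nowait(message)
            return True
        except queue.Full:
            self.dropped.append(message)  # the caller only learns this from the return value
            return False

    def drain_once(self):
        """Rules 3 and 4: what a background sender would do for one message.

        Raises queue.Empty if nothing is buffered.
        """
        message = self.buffer.get_nowait()
        for _attempt in range(1 + self.num_retries):
            try:
                self.send_fn(message)
                return True
            except ConnectionError:
                # A retry may duplicate a message the broker already stored.
                continue
        self.dropped.append(message)  # gave up: the message is lost
        return False
```

Note that `send()` in this model succeeds as long as there is room in the queue, regardless of whether the cluster is reachable, which is the behavior being questioned in this thread.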
> > > > > > > > > It is desirable that the client not die if the zk
> > > > > > > > > connection is lost, if possible, since zk sometimes can
> > > > > > > > > have gc pauses or disk latency or whatever other transient
> > > > > > > > > issue.
> > > > > > > > >
> > > > > > > > > Because the send is async and does not have a 1-1
> > > > > > > > > correspondence to the network communication, it is not
> > > > > > > > > possible to immediately throw an exception in the send()
> > > > > > > > > call.
> > > > > > > > >
> > > > > > > > > This is not ideal, since how do you know if your send
> > > > > > > > > succeeded? We think the fix is to have the send call
> > > > > > > > > return a future representing the result of the request
> > > > > > > > > that will eventually be made, as well as returning the
> > > > > > > > > offset of your message if you care. This is a bit of a
> > > > > > > > > large refactoring of the producer code (which could
> > > > > > > > > definitely use some refactoring), so our tentative plan
> > > > > > > > > was to address it in 0.9.
> > > > > > > > >
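A rough Python sketch of what a future-returning send could look like, assuming nothing about the eventual Kafka design: `FutureProducerSketch` and `send_fn` are hypothetical names, and `send_fn` stands in for the network call, returning an offset or raising. Draining the queue on `close()` is a choice made for this sketch only.

```python
from concurrent.futures import Future
import queue
import threading


class FutureProducerSketch:
    """Toy async producer whose send() returns a Future (illustrative only)."""

    def __init__(self, send_fn):
        self.send_fn = send_fn  # hypothetical: returns an offset or raises
        self.buffer = queue.Queue()
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def send(self, message):
        # Returns immediately; the outcome arrives later through the Future.
        future = Future()
        self.buffer.put((message, future))
        return future

    def close(self):
        # Sentinel: everything queued before it is attempted, then the worker stops.
        self.buffer.put(None)
        self.worker.join()

    def _run(self):
        while True:
            item = self.buffer.get()
            if item is None:
                return
            message, future = item
            try:
                future.set_result(self.send_fn(message))
            except Exception as exc:
                future.set_exception(exc)
```

A caller that does not care about the outcome can simply discard the returned object, while one that does can call `result()` or attach a callback with `add_done_callback()`.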
> > > > > > > > > I think what you are saying is slightly different,
> > > > > > > > > though, which is that the behavior between the various
> > > > > > > > > cases should be consistent. What do you think would be the
> > > > > > > > > right behavior?
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <j...@squareup.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > I forgot to mention that I'm working with a recent
> > > > > > > > > > version of the 0.8 code (Last changed rev: 1396425).
> > > > > > > > > >
> > > > > > > > > > Jason
> > > > > > > > > >
> > > > > > > > > > On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg <j...@squareup.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > I've been doing some testing, with an async producer.
> > > > > > > > > > >
> > > > > > > > > > > It seems, if I start up the producer, with no zk
> > > > > > > > > > > cluster present, it does what I expect, that is it
> > > > > > > > > > > waits for a limited time looking for the zk cluster,
> > > > > > > > > > > and then gives up after the zk.connectiontimeout.ms
> > > > > > > > > > > setting (6000ms, by default), and fails to send a
> > > > > > > > > > > message. However, if after starting up zk and having
> > > > > > > > > > > a good connection to zk and kafka, I then shut down
> > > > > > > > > > > the zk cluster, the producer never seems to stop
> > > > > > > > > > > accepting messages to send.
> > > > > > > > > > >
> > > > > > > > > > > As long as kafka stays up and running, even without
> > > > > > > > > > > zk still available, my producer sends messages and my
> > > > > > > > > > > consumer can consume them.
> > > > > > > > > > >
> > > > > > > > > > > However, if I then stop kafka also, my producer
> > > > > > > > > > > happily keeps on accepting messages without failing
> > > > > > > > > > > in a call to producer.send(). It's clearly no longer
> > > > > > > > > > > able to send any messages at this point. So, I assume
> > > > > > > > > > > it eventually will just start dropping messages on
> > > > > > > > > > > the floor?
> > > > > > > > > > >
> > > > > > > > > > > I would have expected that once both zk/kafka are not
> > > > > > > > > > > available, things should revert to the initial
> > > > > > > > > > > startup case, where it tries for 6000ms and then
> > > > > > > > > > > throws an exception on send.
> > > > > > > > > > >
> > > > > > > > > > > Thoughts?
> > > > > > > > > > >
> > > > > > > > > > > What's the expected behavior for async producers when
> > > > > > > > > > > the async buffered messages can't be sent? I think
> > > > > > > > > > > it's fine if they are just lost, but should it be
> > > > > > > > > > > possible to block further accepting of messages once
> > > > > > > > > > > the system has detected a problem communicating with
> > > > > > > > > > > zk/kafka?
> > > > > > > > > > >
> > > > > > > > > > > Also, if I cleanly shut down an async producer (e.g.
> > > > > > > > > > > call producer.close()), should it make a best effort
> > > > > > > > > > > to send out any buffered messages before shutting
> > > > > > > > > > > down? Or will it quit immediately, dropping any
> > > > > > > > > > > buffered messages on the floor?
> > > > > > > > > > >
> > > > > > > > > > > Jason
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
