I checked out trunk.  I guess I assumed that included the latest 0.8.  Is
that not right?  Am I just looking at 0.7.x+?

Honestly, I don't think it would be a positive thing not to be able to rely
on zookeeper in producer code.  How does that affect the discovery of a
kafka cluster under dynamic conditions?  We'd expect to have a much higher
SLA for the zookeeper cluster than for kafka.  We'd like to be able to
freely do rolling restarts of the kafka cluster, etc.

Also, it seems a bit asymmetric to use zk for the kafka brokers and
consumers, but not the producers.

Jason

On Mon, Nov 19, 2012 at 8:50 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> In 0.8 there is no way to use zookeeper from the producer and no connection
> from the client. There isn't even a way to configure a zk connection. Are
> you sure you checked out the 0.8 branch?
>
> Check the code you've got:
> *jkreps-mn:kafka-0.8 jkreps$ svn info*
> *Path: .*
> *URL: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8*
> *Repository Root: https://svn.apache.org/repos/asf*
>
> The key is that it should have come from the URL kafka/branches/0.8.
>
> -Jay
>
>
> On Mon, Nov 19, 2012 at 3:30 PM, Jason Rosenberg <j...@squareup.com> wrote:
>
> > Regarding the producer/zk connection:  if I am using zk to discover the
> > kafka cluster, doesn't the producer get updates if zk's knowledge of the
> > cluster changes?  Or does it only reconsult zk if the particular kafka node
> > it was "getting metadata" from goes away?  Should I not be using a
> > "zk.connect" but instead a "broker.list" when using a producer (that would
> > seem restrictive)?  What I've noticed is that the instant the zk server is
> > taken down, my producer immediately starts logging connection errors to zk,
> > every second, and never stops this logging until zk comes back.  So it
> > certainly feels like the producer is attempting to maintain a direct
> > connection to zk.  I suppose I expected it to try for the connection
> > timeout period (e.g. 6000ms), and then give up, until the next send
> > request, etc.
> >
> > Perhaps what it should do is make that initial zk connection to find the
> > kafka broker list, then shutdown the zk connection if it really doesn't
> > need it after that, until possibly recreating it if needed if it can no
> > longer make contact with the kafka cluster.
> >
> > For the async queuing behavior, I agree, it's difficult to respond to a
> > send request with an exception, when the sending is done asynchronously, in
> > a different thread.  However, this is the behavior when the producer is
> > started initially, with no zk available (e.g. producer.send() gets an
> > exception).  So, the api is inconsistent, in that it treats the
> > unavailability of zk differently, depending on whether it was unavailable
> > at the initial startup, vs. a subsequent zk outage after previously having
> > been available.
> >
> > I am not too concerned about not having 100% guarantee that if I
> > successfully call producer.send(), that I know it was actually delivered.
> > But it would be nice to have some way to know the current health of the
> > producer, perhaps some sort of "producerStatus()" method.  If the async
> > sending thread is having issues sending, it might be nice to expose that to
> > the client.  Also, if the current producerStatus() is not healthy, then I
> > think it might be ok to not accept new messages to be sent (e.g.
> > producer.send() could throw an Exception in that case).
> >
> > Returning a Future for each message sent seems a bit unscalable.....I'm not
> > sure clients want to be tying up resources waiting for Futures all the time
> > either.
> >
> > I'm also seeing that if kafka goes down, while zk stays up, subsequent
> > calls to producer.send() fail immediately with an exception ("partition is
> > null").  I think this makes sense, although, in that case, what is the fate
> > of previously buffered but unsent messages?  Are they all lost?
> >
> > But I'd like it if zk goes down, and then kafka goes down, it would behave
> > the same way as if only kafka went down.  Instead, it continues happily
> > buffering messages, with lots of zk connection errors logged, but no way
> > for the client code to know that things are not hunky dory.
> >
> > In summary:
> >
> > 1. If zk connection is not important for a producer, why continually log zk
> > connection errors every second, while at the same time have the client
> > behave as if nothing is wrong and just keep accepting messages?
> > 2. if zk connection goes down, followed by kafka going down, it behaves no
> > differently than if only zk went down, from the client's perspective (it
> > keeps accepting new messages).
> > 3. if zk connection stays up, but kafka goes down, it then fails
> > immediately with an exception in a call to producer.send (I think this
> > makes sense).
> > 4. we have no way of knowing if/when buffered messages are sent, once zk
> > and/or kafka come back online (although it appears all buffered messages
> > are lost in any case where kafka goes offline).
> > 5. I'm not clear on the difference between "queue.time" and "queue.size".
> > If kafka is not available, but "queue.time" has expired, what happens, do
> > messages get dropped, or do they continue to be buffered until queue.size
> > is exhausted?
> > 6. What happens if I call producer.close(), while there are buffered
> > messages?  Do the messages get sent before the producer shuts down?
> >
> > Jason
> >
> >
> > On Mon, Nov 19, 2012 at 2:31 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > Starting in 0.8 there is no direct connection from the producer to zk. The
> > > goal here is to make it easy to implement clients in non-java languages and
> > > avoid painful zk upgrades. ZK is replaced by a "get_metadata" api (which in
> > > terms of implementation, of course still just reads from zk--but now the
> > > read is done by the server).
> > >
> > > I think the intended behavior for the async producer is the following:
> > > 1. Messages are immediately added to a queue of messages to be sent
> > > 2. The number of messages that can be in the queue is bounded
> > > by queue.size. If the queue is full the parameter
> > > queue.enqueueTimeout.ms determines how long the producer will wait for
> > > the queue length to decrease before dropping the message
> > > 3. Messages are sent by a background thread. If this thread falls behind
> > > due to throughput or because the kafka cluster is down messages will pile
> > > up in the queue.
> > > 4. If the send operation fails there are two options (1) retry, (2) give
> > > up. If you retry you may get a duplicate (this is the semantics of any
> > > mutation RPC in any system--e.g. if you get a network error you do not know
> > > if the mutation has occurred or not). If you give up you will lose those
> > > messages. This decision is controlled by producer.num.retries.
> > >
> > > It is desirable that the client not die if the zk connection is lost, if
> > > possible, since zk sometimes can have gc pauses or disk latency or whatever
> > > other transient issue.
> > >
> > > Because the send is async and does not have a 1-1 correspondence to the
> > > network communication, it is not possible to immediately throw an
> > > exception in the send() call.
> > >
> > > This is not ideal since how do you know if your send succeeded? We think
> > > the fix is to have the send call return a future representing the result of
> > > the request that will eventually be made as well as returning the offset of
> > > your message if you care. This is a bit of a large refactoring of the
> > > producer code (which could definitely use some refactoring) so our
> > > tentative plan was to address it in 0.9.
> > >
> > > I think what you are saying is slightly different, though, which is that
> > > the behavior between the various cases should be consistent. What do you
> > > think would be the right behavior?
> > >
> > > -Jay
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Nov 19, 2012 at 1:27 PM, Jason Rosenberg <j...@squareup.com> wrote:
> > >
> > > > I forgot to mention, that I'm working with a recent version of the 0.8
> > > > code (Last changed rev: 1396425).
> > > >
> > > > Jason
> > > >
> > > > On Mon, Nov 19, 2012 at 1:23 PM, Jason Rosenberg <j...@squareup.com> wrote:
> > > >
> > > > > I've been doing some testing, with an async producer.
> > > > >
> > > > > It seems, if I start up the producer, with no zk cluster present, it does
> > > > > what I expect, that is it waits for a limited time looking for the zk
> > > > > cluster, and then gives up after the zk.connectiontimeout.ms setting
> > > > > (6000ms, by default), and fails to send a message.  However, if after
> > > > > starting up zk and having a good connection to zk and kafka, I then
> > > > > shutdown the zk cluster, the producer never seems to stop accepting
> > > > > messages to send.
> > > > >
> > > > > As long as kafka stays up and running, even without zk still available, my
> > > > > producer sends messages and my consumer can consume them.
> > > > >
> > > > > However, if I then stop kafka also, my producer happily keeps on accepting
> > > > > messages without failing in a call to producer.send().  It's clearly no
> > > > > longer able to send any messages at this point.  So, I assume it eventually
> > > > > will just start dropping messages on the floor?
> > > > >
> > > > > I would have expected that once both zk/kafka are not available, things
> > > > > should revert to the initial startup case, where it tries for 6000ms and
> > > > > then throws an exception on send.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > What's the expected behavior for async producers, when the async buffered
> > > > > messages can't be sent?  I think it's fine if they are just lost, but
> > > > > should it be possible to block further accepting of messages once the
> > > > > system has detected a problem communicating with zk/kafka?
> > > > >
> > > > > Also, if I cleanly shutdown an async producer (e.g. call
> > > > > producer.close()), should it make a best effort to send out any buffered
> > > > > messages before shutting down?  Or will it quit immediately dropping any
> > > > > buffered messages on the floor?
> > > > >
> > > > > Jason
> > > > >
> > > >
> > >
> >
>
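
For reference, the async producer behavior Jay outlines in the quoted thread
(a bounded queue, an enqueue timeout, a background sender thread, and a retry
limit) can be sketched roughly as below. This is a toy Python model, not
Kafka's producer code: AsyncProducerSketch, send_fn, and the parameter names
are hypothetical stand-ins for queue.size, queue.enqueueTimeout.ms and
producer.num.retries, and draining the queue on close() is an assumption,
since the thread leaves that question open.

```python
import queue
import threading


class AsyncProducerSketch:
    """Toy model of the queueing behavior discussed in the thread."""

    _STOP = object()  # sentinel telling the background thread to exit

    def __init__(self, send_fn, queue_size=3, enqueue_timeout_s=0.0,
                 num_retries=1):
        self._send_fn = send_fn  # hypothetical transport; raises ConnectionError
        self._queue = queue.Queue(maxsize=queue_size)  # bounded, like queue.size
        self._enqueue_timeout_s = enqueue_timeout_s
        self._num_retries = num_retries
        self.dropped = []  # messages that were given up on
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def send(self, message):
        """Enqueue and return; False means the queue stayed full."""
        try:
            if self._enqueue_timeout_s > 0:
                self._queue.put(message, timeout=self._enqueue_timeout_s)
            else:
                self._queue.put_nowait(message)
            return True
        except queue.Full:
            self.dropped.append(message)
            return False

    def _drain(self):
        """Background thread: one initial attempt plus num_retries retries."""
        while True:
            message = self._queue.get()
            if message is self._STOP:
                return
            for _ in range(1 + self._num_retries):
                try:
                    self._send_fn(message)
                    break
                except ConnectionError:
                    pass  # retry; a retry may duplicate a delivered message
            else:
                self.dropped.append(message)  # retries exhausted: message lost

    def close(self):
        """Assumed policy: let queued messages be attempted, then stop."""
        self._queue.put(self._STOP)
        self._worker.join()
```

In this sketch send() returns False rather than throwing when the queue stays
full, and the dropped list gives the caller a crude view of producer health,
along the lines of the producerStatus() idea raised in the thread.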
