Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-06-02 Thread Nakamura
Hi Colin,

> Sure, we organize buffers by broker currently. However, we could set some
maximum buffer size for records that haven't been assigned to a broker yet.
OK, I think we're probably aligned then.  I think we were using slightly
different terminology (queue vs buffer) but we were actually violently
agreeing.

> In general the Kafka producer is supposed to be used from a client
thread. That thread is responsible for calling poll periodically to get the
results of any send() operations it performed. (It's possible to use the
producer from multiple threads as well.)
>
> The main point I was making is that metadata fetches can and should be
done in the same way as any other network I/O in the producer.
Thanks!  I think I don't quite understand, but I'll do some research myself
and try to understand it better.

Best,
Moses


On Wed, Jun 2, 2021 at 2:21 PM Colin McCabe  wrote:

> On Tue, Jun 1, 2021, at 12:22, Nakamura wrote:
> > I think we're talking past each other a bit.  I know about non-blocking
> > I/O.  The problem I'm facing is how to preserve the existing semantics
> > without blocking.  Right now callers assume their work is enqueued
> in-order
> > after `KafkaProducer#send` returns.  We can't simply return a future that
> > represents the metadata fetch, because of that assumption.  We need to
> > maintain order somehow.  That is what all of the different queues we're
> > proposing are intended to do.
>
> Hi Nakamura,
>
> I guess the point I was making was that there is no connection between
> first-in, first-out semantics and blocking. Nothing about FIFO semantics
> requires blocking.
>
> > > How are the ordering semantics of `KafkaProducer#send` related to the
> > > metadata fetch?
> > KafkaProducer#send currently enqueues after it has the metadata, and it
> > passes the TopicPartition struct as part of the data when enqueueing.  We
> > can either update that data structure to be able to work with partial
> > metadata, or we can add a new queue on top.  I outline both potential
> > approaches in the current KIP.
> >
> > > That is not related to the metadata fetch. Also, I already proposed a
> > > solution (returning an error) if this is a concern.
> > Unfortunately it is, because `KafkaProducer#send` conflates the two of
> > them.  That seems to be the central difficulty of preserving the
> semantics
> > here.
>
> Sure, we organize buffers by broker currently. However, we could set some
> maximum buffer size for records that haven't been assigned to a broker yet.
>
> >
> > > The same client thread that always has been responsible for checking
> poll.
> > Please pretend I've never contributed to Kafka before :). Which thread is
> > that?
>
> In general the Kafka producer is supposed to be used from a client thread.
> That thread is responsible for calling poll periodically to get the results
> of any send() operations it performed. (It's possible to use the producer
> from multiple threads as well.)
>
> The main point I was making is that metadata fetches can and should be
> done in the same way as any other network I/O in the producer.
>
> best,
> Colin
>
> >
> > Best,
> > Moses
> >
> > On Tue, Jun 1, 2021 at 3:12 PM Ryanne Dolan 
> wrote:
> >
> > > Colin, the issue for me isn't so much whether non-blocking I/O is used
> or
> > > not, but the fact that the caller observes a long time between calling
> > > send() and receiving the returned future. This behavior can be
> considered
> > > "blocking" whether or not I/O is involved.
> > >
> > > > How are the ordering semantics of `KafkaProducer#send` related to the
> > > metadata fetch?
> > > > I already proposed a solution (returning an error)
> > >
> > > There is a subtle difference between failing immediately vs blocking
> for
> > > metadata, related to ordering in the face of retries. Say we set the
> send
> > > timeout to max-long (or something high enough that we rarely encounter
> > > timeouts in practice), and set max inflight requests to 1. Today, we
> can
> > > reasonably assume that calling send() in sequence to a specific
> partition
> > > will result in the corresponding sequence landing on that partition,
> > > regardless of how the caller handles retries. The caller might not
> handle
> > > retries at all. But if we can fail immediately (e.g. when the metadata
> > > isn't yet ready), then the caller must handle retries carefully.
> > > Specifically, the caller must retry each send() before proceeding to the next.

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-06-01 Thread Nakamura
Hi Colin,

> KafkaProducer#send is supposed to initiate non-blocking I/O, but not wait
for it to complete.
>
> There's more information about non-blocking I/O in Java here:
> https://en.wikipedia.org/wiki/Non-blocking_I/O_%28Java%29
I think we're talking past each other a bit.  I know about non-blocking
I/O.  The problem I'm facing is how to preserve the existing semantics
without blocking.  Right now callers assume their work is enqueued in-order
after `KafkaProducer#send` returns.  We can't simply return a future that
represents the metadata fetch, because of that assumption.  We need to
maintain order somehow.  That is what all of the different queues we're
proposing are intended to do.
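
To make that assumption concrete, here is a minimal sketch of the pattern
callers rely on today (topic and key names are made up; this is just the
existing API):

    // send() only returns once the record has been appended, in call order, to the
    // batch for its partition, so a plain loop preserves per-partition ordering even
    // though nobody waits on the returned futures.
    static void sendAllInOrder(Producer<String, String> producer, List<String> values) {
        for (String value : values) {
            producer.send(new ProducerRecord<>("my-topic", "my-key", value));
        }
    }
    // If send() returned before the metadata fetch finished, ordering would depend on
    // when each fetch completed -- hence the queues we are discussing.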

> How are the ordering semantics of `KafkaProducer#send` related to the
metadata fetch?
KafkaProducer#send currently enqueues after it has the metadata, and it
passes the TopicPartition struct as part of the data when enqueueing.  We
can either update that data structure to be able to work with partial
metadata, or we can add a new queue on top.  I outline both potential
approaches in the current KIP.
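
To give a flavor of the "queue on top" option for readers who haven't opened
the KIP yet (the names below are invented for illustration and are not the
KIP's actual design):

    // Records whose topic has no cached metadata are parked instead of blocking the
    // caller; once metadata arrives they are drained in FIFO order into the existing
    // per-partition accumulator, which preserves the send() call order.
    class PendingSends {
        private final java.util.ArrayDeque<ProducerRecord<byte[], byte[]>> pending =
            new java.util.ArrayDeque<>();

        synchronized void park(ProducerRecord<byte[], byte[]> record) {
            pending.add(record);                    // send() returns immediately
        }

        synchronized void drain(java.util.function.Consumer<ProducerRecord<byte[], byte[]>> appendToAccumulator) {
            ProducerRecord<byte[], byte[]> record;
            while ((record = pending.poll()) != null) {
                appendToAccumulator.accept(record); // FIFO drain keeps the original order
            }
        }
    }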

> That is not related to the metadata fetch. Also, I already proposed a
solution (returning an error) if this is a concern.
Unfortunately it is, because `KafkaProducer#send` conflates the two of
them.  That seems to be the central difficulty of preserving the semantics
here.

> The same client thread that always has been responsible for checking poll.
Please pretend I've never contributed to Kafka before :). Which thread is
that?

Best,
Moses

On Tue, Jun 1, 2021 at 3:12 PM Ryanne Dolan  wrote:

> Colin, the issue for me isn't so much whether non-blocking I/O is used or
> not, but the fact that the caller observes a long time between calling
> send() and receiving the returned future. This behavior can be considered
> "blocking" whether or not I/O is involved.
>
> > How are the ordering semantics of `KafkaProducer#send` related to the
> metadata fetch?
> > I already proposed a solution (returning an error)
>
> There is a subtle difference between failing immediately vs blocking for
> metadata, related to ordering in the face of retries. Say we set the send
> timeout to max-long (or something high enough that we rarely encounter
> timeouts in practice), and set max inflight requests to 1. Today, we can
> reasonably assume that calling send() in sequence to a specific partition
> will result in the corresponding sequence landing on that partition,
> regardless of how the caller handles retries. The caller might not handle
> retries at all. But if we can fail immediately (e.g. when the metadata
> isn't yet ready), then the caller must handle retries carefully.
> Specifically, the caller must retry each send() before proceeding to the
> next. This basically means that the caller must block on each send() in
> order to maintain the proper sequence -- how else would the caller know
> whether it will need to retry or not?
>
> In other words, failing immediately punts the problem to the caller to
> handle, while the caller is less-equipped to deal with it. I don't think we
> should do that, at least not in the default case.
>
> I actually don't have any objections to this approach so long as it's
> opt-in. It sounds like you are suggesting to fix the bug for everyone, but
> I don't think we can do that without subtly breaking things.
>
> Ryanne
>
> On Tue, Jun 1, 2021 at 12:31 PM Colin McCabe  wrote:
>
> > On Tue, Jun 1, 2021, at 07:00, Nakamura wrote:
> > > Hi Colin,
> > >
> > > Sorry, I still don't follow.
> > >
> > > Right now `KafkaProducer#send` seems to trigger a metadata fetch.
> Today,
> > > we block on that before returning.  Is your proposal that we move the
> > > metadata fetch out of `KafkaProducer#send` entirely?
> > >
> >
> > KafkaProducer#send is supposed to initiate non-blocking I/O, but not wait
> > for it to complete.
> >
> > There's more information about non-blocking I/O in Java here:
> > https://en.wikipedia.org/wiki/Non-blocking_I/O_%28Java%29
> >
> > >
> > > Even if the metadata fetch moves to be non-blocking, I think we still
> > need
> > > to deal with the problems we've discussed before if the fetch happens
> in
> > > the `KafkaProducer#send` method.  How do we maintain the ordering
> > semantics
> > > of `KafkaProducer#send`?
> >
> > How are the ordering semantics of `KafkaProducer#send` related to the
> > metadata fetch?
> >
> > >  How do we prevent our buffer from filling up?
> >
> > That is not related to the metadata fetch. Also, I already proposed a solution (returning an error) if this is a concern.

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-06-01 Thread Nakamura
Hi Colin,

Sorry, I still don't follow.

Right now `KafkaProducer#send` seems to trigger a metadata fetch.  Today,
we block on that before returning.  Is your proposal that we move the
metadata fetch out of `KafkaProducer#send` entirely?
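
For anyone who wants to see it for themselves, the blocking is easy to
observe on the very first send to a topic (broker address and timeout below
are just placeholders):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("max.block.ms", "60000"); // upper bound on how long send() may block

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        long start = System.nanoTime();
        // No cached metadata for this topic yet, so the "asynchronous" send() blocks on
        // the metadata fetch (up to max.block.ms) before it returns the future.
        Future<RecordMetadata> future =
            producer.send(new ProducerRecord<>("some-new-topic", "key", "value"));
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("send() returned after " + elapsedMs + " ms");
    }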

Even if the metadata fetch moves to be non-blocking, I think we still need
to deal with the problems we've discussed before if the fetch happens in
the `KafkaProducer#send` method.  How do we maintain the ordering semantics
of `KafkaProducer#send`?  How do we prevent our buffer from filling up?
Which thread is responsible for checking poll()?

The only approach I can see that would avoid this would be moving the
metadata fetch to happen at a different time.  But it's not clear to me
when would be a more appropriate time to do the metadata fetch than
`KafkaProducer#send`.

I think there's something I'm missing here.  Would you mind helping me
figure out what it is?

Best,
Moses

On Sun, May 30, 2021 at 5:35 PM Colin McCabe  wrote:

> On Tue, May 25, 2021, at 11:26, Nakamura wrote:
> > Hey Colin,
> >
> > For the metadata case, what would fixing the bug look like?  I agree that
> > we should fix it, but I don't have a clear picture in my mind of what
> > fixing it should look like.  Can you elaborate?
> >
>
> If the blocking metadata fetch bug were fixed, neither the producer nor
> the consumer would block while fetching metadata. A poll() call would
> initiate a metadata fetch if needed, and a subsequent call to poll() would
> handle the results if needed. Basically the same paradigm we use for other
> network communication in the producer and consumer.
>
> best,
> Colin
>
> > Best,
> > Moses
> >
> > On Mon, May 24, 2021 at 1:54 PM Colin McCabe  wrote:
> >
> > > Hi all,
> > >
> > > I agree that we should give users the option of having a fully async
> API,
> > > but I don't think external thread pools or queues are the right
> direction
> > > to go here. They add performance overheads and don't address the root
> > > causes of the problem.
> > >
> > > There are basically two scenarios where we block, currently. One is
> when
> > > we are doing a metadata fetch. I think this is clearly a bug, or at
> least
> > > an implementation limitation. From the user's point of view, the fact
> that
> > > we are doing a metadata fetch is an implementation detail that really
> > > shouldn't be exposed like this. We have talked about fixing this in the
> > > past. I think we just should spend the time to do it.
> > >
> > > The second scenario is where the client has produced too much data in
> too
> > > little time. This could happen if there is a network glitch, or the
> server
> > > is slower than expected. In this case, the behavior is intentional and
> not
> > > a bug. To understand this, think about what would happen if we didn't
> > > block. We would start buffering more and more data in memory, until
> finally
> > > the application died with an out of memory error. That would be
> frustrating
> > > for users and wouldn't add to the usability of Kafka.
> > >
> > > We could potentially have an option to handle the out-of-memory
> scenario
> > > differently by returning an error code immediately rather than
> blocking.
> > > Applications would have to be rewritten to handle this properly, but
> it is
> > > a possibility. I suspect that most of them wouldn't use this, but we
> could
> > > offer it as a possibility for async purists (which might include
> certain
> > > frameworks). The big problem the users would have to solve is what to
> do
> > > with the record that they were unable to produce due to the buffer full
> > > issue.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Thu, May 20, 2021, at 10:35, Nakamura wrote:
> > > > >
> > > > > My suggestion was just to do this in multiple steps/phases: firstly let's
> > > > > fix the issue of send being misleadingly asynchronous (i.e. internally
> > > > > it's blocking) and then later on we can make the various
> > > > > threadpools configurable with a sane default.
> > > >
> > > > I like that approach. I updated the "Which thread should be
> responsible
> > > for
> > > > waiting" part of KIP-739 to add your suggestion as my recommended
> > > approach,
> > > > thank you!  If no one else has major concerns about that approach, I'll move the alternatives to "rejected alternatives".

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-25 Thread Nakamura
Hey Colin,

For the metadata case, what would fixing the bug look like?  I agree that
we should fix it, but I don't have a clear picture in my mind of what
fixing it should look like.  Can you elaborate?

Best,
Moses

On Mon, May 24, 2021 at 1:54 PM Colin McCabe  wrote:

> Hi all,
>
> I agree that we should give users the option of having a fully async API,
> but I don't think external thread pools or queues are the right direction
> to go here. They add performance overheads and don't address the root
> causes of the problem.
>
> There are basically two scenarios where we block, currently. One is when
> we are doing a metadata fetch. I think this is clearly a bug, or at least
> an implementation limitation. From the user's point of view, the fact that
> we are doing a metadata fetch is an implementation detail that really
> shouldn't be exposed like this. We have talked about fixing this in the
> past. I think we just should spend the time to do it.
>
> The second scenario is where the client has produced too much data in too
> little time. This could happen if there is a network glitch, or the server
> is slower than expected. In this case, the behavior is intentional and not
> a bug. To understand this, think about what would happen if we didn't
> block. We would start buffering more and more data in memory, until finally
> the application died with an out of memory error. That would be frustrating
> for users and wouldn't add to the usability of Kafka.
>
> We could potentially have an option to handle the out-of-memory scenario
> differently by returning an error code immediately rather than blocking.
> Applications would have to be rewritten to handle this properly, but it is
> a possibility. I suspect that most of them wouldn't use this, but we could
> offer it as a possibility for async purists (which might include certain
> frameworks). The big problem the users would have to solve is what to do
> with the record that they were unable to produce due to the buffer full
> issue.
>
> best,
> Colin
>
>
> On Thu, May 20, 2021, at 10:35, Nakamura wrote:
> > >
> > > > My suggestion was just to do this in multiple steps/phases: firstly let's
> > > > fix the issue of send being misleadingly asynchronous (i.e. internally it's
> > > > blocking) and then later on we can make the various
> > > > threadpools configurable with a sane default.
> >
> > I like that approach. I updated the "Which thread should be responsible
> for
> > waiting" part of KIP-739 to add your suggestion as my recommended
> approach,
> > thank you!  If no one else has major concerns about that approach, I'll
> > move the alternatives to "rejected alternatives".
> >
> > On Thu, May 20, 2021 at 7:26 AM Matthew de Detrich
> >  wrote:
> >
> > > @Nakamura
> > > On Wed, May 19, 2021 at 7:35 PM Nakamura  wrote:
> > >
> > > > @Ryanne:
> > > > In my mind's eye I slightly prefer throwing the "cannot enqueue"
> > > > exception to satisfying the future immediately with the "cannot
> enqueue"
> > > > exception?  But I agree, it would be worth doing more research.
> > > >
> > > > @Matthew:
> > > >
> > > > > 3. Using multiple thread pools is definitely recommended for
> different
> > > > > types of tasks, for serialization which is CPU bound you definitely
> > > would
> > > > > want to use a bounded thread pool that is fixed by the number of
> CPU's
> > > > (or
> > > > > something along those lines).
> > > > > https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c
> is
> > > a
> > > > > very good guide on this topic
> > > > I think this guide is good in general, but I would be hesitant to
> follow
> > > > its guidance re: offloading serialization without benchmarking it.
> My
> > > > understanding is that context-switches have gotten much cheaper, and
> that
> > > > gains from cache locality are small, but they're not nothing.
> Especially
> > > > if the workload has a very small serialization cost, I wouldn't be
> > > shocked
> > > > if it made it slower.  I feel pretty strongly that we should do more
> > > > research here before unconditionally encouraging serialization in a
> > > > threadpool.  If people think it's important to do it here (eg if we
> think
> > > > it would mean another big API change) then we should start thinking about what benchmarking we can do to gain higher confidence in this kind of change.

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-20 Thread Nakamura
>
> My suggestion was just to do this in multiple steps/phases: firstly let's fix
> the issue of send being misleadingly asynchronous (i.e. internally it's
> blocking) and then later on we can make the various
> threadpools configurable with a sane default.

I like that approach. I updated the "Which thread should be responsible for
waiting" part of KIP-739 to add your suggestion as my recommended approach,
thank you!  If no one else has major concerns about that approach, I'll
move the alternatives to "rejected alternatives".

On Thu, May 20, 2021 at 7:26 AM Matthew de Detrich
 wrote:

> @Nakamura
> On Wed, May 19, 2021 at 7:35 PM Nakamura  wrote:
>
> > @Ryanne:
> > In my mind's eye I slightly prefer throwing the "cannot enqueue"
> > exception to satisfying the future immediately with the "cannot enqueue"
> > exception?  But I agree, it would be worth doing more research.
> >
> > @Matthew:
> >
> > > 3. Using multiple thread pools is definitely recommended for different
> > > types of tasks, for serialization which is CPU bound you definitely
> would
> > > want to use a bounded thread pool that is fixed by the number of CPU's
> > (or
> > > something along those lines).
> > > https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is
> a
> > > very good guide on this topic
> > I think this guide is good in general, but I would be hesitant to follow
> > its guidance re: offloading serialization without benchmarking it.  My
> > understanding is that context-switches have gotten much cheaper, and that
> > gains from cache locality are small, but they're not nothing.  Especially
> > if the workload has a very small serialization cost, I wouldn't be
> shocked
> > if it made it slower.  I feel pretty strongly that we should do more
> > research here before unconditionally encouraging serialization in a
> > threadpool.  If people think it's important to do it here (eg if we think
> > it would mean another big API change) then we should start thinking about
> > what benchmarking we can do to gain higher confidence in this kind of
> > change.  However, I don't think it would change semantics as
> substantially
> > as we're proposing here, so I would vote for pushing this to a subsequent
> > KIP.
> >
> Of course, it's all down to benchmarking, benchmarking and benchmarking.
> Ideally speaking you want to use all of the resources available to you, so
> if you have a bottleneck in serialization and you have many cores free then
> using multiple cores may be more appropriate than a single core. Typically
> I would expect that using a single thread to do serialization is likely to
> be the most common situation; I was just responding to an earlier point that
> was made regarding using ThreadPools for serialization (note that you can
> also just use a ThreadPool that is pinned to a single thread).
>
>
>
> >
> > > 4. Regarding providing the ability for users to supply their own custom
> > > ThreadPool this is more of an ergonomics question for the API.
> Especially
> > > when it gets to monitoring/tracing, giving the ability for users to
> > provide
> > > their own custom IO/CPU ThreadPools is ideal however as stated doing so
> > > means a lot of boilerplatery changes to the API. Typically speaking a
> lot
> > > of monitoring/tracing/diagnosing is done on
> ExecutionContext/ThreadPools
> > > (at least on a more rudimentary level) and hence allowing users to
> supply
> > a
> > > global singleton ThreadPool for IO tasks and another for CPU tasks
> makes
> > > their lives a lot easier. However due to the large amount of changes to
> > the
> > > API, it may be more appropriate to just use internal thread pools (for
> > now)
> > > since at least it's not any worse than what exists currently and this
> can
> > > be an improvement that is done later?
> > Is there an existing threadpool that you suggest we reuse?  Or are you
> > imagining that we make our own internal threadpool, and then maybe expose
> > configuration flags to manipulate it?  For what it's worth, I like having
> > an internal threadpool (perhaps just FJP.commonpool) and then providing
> an
> > alternative to pass your own threadpool.  That way people who want finer
> > control can get it, and everyone else can do OK with the default.
> >
> Indeed that is what I am saying. The most ideal situation is that there is
> a default internal threadpool that Kafka uses, however users of Kafka can
> con

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-19 Thread Nakamura
> ...what exists currently and this can
> be an improvement that is done later?
>
> On Wed, May 19, 2021 at 2:56 AM Ryanne Dolan 
> wrote:
>
> > I was thinking the sender would typically wrap send() in a backoff/retry
> > loop, or else ignore any failures and drop sends on the floor
> > (fire-and-forget), and in both cases I think failing immediately is
> better
> > than blocking for a new spot in the queue or asynchronously failing
> > somehow.
> >
> > I think a failed future is adequate for the "explicit backpressure
> signal"
> > while avoiding any blocking anywhere. I think if we try to asynchronously
> > signal the caller of failure (either by asynchronously failing the future
> > or invoking a callback off-thread or something) we'd force the caller to
> > either block or poll waiting for that signal, which somewhat defeats the
> > purpose we're after. And of course blocking for a spot in the queue
> > definitely defeats the purpose (tho perhaps ameliorates the problem
> some).
> >
> > Throwing an exception to the caller directly (not via the future) is
> > another option with precedent in Kafka clients, tho it doesn't seem as
> > ergonomic to me.
> >
> > It would be interesting to analyze some existing usage and determine how
> > difficult it would be to convert it to the various proposed APIs.
> >
> > Ryanne
> >
> > On Tue, May 18, 2021, 3:27 PM Nakamura  wrote:
> >
> > > Hi Ryanne,
> > >
> > > Hmm, that's an interesting idea.  Basically it would mean that after
> > > calling send, you would also have to check whether the returned future
> > had
> > > failed with a specific exception.  I would be open to it, although I
> > think
> > > it might be slightly more surprising, since right now the paradigm is
> > > "enqueue synchronously, the future represents whether we succeeded in
> > > sending or not" and the new one would be "enqueue synchronously, the
> > future
> > > either represents whether we succeeded in enqueueing or not (in which
> > case
> > > it will be failed immediately if it failed to enqueue) or whether we
> > > succeeded in sending or not".
> > >
> > > But you're right, it should be on the table, thank you for suggesting
> it!
> > >
> > > Best,
> > > Moses
> > >
> > > On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan 
> > > wrote:
> > >
> > > > Moses, in the case of a full queue, could we just return a failed
> > future
> > > > immediately?
> > > >
> > > > Ryanne
> > > >
> > > > On Tue, May 18, 2021, 10:39 AM Nakamura  wrote:
> > > >
> > > > > Hi Alexandre,
> > > > >
> > > > > Thanks for bringing this up, I think I could use some feedback in
> > this
> > > > > area.  There are two mechanisms here, one for slowing down when we
> > > don't
> > > > > have the relevant metadata, and the other for slowing down when a
> > queue
> > > > has
> > > > > filled up.  Although the first one applies backpressure somewhat
> > > > > inadvertently, we could still get in trouble if we're not providing
> > > > > information to the mechanism that monitors whether we're queueing
> too
> > > > > much.  As for the second one, that is a classic backpressure use
> > case,
> > > so
> > > > > it's definitely important that we don't drop that ability.
> > > > >
> > > > > Right now backpressure is applied by blocking, which is a natural
> way
> > > to
> > > > > apply backpressure in synchronous systems, but can lead to
> > unnecessary
> > > > > slowdowns in asynchronous systems.  In my opinion, the safest way
> to
> > > > apply
> > > > > backpressure in an asynchronous model is to have an explicit
> > > backpressure
> > > > > signal.  A good example would be returning an exception, and
> > providing
> > > an
> > > > > optional hook to add a callback onto so that you can be notified
> when
> > > > it's
> > > > > ready to accept more messages.
> > > > >
> > > > > However, this would be a really big change to how users use
> > > > > KafkaProducer#send, so I don't know how much appetite we have for
> > > > > making that kind of change.

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-18 Thread Nakamura
Hi Ryanne,

Hmm, that's an interesting idea.  Basically it would mean that after
calling send, you would also have to check whether the returned future had
failed with a specific exception.  I would be open to it, although I think
it might be slightly more surprising, since right now the paradigm is
"enqueue synchronously, the future represents whether we succeeded in
sending or not" and the new one would be "enqueue synchronously, the future
either represents whether we succeeded in enqueueing or not (in which case
it will be failed immediately if it failed to enqueue) or whether we
succeeded in sending or not".
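
Concretely, I think calling code under that idea ends up looking something
like this (the exception type is hypothetical -- whatever we would pick to
mean "could not enqueue"):

    Future<RecordMetadata> future = producer.send(record);

    // A full queue would fail the future immediately instead of blocking, so callers
    // that care about ordering have to check for that case before sending the next record.
    if (future.isDone()) {
        try {
            future.get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof QueueFullException) { // hypothetical exception type
                // enqueue was rejected: back off and retry, or drop the record
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    // Otherwise the future behaves exactly as today: it completes once the send
    // succeeds or fails.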

But you're right, it should be on the table, thank you for suggesting it!

Best,
Moses

On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan  wrote:

> Moses, in the case of a full queue, could we just return a failed future
> immediately?
>
> Ryanne
>
> On Tue, May 18, 2021, 10:39 AM Nakamura  wrote:
>
> > Hi Alexandre,
> >
> > Thanks for bringing this up, I think I could use some feedback in this
> > area.  There are two mechanisms here, one for slowing down when we don't
> > have the relevant metadata, and the other for slowing down when a queue
> has
> > filled up.  Although the first one applies backpressure somewhat
> > inadvertently, we could still get in trouble if we're not providing
> > information to the mechanism that monitors whether we're queueing too
> > much.  As for the second one, that is a classic backpressure use case, so
> > it's definitely important that we don't drop that ability.
> >
> > Right now backpressure is applied by blocking, which is a natural way to
> > apply backpressure in synchronous systems, but can lead to unnecessary
> > slowdowns in asynchronous systems.  In my opinion, the safest way to
> apply
> > backpressure in an asynchronous model is to have an explicit backpressure
> > signal.  A good example would be returning an exception, and providing an
> > optional hook to add a callback onto so that you can be notified when
> it's
> > ready to accept more messages.
> >
> > However, this would be a really big change to how users use
> > KafkaProducer#send, so I don't know how much appetite we have for making
> > that kind of change.  Maybe it would be simpler to remove the "don't
> block
> > when the per-topic queue is full" from the scope of this KIP, and only
> > focus on when metadata is available?  The downside is that we will
> probably
> > want to change the API again later to fix this, so it might be better to
> > just rip the bandaid off now.
> >
> > One slightly nasty thing here is that because queueing order is
> important,
> > if we want to use exceptions, we will want to be able to signal the
> failure
> > to enqueue to the caller in such a way that they can still enforce
> message
> > order if they want.  So we can't embed the failure directly in the
> returned
> > future; we should either return two futures (nested, or as a tuple) or
> > else throw an exception to signal backpressure.
> >
> > So there are a few things we should work out here:
> >
> > 1. Should we keep the "too many bytes enqueued" part of this in scope?
> (I
> > would say yes, so that we can minimize churn in this API)
> > 2. How should we signal backpressure so that it's appropriate for
> > asynchronous systems?  (I would say that we should throw an exception.
> If
> > we choose this and we want to pursue the queueing path, we would *not*
> want
> > to enqueue messages that would push us over the limit, and would only
> want
> > to enqueue messages when we're waiting for metadata, and we would want to
> > keep track of the total number of bytes for those messages).
> >
> > What do you think?
> >
> > Best,
> > Moses
> >
> > On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <
> > alexandre.dupr...@gmail.com> wrote:
> >
> > > Hello Nakamura,
> > >
> > > Thanks for proposing this change. I can see how the blocking behaviour
> > > can be a problem when integrating with reactive frameworks such as
> > > Akka. One of the questions I would have is how you would handle back
> > > pressure and avoid memory exhaustion when the producer's buffer is
> > > full and tasks would start to accumulate in the out-of-band queue or
> > > thread pool introduced with this KIP.
> > >
> > > Thanks,
> > > Alexandre
> > >
> > > Le ven. 14 mai 2021 à 15:55, Ryanne Dolan  a
> > écrit
> > > 

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-18 Thread Nakamura
Hi Alexandre,

Thanks for bringing this up, I think I could use some feedback in this
area.  There are two mechanisms here, one for slowing down when we don't
have the relevant metadata, and the other for slowing down when a queue has
filled up.  Although the first one applies backpressure somewhat
inadvertently, we could still get in trouble if we're not providing
information to the mechanism that monitors whether we're queueing too
much.  As for the second one, that is a classic backpressure use case, so
it's definitely important that we don't drop that ability.

Right now backpressure is applied by blocking, which is a natural way to
apply backpressure in synchronous systems, but can lead to unnecessary
slowdowns in asynchronous systems.  In my opinion, the safest way to apply
backpressure in an asynchronous model is to have an explicit backpressure
signal.  A good example would be returning an exception, and providing an
optional hook to add a callback onto so that you can be notified when it's
ready to accept more messages.
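
Roughly the shape I have in mind (every name below is hypothetical -- nothing
like this exists in the producer today):

    try {
        producer.send(record);
    } catch (ProducerBackpressureException e) {   // hypothetical "cannot accept more" signal
        e.whenReadyForMore(() -> {                // hypothetical hook: fires when there is room again
            producer.send(record);                // re-submit this record (and anything queued behind it)
        });
    }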

However, this would be a really big change to how users use
KafkaProducer#send, so I don't know how much appetite we have for making
that kind of change.  Maybe it would be simpler to remove the "don't block
when the per-topic queue is full" from the scope of this KIP, and only
focus on when metadata is available?  The downside is that we will probably
want to change the API again later to fix this, so it might be better to
just rip the bandaid off now.

One slightly nasty thing here is that because queueing order is important,
if we want to use exceptions, we will want to be able to signal the failure
to enqueue to the caller in such a way that they can still enforce message
order if they want.  So we can't embed the failure directly in the returned
future; we should either return two futures (nested, or as a tuple) or else
throw an exception to signal backpressure.
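
The nested-future version of that would look roughly like this (purely
illustrative -- "asyncSend" and the nested signature are just a sketch of the
shape, not a concrete proposal):

    // Outer future: "did we manage to enqueue?"  Inner future: "did the send succeed?"
    CompletableFuture<CompletableFuture<RecordMetadata>> enqueued = asyncSend(record);

    enqueued.whenComplete((sendResult, enqueueError) -> {
        if (enqueueError != null) {
            // Backpressure: this record never entered the queue, and neither did anything
            // after it, so the caller can retry without reordering.
        } else {
            sendResult.whenComplete((metadata, sendError) -> {
                // Per-record success or failure, just like today's future.
            });
        }
    });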

So there are a few things we should work out here:

1. Should we keep the "too many bytes enqueued" part of this in scope?  (I
would say yes, so that we can minimize churn in this API)
2. How should we signal backpressure so that it's appropriate for
asynchronous systems?  (I would say that we should throw an exception.  If
we choose this and we want to pursue the queueing path, we would *not* want
to enqueue messages that would push us over the limit, and would only want
to enqueue messages when we're waiting for metadata, and we would want to
keep track of the total number of bytes for those messages).

What do you think?

Best,
Moses

On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <
alexandre.dupr...@gmail.com> wrote:

> Hello Nakamura,
>
> Thanks for proposing this change. I can see how the blocking behaviour
> can be a problem when integrating with reactive frameworks such as
> Akka. One of the questions I would have is how you would handle back
> pressure and avoid memory exhaustion when the producer's buffer is
> full and tasks would start to accumulate in the out-of-band queue or
> thread pool introduced with this KIP.
>
> Thanks,
> Alexandre
>
> Le ven. 14 mai 2021 à 15:55, Ryanne Dolan  a écrit
> :
> >
> > Makes sense!
> >
> > Ryanne
> >
> > On Fri, May 14, 2021, 9:39 AM Nakamura  wrote:
> >
> > > Hey Ryanne,
> > >
> > > I see what you're saying about serde blocking, but I think we should
> > > consider it out of scope for this patch.  Right now we've nailed down a
> > > couple of use cases where we can unambiguously say, "I can make
> progress
> > > now" or "I cannot make progress now", which makes it possible to
> offload to
> > > a different thread only if we are unable to make progress.  Extending
> this
> > > to CPU work like serde would mean always offloading, which would be a
> > > really big performance change.  It might be worth exploring anyway,
> but I'd
> > > rather keep this patch focused on improving ergonomics, rather than
> > > muddying the waters with evaluating performance very deeply.
> > >
> > > I think if we really do want to support serde or interceptors that do
> IO on
> > > the send path (which seems like an anti-pattern to me), we should
> consider
> > > making that a separate KIP, and probably also consider changing the
> API to
> > > use Futures (or CompletionStages).  But I would rather avoid scope
> creep,
> > > so that we have a better chance of fixing this part of the problem.
> > >
> > > Yes, I think some exceptions will move to being async instead of sync.
> > > They'll still be surfaced in the Future, so I'm not so confident that
> it
> > would be that big a shock to users though.

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-14 Thread Nakamura
Hey Ryanne,

I see what you're saying about serde blocking, but I think we should
consider it out of scope for this patch.  Right now we've nailed down a
couple of use cases where we can unambiguously say, "I can make progress
now" or "I cannot make progress now", which makes it possible to offload to
a different thread only if we are unable to make progress.  Extending this
to CPU work like serde would mean always offloading, which would be a
really big performance change.  It might be worth exploring anyway, but I'd
rather keep this patch focused on improving ergonomics, rather than
muddying the waters with evaluating performance very deeply.

I think if we really do want to support serde or interceptors that do IO on
the send path (which seems like an anti-pattern to me), we should consider
making that a separate KIP, and probably also consider changing the API to
use Futures (or CompletionStages).  But I would rather avoid scope creep,
so that we have a better chance of fixing this part of the problem.

Yes, I think some exceptions will move to being async instead of sync.
They'll still be surfaced in the Future, so I'm not so confident that it
would be that big a shock to users though.
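
In practice I'd expect the shift to look like this (sketch only; exactly
which exceptions move is still to be worked out):

    // Today: some failures, e.g. a metadata timeout, are thrown synchronously.
    try {
        producer.send(record);
    } catch (TimeoutException e) {
        // handled on the calling thread, before a future ever exists
    }

    // Afterwards: the same failures would be delivered through the future/callback.
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // same exception, delivered asynchronously
        }
    });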

Best,
Moses

On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan  wrote:

> re serialization, my concern is that serialization often accounts for a lot
> of the cycles spent before returning the future. It's not blocking per se,
> but it's the same effect from the caller's perspective.
>
> Moreover, serde impls often block themselves, e.g. when fetching schemas
> from a registry. I suppose it's also possible to block in Interceptors
> (e.g. writing audit events or metrics), which happens before serdes iiuc.
> So any blocking in either of those plugins would block the send unless we
> queue first.
>
> So I think we want to queue first and do everything off-thread when using
> the new API, whatever that looks like. I just want to make sure we don't do
> that for clients that wouldn't expect it.
>
> Another consideration is exception handling. If we queue right away, we'll
> defer some exceptions that currently are thrown to the caller (before the
> future is returned). In the new API, the send() wouldn't throw any
> exceptions, and instead the future would fail. I think that might mean that
> a new method signature is required.
>
> Ryanne
>
>
> On Thu, May 13, 2021, 2:57 PM Nakamura  wrote:
>
> > Hey Ryanne,
> >
> > I agree we should add an additional constructor (or else an additional
> > overload in KafkaProducer#send, but the new constructor would be easier
> to
> > understand) if we're targeting the "user provides the thread" approach.
> >
> > From looking at the code, I think we can keep record serialization on the
> > user thread, if we consider that an important part of the semantics of
> this
> > method.  It doesn't seem like serialization depends on knowing the
> cluster,
> > I think it's incidental that it comes after the first "blocking" segment
> in
> > the method.
> >
> > Best,
> > Moses
> >
> > On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan 
> > wrote:
> >
> > > Hey Moses, I like the direction here. My thinking is that a single
> > > additional work queue, s.t. send() can enqueue and return, seems like
> the
> > > lightest touch. However, I don't think we can trivially process that
> > queue
> > > in an internal thread pool without subtly changing behavior for some
> > users.
> > > For example, users will often run send() in multiple threads in order
> to
> > > serialize faster, but that wouldn't work quite the same if there were
> an
> > > internal thread pool.
> > >
> > > For this reason I'm thinking we need to make sure any such changes are
> > > opt-in. Maybe a new constructor with an additional ThreadFactory
> > parameter.
> > > That would at least clearly indicate that work will happen off-thread,
> > and
> > > would require opt-in for the new behavior.
> > >
> > > Under the hood, this ThreadFactory could be used to create the worker
> > > thread that process queued sends, which could fan-out to per-partition
> > > threads from there.
> > >
> > > So then you'd have two ways to send: the existing way, where serde and
> > > interceptors and whatnot are executed on the calling thread, and the
> new
> > > way, which returns right away and uses an internal Executor. As you
> point
> > > out, the semantics would be identical in either case, and it would be

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-13 Thread Nakamura
Hey Ryanne,

I agree we should add an additional constructor (or else an additional
overload in KafkaProducer#send, but the new constructor would be easier to
understand) if we're targeting the "user provides the thread" approach.
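
For comparison, the two shapes would be roughly (hypothetical signatures,
just to anchor the discussion):

    // Option A (hypothetical): opt in at construction time; every send() from this
    // instance does its waiting on the supplied threads.
    Producer<String, String> producer =
        new KafkaProducer<>(configs, keySerializer, valueSerializer, threadFactory);

    // Option B (hypothetical): opt in per call via a new overload of send().
    Future<RecordMetadata> future = producer.send(record, callback, executor);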

From looking at the code, I think we can keep record serialization on the
user thread, if we consider that an important part of the semantics of this
method.  It doesn't seem like serialization depends on knowing the cluster,
I think it's incidental that it comes after the first "blocking" segment in
the method.

Best,
Moses

On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan  wrote:

> Hey Moses, I like the direction here. My thinking is that a single
> additional work queue, s.t. send() can enqueue and return, seems like the
> lightest touch. However, I don't think we can trivially process that queue
> in an internal thread pool without subtly changing behavior for some users.
> For example, users will often run send() in multiple threads in order to
> serialize faster, but that wouldn't work quite the same if there were an
> internal thread pool.
>
> For this reason I'm thinking we need to make sure any such changes are
> opt-in. Maybe a new constructor with an additional ThreadFactory parameter.
> That would at least clearly indicate that work will happen off-thread, and
> would require opt-in for the new behavior.
>
> Under the hood, this ThreadFactory could be used to create the worker
> thread that process queued sends, which could fan-out to per-partition
> threads from there.
>
> So then you'd have two ways to send: the existing way, where serde and
> interceptors and whatnot are executed on the calling thread, and the new
> way, which returns right away and uses an internal Executor. As you point
> out, the semantics would be identical in either case, and it would be very
> easy for clients to switch.
>
> Ryanne
>
> On Thu, May 13, 2021, 9:00 AM Nakamura  wrote:
>
> > Hey Folks,
> > I just posted a new proposal
> > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446>
> > in the wiki.  I think we have an opportunity to improve the
> > KafkaProducer#send user experience.  It would certainly make our lives
> > easier.  Please take a look!  There are two subproblems that I could use
> > guidance on, so I would appreciate feedback on both of them.
> > Best,
> > Moses
> >
>


[DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-13 Thread Nakamura
Hey Folks,
I just posted a new proposal
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446>
in the wiki.  I think we have an opportunity to improve the
KafkaProducer#send user experience.  It would certainly make our lives
easier.  Please take a look!  There are two subproblems that I could use
guidance on, so I would appreciate feedback on both of them.
Best,
Moses


Re: please give me permission to create a KIP

2021-05-12 Thread Nakamura
Thank you!

On Wed, May 12, 2021 at 4:37 PM Matthias J. Sax  wrote:

> Added you.
>
> On 5/12/21 9:23 AM, Nakamura wrote:
> > Hi Folks,
> > I'd like to propose a KIP per Ewen's advice here
> > <https://issues.apache.org/jira/browse/KAFKA-3539?focusedCommentId=17343043>.
> > However, I don't have permission to create a KIP yet.  Please grant it to
> > me!  My username at cwiki.apache.org is moses.nakamura.  Thanks!
> > Best,
> > Moses
> >
>


please give me permission to create a KIP

2021-05-12 Thread Nakamura
Hi Folks,
I'd like to propose a KIP per Ewen's advice here
<https://issues.apache.org/jira/browse/KAFKA-3539?focusedCommentId=17343043>.
However, I don't have permission to create a KIP yet.  Please grant it to
me!  My username at cwiki.apache.org is moses.nakamura.  Thanks!
Best,
Moses