Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-30 Thread Brian Byrne
Thanks everyone. With 3 binding votes from Guozhang, Jason, and Rajini, the
vote has passed.

Brian


Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-28 Thread Ted Yu
+1

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-28 Thread Rajini Sivaram
+1 (binding)

Thanks for the KIP, Brian!

Regards,

Rajini

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-23 Thread Jason Gustafson
Sounds good. +1 from me.

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-23 Thread Brian Byrne
Thanks Jason,

I'm in favor of the latter: metadata.max.idle.ms. I agree that describing
it as a "period" is inaccurate. metadata.max.idle.ms also aligns with
metadata.max.age.ms, which determines the refresh period (an actual
period).
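
For reference, the two settings sit side by side in a producer config; a
minimal sketch follows, where the config keys and the 5-minute defaults
come from this thread, while the bootstrap address, serializers, and class
name are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerMetadataConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        // Refresh period: metadata older than this is proactively
        // re-fetched (an actual period). Default: 5 minutes.
        props.put("metadata.max.age.ms", "300000");

        // Idle expiry (this KIP): a topic unused for this long is evicted
        // from the producer's metadata cache. Default: 5 minutes.
        props.put("metadata.max.idle.ms", "300000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // produce as usual ...
        }
    }
}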

I've updated the docs.

Thanks,
Brian

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-22 Thread Jason Gustafson
Thanks for the proposal. Looks good overall. I wanted to suggest a possible
name change. I was considering something like `idle.metadata.expiration.ms`
or maybe `metadata.max.idle.ms`. Thoughts?

-Jason


Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-21 Thread Guozhang Wang
Got it.

I was proposing that we do the "delayed async batch", but I think your
argument about complexity and pushing it out of scope is convincing, so
instead I propose we keep the synchronous mini-batching, which is obviously
already there :)  I'm +1 on the current proposal scope.

Guozhang

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-21 Thread Brian Byrne
Hi Guozhang,

Ah, sorry, I misunderstood. Actually, this is solved for us today. The way
the producer works is that it maintains at most one inflight metadata fetch
request at any time, where each request is tagged with the current
(monotonically increasing) request version. This version is bumped whenever
a new topic is encountered, and metadata fetching continues as long as the
latest metadata response's version is below the current version.

So if a metadata request is in flight and a number of threads produce to
new topics, those topics are added to the working set, but the next
metadata request won't take place until the outstanding one returns, so
their updates will be batched together. As you suggest, we can keep a
simple list that tracks unknown topics to isolate new vs. old topics.
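
To make that concrete, here's a minimal sketch of the versioning scheme;
the class and method names are illustrative, not the actual client
internals:

class MetadataVersionTracker {
    private int requestVersion = 0;   // bumped when a new topic is seen
    private int responseVersion = 0;  // version of the latest response
    private boolean inflight = false; // at most one request at a time

    // Called when producer#send encounters an unknown topic.
    synchronized void onNewTopic() {
        requestVersion++;
    }

    // True when the network thread should issue another metadata fetch.
    synchronized boolean fetchNeeded() {
        return responseVersion < requestVersion && !inflight;
    }

    // Tags the outgoing request with the current version.
    synchronized int markRequestSent() {
        inflight = true;
        return requestVersion;
    }

    // Topics added while a request was inflight leave responseVersion
    // below requestVersion, so fetchNeeded() triggers one follow-up
    // fetch that covers all of them, batching their updates together.
    synchronized void onResponse(int taggedVersion) {
        inflight = false;
        responseVersion = Math.max(responseVersion, taggedVersion);
    }
}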

Thanks,
Brian



Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-21 Thread Guozhang Wang
Hi Brian,

I think I buy the complexity and extra end-to-end latency argument :) I'm
fine with delaying the asynchronous topic fetching to future work and
keeping the current KIP's scope as-is for now. In that case, can we
consider just a minor implementation detail (since it does not affect
public APIs we probably do not even need to list it, but just thinking out
loud here):

In your proposal, when we request metadata for a topic whose metadata is
unknown, we are going to directly set that topic name as a singleton in the
request. I'm wondering, for the scenario that KAFKA-8904 described, whether
the producer#send calls for thousands of new topics are triggered
sequentially by a single thread or by concurrent threads. If it's the
latter, and we expect that in such scenarios multiple topics may be
requested within a very short time, then we can probably do something like
this internally in a synchronized manner:

1) put the topic name into a list, as "unknown topics", then
2) exhaust the list, and put all topics from that list into the request; if
the list is empty, it means it has been emptied by another thread, so we
skip sending a new request and just wait for the returned metadata refresh.

In most cases the list would just be a singleton with the one topic that
thread has just enqueued, but under extreme scenarios it can probably help
batch a few topic names (of course, I'm thinking about very extreme cases
here, assuming that's what we've seen in KAFKA-8904). Since these two steps
are very lightweight, doing them in a synchronized block would not hurt
concurrency too much.
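
A rough sketch of those two steps, with hypothetical names, just to
illustrate the synchronization:

import java.util.ArrayList;
import java.util.List;

public class UnknownTopicBatcher {
    private final List<String> unknownTopics = new ArrayList<>();

    // Step 1: a sending thread enqueues the topic it found to be unknown.
    public void addUnknownTopic(String topic) {
        synchronized (unknownTopics) {
            unknownTopics.add(topic);
        }
    }

    // Step 2: drain the whole list into one metadata request. An empty
    // result means another thread already drained it, so the caller skips
    // sending a new request and just waits for the in-flight refresh.
    public List<String> drainForRequest() {
        synchronized (unknownTopics) {
            List<String> batch = new ArrayList<>(unknownTopics);
            unknownTopics.clear();
            return batch; // empty => skip sending a request
        }
    }
}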


Guozhang


Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-21 Thread Brian Byrne
Hi Guozhang,

Your understanding of the rationale is accurate, and what you suggest is
completely plausible; however, I have a slightly different take on the
situation.

When the KIP was originally drafted, making KafkaProducer#send asynchronous
was one element of the changes (this is a little more general than (a), but
has similar implications). As you're aware, doing so would allow new topics
to aggregate since the producer could continue to push new records, whereas
today the producer thread is blocked waiting for resolution.

However, there were concerns about changing client behavior unexpectedly in
this manner, and the change isn't as trivial as one would hope. For
example, we'd have to introduce an intermediate queue of records for topics
without metadata, and have that play well with the buffer pool, which
ensures the memory limit isn't exceeded. A side effect is that a producer
could hit 'memory full' conditions more easily, which could have unintended
consequences if, say, the model was set up such that different producer
threads produced to disjoint sets of topics. Where one producer thread was
blocked waiting for new metadata, it could now push enough data to block
all producer threads due to memory limits, so we'd need to be careful here.

For case (a) as described, another concern would be adding a new source of
latency (possibly seconds) for new topics. Not a huge issue, but it is new
behavior for existing clients and adds to the complexity of verifying that
there are no major regressions.

It also wouldn't resolve all cases we're interested in. One behavior we're
witnessing is the following: a producer produces to a very large number of
topics (several thousand), but the interval between consecutive records for
a topic can often exceed the current hard-coded expiry of 5 minutes.
Therefore, when the producer does submit a request for this topic after 5
minutes, it has to request all of the topic metadata again. While batching
new topics in the start-up case would definitely help, here it would likely
be wasted effort.

Being able to increase the metadata eviction timeout for the above case
would improve things, but there's no perfect value when consecutive produce
times are best modelled as a probability distribution. Rather, the better
solution could be what we discussed earlier: lower the eviction timeout,
but make the cost of fetching uncached topic metadata much smaller.
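
For illustration, per-topic idle tracking along those lines might look like
the sketch below; the names are hypothetical, and the real cache would also
hold the partition metadata itself:

import java.util.HashMap;
import java.util.Map;

public class TopicMetadataExpiry {
    private final long maxIdleMs; // configurable, vs. hard-coded 5 min
    private final Map<String, Long> lastAccessMs = new HashMap<>();

    public TopicMetadataExpiry(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    // Called on every send to a topic, keeping it in the working set.
    public synchronized void recordAccess(String topic, long nowMs) {
        lastAccessMs.put(topic, nowMs);
    }

    // Topics idle longer than maxIdleMs fall out of the working set; the
    // next send to one of them pays an uncached metadata fetch.
    public synchronized void evictIdleTopics(long nowMs) {
        lastAccessMs.values().removeIf(last -> nowMs - last > maxIdleMs);
    }
}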

The changes in the KIP were meant to improve the general case without
affecting external client behavior, and then the plan was to fix the
asynchronous send in the next iteration, if necessary. Point (b) is along
the lines of the latest revision: only send requests for uncached topics,
if they exist, otherwise request the full working set. Piggy-backing was
originally included, but removed because its utility was in doubt.

So to summarize, you're correct in that asynchronous topic fetching would
be a big improvement, and should be an item of future work. However what
this KIP proposes should be the safest/easiest set of changes to resolve
the current pain points. Please let me know if you agree/disagree with this
assessment.

Thanks,
Brian


Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-20 Thread Guozhang Wang
Hello Brian,

I looked at the new proposal again, and I'd like to reason about its
rationale from the listed motivations in your wiki:

1) more RPCs: we may send metadata requests more frequently than
appropriate. This is especially the case during producer start-up, where
the more topics it needs to send to, the more metadata requests it needs to
send. This is the originally reported issue in KAFKA-8904.

2) large RPCs: we include all topics in the working set when sending a
metadata request. But I think our conjecture (as Colin has pointed out) is
that this alone is fine most of the time, assuming e.g. you are sending
such a large RPC only once every 10 minutes. It only becomes a common issue
in combination with 1), when you are sending large RPCs too frequently.

3) we want to have a configurable eviction period rather than a hard-coded
value. I consider it a semi-orthogonal motivation compared with 1) and 2),
but we wanted to piggy-back this fix along with the KIP.

So from there, 1) and 2) do not contradict each other, since our belief is
that a large RPC is usually okay as long as it is not large and frequent,
and we actually prefer large-infrequent RPCs > smaller-frequent RPCs >
large-and-frequent RPCs (of course).

The current proposal basically tries to untangle 2) from 1), i.e. for the
scenario of KAFKA-8904 it would result in smaller-frequent RPCs during
startup rather than large-and-frequent RPCs. But I'm wondering why we don't
just do even better and make it large-infrequent RPCs? More specifically,
just like Lucas suggested in the ticket:

a. when there's a new topic with unknown metadata enqueued, instead of
requesting metadata immediately, delay the request for a short period (no
more than seconds), hoping that more unknown topics will be requested in
that window; during this period we would not know which partition the
record would go to, of course, so we buffer it in a different manner.

b. when we are about to send a metadata request, if there are unknown
topic(s), consider them "urgent topics" and send just them, without other
topics; otherwise, send the working set in the request. If we want to get
even fancier, we can still piggy-back some non-urgent topics along with the
urgent ones, but the trade-off is more complicated to reason about, so a
simpler approach is fine too (a sketch follows below).

c. fixing 3) with a new config, which is relatively orthogonal to a) and b).
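
A sketch of the request-building logic in b., with hypothetical names and
the piggy-backing variant omitted:

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class MetadataRequestPlanner {
    // Unknown ("urgent") topics preempt the working set: they go out
    // alone in a small request; otherwise the periodic refresh covers
    // the full working set.
    public static List<String> topicsForNextRequest(Set<String> urgentTopics,
                                                    Set<String> workingSet) {
        if (!urgentTopics.isEmpty()) {
            return new ArrayList<>(urgentTopics);
        }
        return new ArrayList<>(workingSet);
    }
}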



Guozhang




Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-14 Thread Brian Byrne
Hello all,

After further offline discussion, I've removed any efforts to control
metadata RPC sizes. There are now only two items proposed in this KIP:

(1) When encountering a new topic, only issue a metadata request for that
particular topic. For all other cases, continue as it does today with a
full working set refresh.

(2) Introduce a client configuration flag, "metadata.eviction.period.ms",
to control the cache eviction duration. I've reset the default back to the
current (hard-coded) value of 5 minutes, since we can identify cases where
changing it would cause surprises.

The votes have been cleared. My apologies for continually interrupting and
making changes to the KIP, but hopefully this is an agreeable minimum
solution to move forward.

Thanks,
Brian

On Mon, Jan 6, 2020 at 5:23 PM Colin McCabe  wrote:

> On Mon, Jan 6, 2020, at 14:40, Brian Byrne wrote:
> > So the performance of a metadata RPC that occurs once every 10
> > seconds should not be measured strictly in CPU cost, but rather by its
> > effect on the 95th-99th percentile latencies. The larger the request is,
> > the more opportunity there is
> to
> > put a burst stress on the producer and broker, and the larger the
> response
> > payload to push through the control plane socket. Maybe that's not at 5k
> > topics, but there are groups that are 10k+ topics and pushing further.
>
> KAFKA-7019 made reading the metadata lock-free.  There is no a priori
> reason to prefer lots of small requests to a few big requests (within
> reason!)  In fact, it's quite the opposite: when we make lots of small
> requests, it uses more network bandwidth than when we make a few big ones.
> There are a few reasons for this: the request and response headers have a
> fixed overhead, one big array takes less space when serialized than several
> small ones, etc.  There is also TCP and IP overhead, etc.
>
> The broker can only push a few tens of thousands of metadata requests a
> second, due to the overhead of message processing.  This is why almost all
> of the admin commands support batching.  So if you need to create 1,000
> topics, you make one request, not 1,000 requests, for example.
>
> It's definitely reasonable to limit the number of topics included per metadata
> request.  But the reason for this is not improving performance, but
> preventing certain bad corner cases that happen when RPCs get too big.  For
> example, one problem that can happen when a metadata response gets too big
> is that the client could time out before it finishes reading the response.
> Or if the response got way too big, it could even exceed the maximum
> response size.
>
> So I think the limit should be pretty high here.  We might also consider
> putting the limit in terms of number of partitions rather than number of
> topics, since that's what really matters here (this is harder to implement,
> I realize...)  If I had to put a rough number on it, I'd say we don't want
> more than like 50 MB of response data.  This is vaguely in line with how we
> do fetch responses as well (although I think the limit there is higher).
>
> We should also keep in mind that anyone with a wildcard subscription is
> making full metadata requests, which will return information about
> every topic in the system.
>
> >
> > There's definitely weight to the metadata RPCs. Looking at a previous
> > local, non-loaded test I ran, I calculate about 2 microseconds per
> > partition latency to the producer. At 10,000 topics with 100 partitions
> > each, that's a full 2-second bubble in the best case. I can rerun a more
> > targeted performance test, but I feel that's missing the point.
> >
>
> If the metadata is fetched in the background, there should be no impact on
> producer latency, right?
>
> It would be good to talk more about the importance of background metadata
> fetching in the KIP.  The fact that we don't do this is actually a big
> problem with the current implementation.  As I understand it, when the
> metadata gets too old, we slam on the brakes and wait for a metadata fetch
> to complete, rather than starting the metadata fetch BEFORE we need it.
> It's just bad scheduling.
>
> best,
> Colin
>
> >
> > Brian
> >
> > On Mon, Jan 6, 2020 at 1:31 PM Colin McCabe  wrote:
> >
> > > On Mon, Jan 6, 2020, at 13:07, Brian Byrne wrote:
> > > > Hi Colin,
> > > >
> > > > Thanks again for the feedback!
> > > >
> > > > On Mon, Jan 6, 2020 at 12:07 PM Colin McCabe 
> wrote:
> > > >
> > > > > Metadata requests don't (always) go to the controller, right?  We
> > > should
> > > > > fix the wording in this section.
> > > > >
> > > >
> > > > You're correct, s/controller/broker(s)/.
> > > >
> > > > I feel like "Proposed Changes" should come before "Public Interfaces"
> > > > > here.  The new configuration won't make sense to the reader until
> he
> > > or she
> > > > > has read the "changes" section.  Also, it's not clear from the name
> > > that
> > > > > "metadata evict" refers to a span of time.  What do you think
> about "
> > > > > metadata.eviction.period.ms" as a configuration name?

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-06 Thread Colin McCabe
On Mon, Jan 6, 2020, at 14:40, Brian Byrne wrote:
> So the performance of a metadata RPC that occurs once every 10
> seconds should not be measured strictly in CPU cost, but rather by its effect
> on the 95th-99th percentile latencies. The larger the request is, the more
> opportunity there is to
> put a burst stress on the producer and broker, and the larger the response
> payload to push through the control plane socket. Maybe that's not at 5k
> topics, but there are groups that are 10k+ topics and pushing further.

KAFKA-7019 made reading the metadata lock-free.  There is no a priori reason to 
prefer lots of small requests to a few big requests (within reason!)  In fact, 
it's quite the opposite: when we make lots of small requests, it uses more 
network bandwidth than when we make a few big ones.  There are a few reasons 
for this: the request and response headers have a fixed overhead, one big array 
takes less space when serialized than several small ones, etc.  There is also 
TCP and IP overhead, etc.

The broker can only push a few tens of thousands of metadata requests a second, 
due to the overhead of message processing.  This is why almost all of the admin 
commands support batching.  So if you need to create 1,000 topics, you make one 
request, not 1,000 requests, for example.
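
As a sketch of that batching through the public Admin API (the topic names,
partition count, and replication factor here are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class BatchCreateExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                List<NewTopic> topics = new ArrayList<>();
                for (int i = 0; i < 1000; i++) {
                    // name, partitions, replication factor
                    topics.add(new NewTopic("topic-" + i, 3, (short) 1));
                }
                // One batched CreateTopics request instead of 1,000 round trips.
                admin.createTopics(topics).all().get();
            }
        }
    }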

It's definitely reasonable to limit the number of topics included per metadata
request.  But the reason for this is not improving performance, but preventing 
certain bad corner cases that happen when RPCs get too big.  For example, one 
problem that can happen when a metadata response gets too big is that the 
client could time out before it finishes reading the response.  Or if the 
response got way too big, it could even exceed the maximum response size.

So I think the limit should be pretty high here.  We might also consider 
putting the limit in terms of number of partitions rather than number of 
topics, since that's what really matters here (this is harder to implement, I 
realize...)  If I had to put a rough number on it, I'd say we don't want more 
than like 50 MB of response data.  This is vaguely in line with how we do fetch 
responses as well (although I think the limit there is higher).

We should also keep in mind that anyone with a wildcard subscription is making 
full metadata requests, which will return information about every topic in
the system.

> 
> There's definitely weight to the metadata RPCs. Looking at a previous
> local, non-loaded test I ran, I calculate about 2 microseconds per
> partition latency to the producer. At 10,000 topics with 100 partitions
> each, that's a full 2-second bubble in the best case. I can rerun a more
> targeted performance test, but I feel that's missing the point.
> 

If the metadata is fetched in the background, there should be no impact on 
producer latency, right?

It would be good to talk more about the importance of background metadata 
fetching in the KIP.  The fact that we don't do this is actually a big problem 
with the current implementation.  As I understand it, when the metadata gets 
too old, we slam on the brakes and wait for a metadata fetch to complete, 
rather than starting the metadata fetch BEFORE we need it.  It's just bad 
scheduling.
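
A minimal sketch of that scheduling difference, using hypothetical method
names rather than the actual client internals:

    abstract class MetadataScheduler {
        abstract void blockUntilMetadataUpdated();   // current path: senders stall
        abstract void requestMetadataUpdateAsync();  // background fetch, no stall

        // Start a refresh once metadata is halfway to expiry, so it
        // completes before anything has to wait on it.
        void maybeRefresh(long nowMs, long lastUpdateMs, long maxAgeMs) {
            long age = nowMs - lastUpdateMs;
            if (age >= maxAgeMs) {
                blockUntilMetadataUpdated();
            } else if (age >= maxAgeMs / 2) {
                requestMetadataUpdateAsync();
            }
        }
    }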

best,
Colin

> 
> Brian
> 
> On Mon, Jan 6, 2020 at 1:31 PM Colin McCabe  wrote:
> 
> > On Mon, Jan 6, 2020, at 13:07, Brian Byrne wrote:
> > > Hi Colin,
> > >
> > > Thanks again for the feedback!
> > >
> > > On Mon, Jan 6, 2020 at 12:07 PM Colin McCabe  wrote:
> > >
> > > > Metadata requests don't (always) go to the controller, right?  We
> > should
> > > > fix the wording in this section.
> > > >
> > >
> > > You're correct, s/controller/broker(s)/.
> > >
> > > I feel like "Proposed Changes" should come before "Public Interfaces"
> > > > here.  The new configuration won't make sense to the reader until he
> > or she
> > > > has read the "changes" section.  Also, it's not clear from the name
> > that
> > > > "metadata evict" refers to a span of time.  What do you think about "
> > > > metadata.eviction.period.ms" as a configuration name?
> > > >
> > >
> > > Sure, makes sense. Updated order and config name.
> > >
> > >
> > > > Where is the "10 seconds" coming from here?  The current default for
> > > > metadata.max.age.ms is 5 * 60 * 1000 ms, which implies that we want to
> > > > refresh every 5 minutes.  Definitely not every 10 seconds.
> > > >
> > >
> > > The 10 seconds is another arbitrary value to prevent a large number of
> > RPCs
> > > if the target batch size were fixed at 20. For example, if there's 5,000
> > > topics with a 5-minute interval, then instead of issuing an RPC every
> > > 1.2 seconds with batch size of 20, it would issue an RPC every 10 seconds
> > > with batch size of 167.
> > >
> >
> > Hmm.  This will lead to many more RPCs compared to the current situation
> > of issuing an RPC every 5 minutes with 5,000 topics, right?  See below for
> > more discussion.
> >
> > >
> > >
> > > > Stepping back a little bit, it seems like the big problem you
> > > > identified is the O(N^2) behavior of producing to X, then Y, then Z,
> > > > etc. etc. where each new produce to a fresh topic triggers a metadata
> > > > request with all the preceding topics included.

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-06 Thread Brian Byrne
Hello all,

Does anyone else have opinions on the issue of RPC frequency? Would it be
better to remove the fetching of non-urgent topics altogether, so that the
refreshes are contained in a larger batch?

Thanks,
Brian


On Mon, Jan 6, 2020 at 2:40 PM Brian Byrne  wrote:

>
> So the performance of a metadata RPC that occurs once every 10
> seconds should not be measured strictly in CPU cost, but rather by its effect
> on the 95th-99th percentile latencies. The larger the request is, the more
> opportunity there is to
> put a burst stress on the producer and broker, and the larger the response
> payload to push through the control plane socket. Maybe that's not at 5k
> topics, but there are groups that are 10k+ topics and pushing further.
>
> There's definitely weight to the metadata RPCs. Looking at a previous
> local, non-loaded test I ran, I calculate about 2 microseconds per
> partition latency to the producer. At 10,000 topics with 100 partitions
> each, that's a full 2-second bubble in the best case. I can rerun a more
> targeted performance test, but I feel that's missing the point.
>
> If you're arguing that the batch sizes are too small or the time interval too
> short, that's fine, and I'll be happy to increase them. However, there is a
> breaking point that justifies refreshing a subset of a producer's topics. Even
> if producers outnumber brokers by 10x, the fixed overhead cost of an RPC
> that occurs every second is probably not worth scraping for performance,
> but worth sanitizing.
>
> Brian
>
> On Mon, Jan 6, 2020 at 1:31 PM Colin McCabe  wrote:
>
>> On Mon, Jan 6, 2020, at 13:07, Brian Byrne wrote:
>> > Hi Colin,
>> >
>> > Thanks again for the feedback!
>> >
>> > On Mon, Jan 6, 2020 at 12:07 PM Colin McCabe 
>> wrote:
>> >
>> > > Metadata requests don't (always) go to the controller, right?  We
>> should
>> > > fix the wording in this section.
>> > >
>> >
>> > You're correct, s/controller/broker(s)/.
>> >
>> > I feel like "Proposed Changes" should come before "Public Interfaces"
>> > > here.  The new configuration won't make sense to the reader until he
>> or she
>> > > has read the "changes" section.  Also, it's not clear from the name
>> that
>> > > "metadata evict" refers to a span of time.  What do you think about "
>> > > metadata.eviction.period.ms" as a configuration name?
>> > >
>> >
>> > Sure, makes sense. Updated order and config name.
>> >
>> >
>> > > Where is the "10 seconds" coming from here?  The current default for
>> > > metadata.max.age.ms is 5 * 60 * 1000 ms, which implies that we want
>> to
>> > > refresh every 5 minutes.  Definitely not every 10 seconds.
>> > >
>> >
>> > The 10 seconds is another arbitrary value to prevent a large number of
>> RPCs
>> > if the target batch size were fixed at 20. For example, if there's 5,000
>> > topics with a 5-minute interval, then instead of issuing an RPC every
>> > 1.2 seconds with batch size of 20, it would issue an RPC every 10
>> seconds
>> > with batch size of 167.
>> >
>>
>> Hmm.  This will lead to many more RPCs compared to the current situation
>> of issuing an RPC every 5 minutes with 5,000 topics, right?  See below for
>> more discussion.
>>
>> >
>> >
>> > > Stepping back a little bit, it seems like the big problem you
>> identified
>> > > is the O(N^2) behavior of producing to X, then Y, then Z, etc. etc.
>> where
>> > > each new produce to a fresh topic triggers a metadata request with
>> all the
>> > > preceding topics included.
>> > >
>> > > Of course we need to send out a metadata request before producing to
>> X,
>> > > then Y, then Z, but that request could just specify X, or just
>> specify Y,
>> > > etc. etc.  In other words, we could decouple the routine
>> metadata
>> > > fetch which happens on a 5 minute timer from the need to fetch
>> metadata for
>> > > something specific right now.
>> > >
>> > > I guess my question is, do we really need to allow routine metadata
>> > > fetches to "piggyback" on the emergency metadata fetches?  It adds a
>> lot of
>> > > complexity, and we don't have any benchmarks proving that it's better.
>> > > Also, as I understand it, whether we piggyback or not, the number of
>> > > metadata fetches is the same, right?
>> > >
>> >
>> > So it's possible to do as you suggest, but I would argue that it'd be more
>> > complex with how the code is structured, whereas piggybacking wouldn't
>> > add any extra complexity. The derived metadata classes effectively respond
>> > to a notification that a metadata RPC is going to be issued, where they
>> > return the metadata request structure with topics to refresh, which is
>> > decoupled from what generated the event (new topic, stale metadata,
>> > periodic refresh alarm). There is also a strict implementation detail that
>> > only one metadata request can be outstanding at any time, which lends
>> > itself to consolidating complexity in the base metadata class and having
>> > the derived classes use the "listen for next update" model.
>> >
>> > By maintaining an ordered list of topics by their last metadata refresh
>> > time (0 for new topics), it's a matter of pulling from the front of the
>> > list to see which topics should be included in the next update.

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-06 Thread Brian Byrne
So the performance of a metadata RPC that occurs once every 10
seconds should not be measured strictly in CPU cost, but rather by its effect
on the 95th-99th percentile latencies. The larger the request is, the more
opportunity there is to
put a burst stress on the producer and broker, and the larger the response
payload to push through the control plane socket. Maybe that's not at 5k
topics, but there are groups that are 10k+ topics and pushing further.

There's definitely weight to the metadata RPCs. Looking at a previous
local, non-loaded test I ran, I calculate about 2 microseconds per
partition latency to the producer. At 10,000 topics with 100 partitions
each, that's a full 2-second bubble in the best case. I can rerun a more
targeted performance test, but I feel that's missing the point.
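
For reference, the 2-second figure is just the product of those numbers:

    // ~2 us of added producer latency per partition, from the local test above.
    long partitions = 10_000L * 100;        // 10,000 topics x 100 partitions
    double bubbleSecs = partitions * 2e-6;  // = 2.0 seconds in the best case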

If you're arguing that the batch sizes are too small or the time interval too
short, that's fine, and I'll be happy to increase them. However, there is a
breaking point that justifies refreshing a subset of a producer's topics. Even
if producers outnumber brokers by 10x, the fixed overhead cost of an RPC
that occurs every second is probably not worth scraping for performance,
but worth sanitizing.

Brian

On Mon, Jan 6, 2020 at 1:31 PM Colin McCabe  wrote:

> On Mon, Jan 6, 2020, at 13:07, Brian Byrne wrote:
> > Hi Colin,
> >
> > Thanks again for the feedback!
> >
> > On Mon, Jan 6, 2020 at 12:07 PM Colin McCabe  wrote:
> >
> > > Metadata requests don't (always) go to the controller, right?  We
> should
> > > fix the wording in this section.
> > >
> >
> > You're correct, s/controller/broker(s)/.
> >
> > I feel like "Proposed Changes" should come before "Public Interfaces"
> > > here.  The new configuration won't make sense to the reader until he
> or she
> > > has read the "changes" section.  Also, it's not clear from the name
> that
> > > "metadata evict" refers to a span of time.  What do you think about "
> > > metadata.eviction.period.ms" as a configuration name?
> > >
> >
> > Sure, makes sense. Updated order and config name.
> >
> >
> > > Where is the "10 seconds" coming from here?  The current default for
> > > metadata.max.age.ms is 5 * 60 * 1000 ms, which implies that we want to
> > > refresh every 5 minutes.  Definitely not every 10 seconds.
> > >
> >
> > The 10 seconds is another arbitrary value to prevent a large number of
> RPCs
> > if the target batch size were fixed at 20. For example, if there's 5,000
> > topics with a 5-minute interval, then instead of issuing an RPC every
> > 1.2 seconds with batch size of 20, it would issue an RPC every 10 seconds
> > with batch size of 167.
> >
>
> Hmm.  This will lead to many more RPCs compared to the current situation
> of issuing an RPC every 5 minutes with 5,000 topics, right?  See below for
> more discussion.
>
> >
> >
> > > Stepping back a little bit, it seems like the big problem you
> identified
> > > is the O(N^2) behavior of producing to X, then Y, then Z, etc. etc.
> where
> > > each new produce to a fresh topic triggers a metadata request with all
> the
> > > preceding topics included.
> > >
> > > Of course we need to send out a metadata request before producing to X,
> > > then Y, then Z, but that request could just specify X, or just specify
> Y,
> > > etc. etc.  In other words, we could decouple the routine
> metadata
> > > fetch which happens on a 5 minute timer from the need to fetch
> metadata for
> > > something specific right now.
> > >
> > > I guess my question is, do we really need to allow routine metadata
> > > fetches to "piggyback" on the emergency metadata fetches?  It adds a
> lot of
> > > complexity, and we don't have any benchmarks proving that it's better.
> > > Also, as I understand it, whether we piggyback or not, the number of
> > > metadata fetches is the same, right?
> > >
> >
> > So it's possible to do as you suggest, but I would argue that it'd be more
> > complex with how the code is structured, whereas piggybacking wouldn't add
> > any extra complexity. The derived metadata classes effectively respond to a
> > notification that a metadata RPC is going to be issued, where they return
> > the metadata request structure with topics to refresh, which is decoupled
> > from what generated the event (new topic, stale metadata, periodic refresh
> > alarm). There is also a strict implementation detail that only one metadata
> > request can be outstanding at any time, which lends itself to consolidating
> > complexity in the base metadata class and having the derived classes use
> > the "listen for next update" model.
> >
> > By maintaining an ordered list of topics by their last metadata refresh
> > time (0 for new topics), it's a matter of pulling from the front of the
> > list to see which topics should be included in the next update. Always
> > include all urgent topics, then include non-urgent topics (i.e. those that
> > need to be refreshed soon-ish) up to the target batch size.
> >
> > The number of metadata fetches could be reduced. Assuming a target batch
> > size of 20, a new topic might also refresh 19 other "refresh soon" topics
> > in the same RPC, as opposed to those 19 being handled in a subsequent RPC.

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-06 Thread Colin McCabe
On Mon, Jan 6, 2020, at 13:07, Brian Byrne wrote:
> Hi Colin,
> 
> Thanks again for the feedback!
> 
> On Mon, Jan 6, 2020 at 12:07 PM Colin McCabe  wrote:
> 
> > Metadata requests don't (always) go to the controller, right?  We should
> > fix the wording in this section.
> >
> 
> You're correct, s/controller/broker(s)/.
> 
> I feel like "Proposed Changes" should come before "Public Interfaces"
> > here.  The new configuration won't make sense to the reader until he or she
> > has read the "changes" section.  Also, it's not clear from the name that
> > "metadata evict" refers to a span of time.  What do you think about "
> > metadata.eviction.period.ms" as a configuration name?
> >
> 
> Sure, makes sense. Updated order and config name.
> 
> 
> > Where is the "10 seconds" coming from here?  The current default for
> > metadata.max.age.ms is 5 * 60 * 1000 ms, which implies that we want to
> > refresh every 5 minutes.  Definitely not every 10 seconds.
> >
> 
> The 10 seconds is another arbitrary value to prevent a large number of RPCs
> if the target batch size were fixed at 20. For example, if there's 5,000
> topics with a 5-minute interval, then instead of issuing an RPC every
> 1.2 seconds with batch size of 20, it would issue an RPC every 10 seconds
> with batch size of 167.
> 

Hmm.  This will lead to many more RPCs compared to the current situation of 
issuing an RPC every 5 minutes with 5,000 topics, right?  See below for more 
discussion.

> 
> 
> > Stepping back a little bit, it seems like the big problem you identified
> > is the O(N^2) behavior of producing to X, then Y, then Z, etc. etc. where
> > each new produce to a fresh topic triggers a metadata request with all the
> > preceding topics included.
> >
> > Of course we need to send out a metadata request before producing to X,
> > then Y, then Z, but that request could just specify X, or just specify Y,
> > etc. etc.  In other words, we could decouple the routine metadata
> > fetch which happens on a 5 minute timer from the need to fetch metadata for
> > something specific right now.
> >
> > I guess my question is, do we really need to allow routine metadata
> > fetches to "piggyback" on the emergency metadata fetches?  It adds a lot of
> > complexity, and we don't have any benchmarks proving that it's better.
> > Also, as I understand it, whether we piggyback or not, the number of
> > metadata fetches is the same, right?
> >
> 
> So it's possible to do as you suggest, but I would argue that it'd be more
> complex with how the code is structured, whereas piggybacking wouldn't add
> any extra complexity. The derived metadata classes effectively respond to a
> notification that a metadata RPC is going to be issued, where they return
> the metadata request structure with topics to refresh, which is decoupled
> from what generated the event (new topic, stale metadata, periodic refresh
> alarm). There is also a strict implementation detail that only one metadata
> request can be outstanding at any time, which lends itself to consolidating
> complexity in the base metadata class and having the derived classes use
> the "listen for next update" model.
> 
> By maintaining an ordered list of topics by their last metadata refresh
> time (0 for new topics), it's a matter of pulling from the front of the
> list to see which topics should be included in the next update. Always
> include all urgent topics, then include non-urgent topics (i.e. those that
> need to be refreshed soon-ish) up to the target batch size.
> 
> The number of metadata fetches could be reduced. Assuming a target batch
> size of 20, a new topic might also refresh 19 other "refresh soon" topics
> in the same RPC, as opposed to those 19 being handled in a subsequent RPC.
> 
> Although to counter the above, the batching/piggybacking logic isn't
> necessarily about reducing the total number of RPCs, but rather about
> distributing the load more evenly over time. For example, if a large number
> of topics need to be refreshed at approximately the same time (common for
> startup cases that hit a large number of topics), the updates are more
> evenly distributed to avoid a flood.

It wouldn't be a flood in the current case, right?  It would just be a single 
metadata request for a lot of topics. 

Let's compare the two cases.  In the current scenario, we have 1 metadata 
request every 5 minutes.  This request is for 5,000 topics (let's say).  In the 
new scenario, we have a request every 10 seconds, each covering 167 topics.

Which do you think will be more expensive?  I think the second scenario 
certainly will because of the overhead of 30x as many requests sent over the
wire.  Metadata accesses are now lockless, so the big metadata request just 
isn't that much of a problem.  I bet if you benchmark it, sending back metadata 
for 167 topics won't be that much cheaper than sending back metadata for 5k.  
Certainly not 30x cheaper.  There will eventually be a point where we need to 
split metadata requests, but it's definitely not at 5,000 topics.

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-06 Thread Brian Byrne
Hi Colin,

Thanks again for the feedback!

On Mon, Jan 6, 2020 at 12:07 PM Colin McCabe  wrote:

> Metadata requests don't (always) go to the controller, right?  We should
> fix the wording in this section.
>

You're correct, s/controller/broker(s)/.

I feel like "Proposed Changes" should come before "Public Interfaces"
> here.  The new configuration won't make sense to the reader until he or she
> has read the "changes" section.  Also, it's not clear from the name that
> "metadata evict" refers to a span of time.  What do you think about "
> metadata.eviction.period.ms" as a configuration name?
>

Sure, makes sense. Updated order and config name.


> Where is the "10 seconds" coming from here?  The current default for
> metadata.max.age.ms is 5 * 60 * 1000 ms, which implies that we want to
> refresh every 5 minutes.  Definitely not every 10 seconds.
>

The 10 seconds is another arbitrary value to prevent a large number of RPCs
if the target batch size were fixed at 20. For example, if there's 5,000
topics with a 5-minute interval, then instead of issuing an RPC every
1.2 seconds with batch size of 20, it would issue an RPC every 10 seconds
with batch size of 167.
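
Working that example through the proposed sizing arithmetic (assuming 5,000
topics and the 5-minute metadata.max.age.ms default):

    int topics = 5_000;
    int refreshSecs = 5 * 60;                            // metadata.max.age.ms / 1000
    double topicsPerSec = (double) topics / refreshSecs; // ~16.7 topics/s
    long batchSize = Math.max(Math.round(topicsPerSec * 10), 20); // ~167 topics/RPC
    double secsPerRpcAtBatch20 = 20 / topicsPerSec;      // ~1.2 s between RPCs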



> Stepping back a little bit, it seems like the big problem you identified
> is the O(N^2) behavior of producing to X, then Y, then Z, etc. etc. where
> each new produce to a fresh topic triggers a metadata request with all the
> preceding topics included.
>
> Of course we need to send out a metadata request before producing to X,
> then Y, then Z, but that request could just specify X, or just specify Y,
> etc. etc.  In other words, we could decouple the routine metadata
> fetch which happens on a 5 minute timer from the need to fetch metadata for
> something specific right now.
>
> I guess my question is, do we really need to allow routine metadata
> fetches to "piggyback" on the emergency metadata fetches?  It adds a lot of
> complexity, and we don't have any benchmarks proving that it's better.
> Also, as I understand it, whether we piggyback or not, the number of
> metadata fetches is the same, right?
>

So it's possible to do as you suggest, but I would argue that it'd be more
complex with how the code is structured, whereas piggybacking wouldn't add
any extra complexity. The derived metadata classes effectively respond to a
notification that a metadata RPC is going to be issued, where they return
the metadata request structure with topics to refresh, which is decoupled
from what generated the event (new topic, stale metadata, periodic refresh
alarm). There is also a strict implementation detail that only one metadata
request can be outstanding at any time, which lends itself to consolidating
complexity in the base metadata class and having the derived classes use
the "listen for next update" model.

By maintaining an ordered list of topics by their last metadata refresh
time (0 for new topics), it's a matter of pulling from the front of the
list to see which topics should be included in the next update. Always
include all urgent topics, then include non-urgent topics (i.e. those that
need to be refreshed soon-ish) up to the target batch size.

The number of metadata fetches could be reduced. Assuming a target batch
size of 20, a new topic might also refresh 19 other "refresh soon" topics
in the same RPC, as opposed to those 19 being handled in a subsequent RPC.

Although to counter the above, the batching/piggybacking logic isn't
necessarily about reducing the total number of RPCs, but rather about
distributing the load more evenly over time. For example, if a large number
of topics need to be refreshed at approximately the same time (common for
startup cases that hit a large number of topics), the updates are more
evenly distributed to avoid a flood.
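
A minimal sketch of the ordered-list selection described above, with
illustrative names rather than the actual producer classes:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    class TopicRefreshTracker {
        // Last refresh time per topic; 0 for brand-new topics so they sort first.
        private final Map<String, Long> lastRefreshMs = new HashMap<>();

        void addNewTopic(String topic) { lastRefreshMs.putIfAbsent(topic, 0L); }
        void recordRefresh(String topic, long nowMs) { lastRefreshMs.put(topic, nowMs); }

        // Urgent topics are always included; the remainder of the batch is
        // filled with the topics that have gone longest without a refresh.
        List<String> nextBatch(Set<String> urgent, int targetBatchSize,
                               long nowMs, long staleMs) {
            List<String> batch = new ArrayList<>(urgent);
            lastRefreshMs.keySet().stream()
                .filter(t -> !urgent.contains(t))
                .filter(t -> nowMs - lastRefreshMs.get(t) >= staleMs) // "refresh soon"
                .sorted(Comparator.comparingLong(lastRefreshMs::get)) // oldest first
                .limit(Math.max(0, targetBatchSize - batch.size()))
                .forEach(batch::add);
            return batch;
        }
    }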

Brian



> On Mon, Jan 6, 2020, at 10:26, Lucas Bradstreet wrote:
> > +1 (non-binding)
> >
> > On Thu, Jan 2, 2020 at 11:15 AM Brian Byrne  wrote:
> >
> > > Hello all,
> > >
> > > After further discussion and improvements, I'd like to reinstate the
> voting
> > > process.
> > >
> > > The updated KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526
> > > %3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> > > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> >
> > >
> > > The continued discussion:
> > >
> > >
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> > >
> > > I'd be happy to address any further comments/feedback.
> > >
> > > Thanks,
> > > Brian
> > >
> > > On Mon, Dec 9, 2019 at 11:02 PM Guozhang Wang 
> wrote:
> > >
> > > > With the concluded summary on the other discussion thread, I'm +1 on
> the
> > > > proposal.
> > > >
> > > > Thanks Brian!
> > > >
> > > > On Tue, Nov 19, 2019 at 8:00 PM deng ziming <
> dengziming1...@gmail.com>
> > > > wrote:
> > > >
> > > > > >
> > > > > > For new (uncached) topics, one problem here is that we don't know
> > > 

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-06 Thread Colin McCabe
Hi Brian,

Thanks for continuing to work on this.  It looks good overall.

It's probably better to keep the discussion on this thread rather than going 
back and forth between this and the DISCUSS thread.

The KIP states:

 > For (1), an RPC is generated every time an uncached topic's metadata must 
 > be fetched. During periods when a large number of uncached topics are 
 > processed (e.g. producer startup), a large number of RPCs may be sent out 
 > to the controller in a short period of time.

Metadata requests don't (always) go to the controller, right?  We should fix 
the wording in this section.

I feel like "Proposed Changes" should come before "Public Interfaces" here.  
The new configuration won't make sense to the reader until he or she has read 
the "changes" section.  Also, it's not clear from the name that "metadata 
evict" refers to a span of time.  What do you think about 
"metadata.eviction.period.ms" as a configuration name?

 > Let metadataRefreshSecs = metadata.max.age.ms / 1000
 >
 > Set topicsPerSec = <topic working set size> / metadataRefreshSecs
 >
 > Set targetMetadataFetchSize = Math.max(topicsPerSec * 10, 20)
 >
 > Rationale: this sets the target size to approximate a metadata refresh
 > at least every 10 seconds, while also maintaining a reasonable batch size
 > of '20' for setups with a lower number of topics. '20' has no significance
 > other than it's a small-but-appropriate trade-off between RPC metadata
 > response size and necessary RPC frequency.

Where is the "10 seconds" coming from here?  The current default for 
metadata.max.age.ms is 5 * 60 * 1000 ms, which implies that we want to refresh 
every 5 minutes.  Definitely not every 10 seconds.

Stepping back a little bit, it seems like the big problem you identified is the 
O(N^2) behavior of producing to X, then Y, then Z, etc. etc. where each new 
produce to a fresh topic triggers a metadata request with all the preceding 
topics included.  

Of course we need to send out a metadata request before producing to X, then Y, 
then Z, but that request could just specify X, or just specify Y, etc. etc.  In 
other words, we could decouple the routine metadata fetch which
happens on a 5 minute timer from the need to fetch metadata for something 
specific right now.

I guess my question is, do we really need to allow routine metadata fetches to 
"piggyback" on the emergency metadata fetches?  It adds a lot of complexity, 
and we don't have any benchmarks proving that it's better.  Also, as I 
understand it, whether we piggyback or not, the number of metadata fetches is 
the same, right?

best,
Colin


On Mon, Jan 6, 2020, at 10:26, Lucas Bradstreet wrote:
> +1 (non-binding)
> 
> On Thu, Jan 2, 2020 at 11:15 AM Brian Byrne  wrote:
> 
> > Hello all,
> >
> > After further discussion and improvements, I'd like to reinstate the voting
> > process.
> >
> > The updated KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-526
> > %3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> > 
> >
> > The continued discussion:
> >
> > https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> >
> > I'd be happy to address any further comments/feedback.
> >
> > Thanks,
> > Brian
> >
> > On Mon, Dec 9, 2019 at 11:02 PM Guozhang Wang  wrote:
> >
> > > With the concluded summary on the other discussion thread, I'm +1 on the
> > > proposal.
> > >
> > > Thanks Brian!
> > >
> > > On Tue, Nov 19, 2019 at 8:00 PM deng ziming 
> > > wrote:
> > >
> > > > >
> > > > > For new (uncached) topics, one problem here is that we don't know
> > which
> > > > > partition to map a record to in the event that it has a key or custom
> > > > > partitioner, so the RecordAccumulator wouldn't know which
> > batch/broker
> > > it
> > > > > belongs to. We'd need an intermediate record queue that subsequently
> > moved
> > > > the
> > > > > records into RecordAccumulators once metadata resolution was
> > complete.
> > > > For
> > > > > known topics, we don't currently block at all in waitOnMetadata.
> > > > >
> > > >
> > > > You are right, I forgot this fact, and the intermediate record queue
> > will
> > > > help, but I have some questions
> > > >
> > > > if we add an intermediate record queue in KafkaProducer, when should we
> > > > move the records into RecordAccumulators?
> > > > only NetworkClient is aware of the MetadataResponse, here is the
> > > > hierarchical structure of the related classes:
> > > > KafkaProducer
> > > > Accumulator
> > > > Sender
> > > > NetworkClient
> > > > metadataUpdater.handleCompletedMetadataResponse
> > > >
> > > > so
> > > > 1. should we also add a metadataUpdater to KafkaProducer?
> > > > 2. if the topic really does not exist, will the intermediate record
> > > > queue become too large?
> > > > 3. and should we `block` when the intermediate record queue is too
> > > > large? That would again bring back the blocking problem.

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-06 Thread Lucas Bradstreet
+1 (non-binding)

On Thu, Jan 2, 2020 at 11:15 AM Brian Byrne  wrote:

> Hello all,
>
> After further discussion and improvements, I'd like to reinstate the voting
> process.
>
> The updated KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-526
> %3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> 
>
> The continued discussion:
>
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
>
> I'd be happy to address any further comments/feedback.
>
> Thanks,
> Brian
>
> On Mon, Dec 9, 2019 at 11:02 PM Guozhang Wang  wrote:
>
> > With the concluded summary on the other discussion thread, I'm +1 on the
> > proposal.
> >
> > Thanks Brian!
> >
> > On Tue, Nov 19, 2019 at 8:00 PM deng ziming 
> > wrote:
> >
> > > >
> > > > For new (uncached) topics, one problem here is that we don't know
> which
> > > > partition to map a record to in the event that it has a key or custom
> > > > partitioner, so the RecordAccumulator wouldn't know which
> batch/broker
> > it
> > > > belongs to. We'd need an intermediate record queue that subsequently
> moved
> > > the
> > > > records into RecordAccumulators once metadata resolution was
> complete.
> > > For
> > > > known topics, we don't currently block at all in waitOnMetadata.
> > > >
> > >
> > > You are right, I forgot this fact, and the intermediate record queue
> will
> > > help, but I have some questions
> > >
> > > if we add an intermediate record queue in KafkaProducer, when should we
> > > move the records into RecordAccumulators?
> > > only NetworkClient is aware of the MetadataResponse, here is the
> > > hierarchical structure of the related classes:
> > > KafkaProducer
> > > Accumulator
> > > Sender
> > > NetworkClient
> > > metadataUpdater.handleCompletedMetadataResponse
> > >
> > > so
> > > 1. should we also add a metadataUpdater to KafkaProducer?
> > > 2. if the topic really does not exist, will the intermediate record
> > > queue become too large?
> > > 3. and should we `block` when the intermediate record queue is too
> > > large? That would again bring back the blocking problem.
> > >
> > >
> > >
> > > On Wed, Nov 20, 2019 at 12:40 AM Brian Byrne 
> > wrote:
> > >
> > > > Hi Deng,
> > > >
> > > > Thanks for the feedback.
> > > >
> > > > On Mon, Nov 18, 2019 at 6:56 PM deng ziming <
> dengziming1...@gmail.com>
> > > > wrote:
> > > >
> > > > > hi, I reviewed the current code, the ProduceMetadata maintains an
> > > expiry
> > > > > threshold for every topic, every time when we write to a topic we
> > will
> > > > set
> > > > > the expiry time to -1 to indicate it should be updated, this does
> > work
> > > to
> > > > > reduce the size of the topic working set, but the producer will
> > > continue
> > > > > fetching metadata for these topics in every metadata request for
> the
> > > full
> > > > > expiry duration.
> > > > >
> > > >
> > > > Indeed, you are correct, I terribly misread the code here.
> Fortunately
> > > this
> > > > was only a minor optimization in the KIP that's no longer necessary.
> > > >
> > > >
> > > > and we can improve the situation by 2 means:
> > > > > 1. we maintain a refresh threshold for every topic which is for
> > > > example
> > > > > 0.8 * expiry_threshold, and when we send `MetadataRequest` to
> brokers
> > > we
> > > > > just request unknownLeaderTopics + unknownPartitionTopics + topics
> > > > > reach refresh threshold.
> > > > >
> > > >
> > > > Right, this is similar to what I suggested, with a larger window on
> the
> > > > "staleness" that permits for batching to an appropriate size (except
> if
> > > > there's any unknown topics, you'd want to issue the request
> > immediately).
> > > >
> > > >
> > > >
> > > > > 2. we don't invoke KafkaProducer#waitOnMetadata when we call
> > > > > KafkaProducer#send because of we just send data to
> RecordAccumulator,
> > > and
> > > > > before we send data to brokers we will invoke
> > > RecordAccumulator#ready(),
> > > > so
> > > > > we can only invoke waitOnMetadata to block when (number topics
> > > > > reach refresh threshold)>(number of all known topics)*0.2.
> > > > >
> > > >
> > > > For new (uncached) topics, one problem here is that we don't know
> which
> > > > partition to map a record to in the event that it has a key or custom
> > > > partitioner, so the RecordAccumulator wouldn't know which
> batch/broker
> > it
> > > > belongs to. We'd need an intermediate record queue that subsequently
> moved
> > > the
> > > > records into RecordAccumulators once metadata resolution was
> complete.
> > > For
> > > > known topics, we don't currently block at all in waitOnMetadata.
> > > >
> > > > The last major point of minimizing producer startup metadata RPCs may
> > > still
> > > > need to be improved, but this would be a large improvement on the
> > > > current situation.

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-06 Thread Stanislav Kozlovski
+1 (non-binding)

Thanks for the KIP, Brian!

On Thu, Jan 2, 2020 at 7:15 PM Brian Byrne  wrote:

> Hello all,
>
> After further discussion and improvements, I'd like to reinstate the voting
> process.
>
> The updated KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-526
> %3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> 
>
> The continued discussion:
>
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
>
> I'd be happy to address any further comments/feedback.
>
> Thanks,
> Brian
>
> On Mon, Dec 9, 2019 at 11:02 PM Guozhang Wang  wrote:
>
> > With the concluded summary on the other discussion thread, I'm +1 on the
> > proposal.
> >
> > Thanks Brian!
> >
> > On Tue, Nov 19, 2019 at 8:00 PM deng ziming 
> > wrote:
> >
> > > >
> > > > For new (uncached) topics, one problem here is that we don't know
> which
> > > > partition to map a record to in the event that it has a key or custom
> > > > partitioner, so the RecordAccumulator wouldn't know which
> batch/broker
> > it
> > > > belongs to. We'd need an intermediate record queue that subsequently
> moved
> > > the
> > > > records into RecordAccumulators once metadata resolution was
> complete.
> > > For
> > > > known topics, we don't currently block at all in waitOnMetadata.
> > > >
> > >
> > > You are right, I forgot this fact, and the intermediate record queue
> will
> > > help, but I have some questions
> > >
> > > if we add an intermediate record queue in KafkaProducer, when should we
> > > move the records into RecordAccumulators?
> > > only NetworkClient is aware of the MetadataResponse, here is the
> > > hierarchical structure of the related classes:
> > > KafkaProducer
> > > Accumulator
> > > Sender
> > > NetworkClient
> > > metadataUpdater.handleCompletedMetadataResponse
> > >
> > > so
> > > 1. should we also add a metadataUpdater to KafkaProducer?
> > > 2. if the topic really does not exist, will the intermediate record
> > > queue become too large?
> > > 3. and should we `block` when the intermediate record queue is too
> > > large? That would again bring back the blocking problem.
> > >
> > >
> > >
> > > On Wed, Nov 20, 2019 at 12:40 AM Brian Byrne 
> > wrote:
> > >
> > > > Hi Deng,
> > > >
> > > > Thanks for the feedback.
> > > >
> > > > On Mon, Nov 18, 2019 at 6:56 PM deng ziming <
> dengziming1...@gmail.com>
> > > > wrote:
> > > >
> > > > > hi, I reviewed the current code, the ProduceMetadata maintains an
> > > expiry
> > > > > threshold for every topic, every time when we write to a topic we
> > will
> > > > set
> > > > > the expiry time to -1 to indicate it should be updated, this does
> > work
> > > to
> > > > > reduce the size of the topic working set, but the producer will
> > > continue
> > > > > fetching metadata for these topics in every metadata request for
> the
> > > full
> > > > > expiry duration.
> > > > >
> > > >
> > > > Indeed, you are correct, I terribly misread the code here.
> Fortunately
> > > this
> > > > was only a minor optimization in the KIP that's no longer necessary.
> > > >
> > > >
> > > > and we can improve the situation by 2 means:
> > > > > 1. we maintain a refresh threshold for every topic which is for
> > > > example
> > > > > 0.8 * expiry_threshold, and when we send `MetadataRequest` to
> brokers
> > > we
> > > > > just request unknownLeaderTopics + unknownPartitionTopics + topics
> > > > > reach refresh threshold.
> > > > >
> > > >
> > > > Right, this is similar to what I suggested, with a larger window on
> the
> > > > "staleness" that permits for batching to an appropriate size (except
> if
> > > > there's any unknown topics, you'd want to issue the request
> > immediately).
> > > >
> > > >
> > > >
> > > > > 2. we don't invoke KafkaProducer#waitOnMetadata when we call
> > > > > KafkaProducer#send because of we just send data to
> RecordAccumulator,
> > > and
> > > > > before we send data to brokers we will invoke
> > > RecordAccumulator#ready(),
> > > > so
> > > > > we can only invoke waitOnMetadata to block when (number topics
> > > > > reach refresh threshold)>(number of all known topics)*0.2.
> > > > >
> > > >
> > > > For new (uncached) topics, one problem here is that we don't know
> which
> > > > partition to map a record to in the event that it has a key or custom
> > > > partitioner, so the RecordAccumulator wouldn't know which
> batch/broker
> > it
> > > > belongs to. We'd need an intermediate record queue that subsequently
> moved
> > > the
> > > > records into RecordAccumulators once metadata resolution was
> complete.
> > > For
> > > > known topics, we don't currently block at all in waitOnMetadata.
> > > >
> > > > The last major point of minimizing producer startup metadata RPCs may
> > > still
> > > > need to be improved, but this would be a large improvement on the
> > > > current situation.

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-02 Thread Brian Byrne
Hello all,

After further discussion and improvements, I'd like to reinstate the voting
process.

The updated KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-526
%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics

The continued discussion:
https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E

I'd be happy to address any further comments/feedback.

Thanks,
Brian

On Mon, Dec 9, 2019 at 11:02 PM Guozhang Wang  wrote:

> With the concluded summary on the other discussion thread, I'm +1 on the
> proposal.
>
> Thanks Brian!
>
> On Tue, Nov 19, 2019 at 8:00 PM deng ziming 
> wrote:
>
> > >
> > > For new (uncached) topics, one problem here is that we don't know which
> > > partition to map a record to in the event that it has a key or custom
> > > partitioner, so the RecordAccumulator wouldn't know which batch/broker
> it
> > > belongs to. We'd need an intermediate record queue that subsequently moved
> > the
> > > records into RecordAccumulators once metadata resolution was complete.
> > For
> > > known topics, we don't currently block at all in waitOnMetadata.
> > >
> >
> > You are right, I forgot this fact, and the intermediate record queue will
> > help, but I have some questions
> >
> > if we add an intermediate record queue in KafkaProducer, when should we
> > move the records into RecordAccumulators?
> > only NetworkClient is aware of the MetadataResponse, here is the
> > hierarchical structure of the related classes:
> > KafkaProducer
> > Accumulator
> > Sender
> > NetworkClient
> > metadataUpdater.handleCompletedMetadataResponse
> >
> > so
> > 1. should we also add a metadataUpdater to KafkaProducer?
> > 2. if the topic really does not exist, will the intermediate record queue
> > become too large?
> > 3. and should we `block` when the intermediate record queue is too large?
> > That would again bring back the blocking problem.
> >
> >
> >
> > On Wed, Nov 20, 2019 at 12:40 AM Brian Byrne 
> wrote:
> >
> > > Hi Deng,
> > >
> > > Thanks for the feedback.
> > >
> > > On Mon, Nov 18, 2019 at 6:56 PM deng ziming 
> > > wrote:
> > >
> > > > hi, I reviewed the current code, the ProduceMetadata maintains an
> > expiry
> > > > threshold for every topic, every time when we write to a topic we
> will
> > > set
> > > > the expiry time to -1 to indicate it should be updated, this does
> work
> > to
> > > > reduce the size of the topic working set, but the producer will
> > continue
> > > > fetching metadata for these topics in every metadata request for the
> > full
> > > > expiry duration.
> > > >
> > >
> > > Indeed, you are correct, I terribly misread the code here. Fortunately
> > this
> > > was only a minor optimization in the KIP that's no longer necessary.
> > >
> > >
> > > and we can improve the situation by 2 means:
> > > > 1. we maintain a refresh threshold for every topic which is for
> > > example
> > > > 0.8 * expiry_threshold, and when we send `MetadataRequest` to brokers
> > we
> > > > just request unknownLeaderTopics + unknownPartitionTopics + topics
> > > > reach refresh threshold.
> > > >
> > >
> > > Right, this is similar to what I suggested, with a larger window on the
> > > "staleness" that permits for batching to an appropriate size (except if
> > > there's any unknown topics, you'd want to issue the request
> immediately).
> > >
> > >
> > >
> > > > 2. we don't invoke KafkaProducer#waitOnMetadata when we call
> > > > KafkaProducer#send because we just send data to the RecordAccumulator,
> > and
> > > > before we send data to brokers we will invoke
> > RecordAccumulator#ready(),
> > > so
> > > > we can only invoke waitOnMetadata to block when (number topics
> > > > reach refresh threshold)>(number of all known topics)*0.2.
> > > >
> > >
> > > For new (uncached) topics, one problem here is that we don't know which
> > > partition to map a record to in the event that it has a key or custom
> > > partitioner, so the RecordAccumulator wouldn't know which batch/broker
> it
> > > belongs to. We'd need an intermediate record queue that subsequently moved
> > the
> > > records into RecordAccumulators once metadata resolution was complete.
> > For
> > > known topics, we don't currently block at all in waitOnMetadata.
> > >
> > > The last major point of minimizing producer startup metadata RPCs may
> > still
> > > need to be improved, but this would be a large improvement on the
> current
> > > situation.
> > >
> > > Thanks,
> > > Brian
> > >
> > >
> > >
> > > > I think the above 2 ways are enough to solve the current problem.
> > > >
> > > > On Tue, Nov 19, 2019 at 3:20 AM Colin McCabe 
> > wrote:
> > > >
> > > > > On Mon, Nov 18, 2019, at 10:05, Brian Byrne wrote:
> > > > > > On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe  >
> > > > wrote:
> > > > > >
> > > > > > > Two seconds doesn't seem like a reasonable amount of time to
> > leave
> > > > for
> > > > > the
> > > > > > > metadata fetch.

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-12-09 Thread Guozhang Wang
With the concluded summary on the other discussion thread, I'm +1 on the
proposal.

Thanks Brian!

On Tue, Nov 19, 2019 at 8:00 PM deng ziming 
wrote:

> >
> > For new (uncached) topics, one problem here is that we don't know which
> > partition to map a record to in the event that it has a key or custom
> > partitioner, so the RecordAccumulator wouldn't know which batch/broker it
> > belongs to. We'd need an intermediate record queue that subsequently moved
> the
> > records into RecordAccumulators once metadata resolution was complete.
> For
> > known topics, we don't currently block at all in waitOnMetadata.
> >
>
> You are right, I forgot this fact, and the intermediate record queue will
> help, but I have some questions
>
> if we add an intermediate record queue in KafkaProducer, when should we
> move the records into RecordAccumulators?
> only NetworkClient is aware of the MetadataResponse, here is the
> hierarchical structure of the related classes:
> KafkaProducer
> Accumulator
> Sender
> NetworkClient
> metadataUpdater.handleCompletedMetadataResponse
>
> so
> 1. should we also add a metadataUpdater to KafkaProducer?
> 2. if the topic really does not exist, will the intermediate record queue
> become too large?
> 3. and should we `block` when the intermediate record queue is too large?
> That would again bring back the blocking problem.
>
>
>
> On Wed, Nov 20, 2019 at 12:40 AM Brian Byrne  wrote:
>
> > Hi Deng,
> >
> > Thanks for the feedback.
> >
> > On Mon, Nov 18, 2019 at 6:56 PM deng ziming 
> > wrote:
> >
> > > hi, I reviewed the current code, the ProducerMetadata maintains an
> expiry
> > > threshold for every topic, every time we write to a topic we will
> > set
> > > the expiry time to -1 to indicate it should be updated, this does work
> to
> > > reduce the size of the topic working set, but the producer will
> continue
> > > fetching metadata for these topics in every metadata request for the
> full
> > > expiry duration.
> > >
> >
> > Indeed, you are correct, I terribly misread the code here. Fortunately
> this
> > was only a minor optimization in the KIP that's no longer necessary.
> >
> >
> > and we can improve the situation by 2 means:
> > > 1. we maintain a refresh threshold for every topic which is for
> > example
> > > 0.8 * expiry_threshold, and when we send `MetadataRequest` to brokers
> we
> > > just request unknownLeaderTopics + unknownPartitionTopics + topics
> > > reach refresh threshold.
> > >
> >
> > Right, this is similar to what I suggested, with a larger window on the
> > "staleness" that permits for batching to an appropriate size (except if
> > there's any unknown topics, you'd want to issue the request immediately).
> >
> >
> >
> > > 2. we don't invoke KafkaProducer#waitOnMetadata when we call
> > > KafkaProducer#send because we just send data to the RecordAccumulator,
> and
> > > before we send data to brokers we will invoke
> RecordAccumulator#ready(),
> > so
> > > we can only invoke waitOnMetadata to block when (number topics
> > > reach refresh threshold)>(number of all known topics)*0.2.
> > >
> >
> > For new (uncached) topics, one problem here is that we don't know which
> > partition to map a record to in the event that it has a key or custom
> > partitioner, so the RecordAccumulator wouldn't know which batch/broker it
> > belongs to. We'd need an intermediate record queue that subsequently moved
> the
> > records into RecordAccumulators once metadata resolution was complete.
> For
> > known topics, we don't currently block at all in waitOnMetadata.
> >
> > The last major point of minimizing producer startup metadata RPCs may
> still
> > need to be improved, but this would be a large improvement on the current
> > situation.
> >
> > Thanks,
> > Brian
> >
> >
> >
> > > I think the above 2 ways are enough to solve the current problem.
> > >
> > > On Tue, Nov 19, 2019 at 3:20 AM Colin McCabe 
> wrote:
> > >
> > > > On Mon, Nov 18, 2019, at 10:05, Brian Byrne wrote:
> > > > > On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe 
> > > wrote:
> > > > >
> > > > > > Two seconds doesn't seem like a reasonable amount of time to
> leave
> > > for
> > > > the
> > > > > > metadata fetch.  Fetching halfway through the expiration period
> > seems
> > > > more
> > > > > > reasonable.  It also doesn't require us to create a new
> > configuration
> > > > key,
> > > > > > which is nice.
> > > > > >
> > > > > > Another option is to just do the metadata fetch every
> > > > metadata.max.age.ms,
> > > > > > but not expire the topic until we can't fetch the metadata for 2
> *
> > > > > > metadata.max.age.ms.
> > > > > >
> > > > >
> > > > > I'd expect two seconds to be reasonable in the common case. Keep in
> > > mind
> > > > > that this doesn't affect correctness, and a control operation
> > returning
> > > > > cached metadata should be on the order of milliseconds.
> > > > >
> > > >
> > > > Hi Brian,
> > > >
> > > > Thanks again for the KIP.
> > > >

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-19 Thread deng ziming
>
> For new (uncached) topics, one problem here is that we don't know which
> partition to map a record to in the event that it has a key or custom
> partitioner, so the RecordAccumulator wouldn't know which batch/broker it
> belongs to. We'd need an intermediate record queue that subsequently moved the
> records into RecordAccumulators once metadata resolution was complete. For
> known topics, we don't currently block at all in waitOnMetadata.
>

You are right, I forgot this fact, and the intermediate record queue will
help, but I have some questions

if we add an intermediate record queue in KafkaProducer, when should we
move the records into RecordAccumulators?
only NetworkClient is aware of the MetadataResponse, here is the
hierarchical structure of the related classes:
KafkaProducer
Accumulator
Sender
NetworkClient
metadataUpdater.handleCompletedMetadataResponse

so
1. should we also add a metadataUpdater to KafkaProducer?
2. if the topic really does not exist, will the intermediate record queue
become too large?
3. and should we `block` when the intermediate record queue is too large?
That would again bring back the blocking problem.
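
A minimal sketch of the intermediate queue being discussed (hypothetical
names; wiring it into Sender/NetworkClient is exactly the open question above):

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.function.Consumer;

    class PendingRecordQueue<R> {
        private final Map<String, Queue<R>> pendingByTopic = new ConcurrentHashMap<>();

        // send() path: park records for topics whose metadata is not yet
        // resolved, instead of blocking in waitOnMetadata.
        void park(String topic, R record) {
            pendingByTopic.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>())
                          .add(record);
        }

        // Metadata-response path: once partition info for 'topic' is known,
        // drain its parked records into the RecordAccumulator
        // (appendToAccumulator stands in for that call).
        void drain(String topic, Consumer<R> appendToAccumulator) {
            Queue<R> parked = pendingByTopic.remove(topic);
            if (parked != null) {
                for (R record : parked) appendToAccumulator.accept(record);
            }
        }
    }

Bounding the per-topic queues, and deciding whether to block or fail when the
bound is hit, is where questions 2 and 3 above would bite.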



On Wed, Nov 20, 2019 at 12:40 AM Brian Byrne  wrote:

> Hi Deng,
>
> Thanks for the feedback.
>
> On Mon, Nov 18, 2019 at 6:56 PM deng ziming 
> wrote:
>
> > hi, I reviewed the current code, the ProducerMetadata maintains an expiry
> > threshold for every topic, every time we write to a topic we will
> set
> > the expiry time to -1 to indicate it should be updated, this does work to
> > reduce the size of the topic working set, but the producer will continue
> > fetching metadata for these topics in every metadata request for the full
> > expiry duration.
> >
>
> Indeed, you are correct, I terribly misread the code here. Fortunately this
> was only a minor optimization in the KIP that's no longer necessary.
>
>
> > We can improve the situation in two ways:
> > 1. we maintain a refresh threshold for every topic, which is for example
> > 0.8 * expiry_threshold, and when we send a `MetadataRequest` to brokers we
> > just request unknownLeaderTopics + unknownPartitionTopics + topics that
> > have reached the refresh threshold.
> >
>
> Right, this is similar to what I suggested, with a larger window on the
> "staleness" that permits batching to an appropriate size (except that if
> there are any unknown topics, you'd want to issue the request immediately).
>
>
>
> > 2. we don't invoke KafkaProducer#waitOnMetadata when we call
> > KafkaProducer#send, because we just send data to the RecordAccumulator, and
> > before we send data to brokers we will invoke RecordAccumulator#ready(), so
> > we only need waitOnMetadata to block when (number of topics that have
> > reached the refresh threshold) > (number of all known topics) * 0.2.
> >
>
> For new (uncached) topics, one problem here is that we don't know which
> partition to map a record to in the event that it has a key or custom
> partitioner, so the RecordAccumulator wouldn't know which batch/broker it
> belongs to. We'd need an intermediate record queue that subsequently moved the
> records into RecordAccumulators once metadata resolution was complete. For
> known topics, we don't currently block at all in waitOnMetadata.
>
> The last major point of minimizing producer startup metadata RPCs may still
> need to be improved, but this would be a large improvement on the current
> situation.
>
> Thanks,
> Brian
>
>
>
> > I think the above 2 ways are enough to solve the current problem.
> >
> > On Tue, Nov 19, 2019 at 3:20 AM Colin McCabe  wrote:
> >
> > > On Mon, Nov 18, 2019, at 10:05, Brian Byrne wrote:
> > > > On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe 
> > wrote:
> > > >
> > > > > Two seconds doesn't seem like a reasonable amount of time to leave
> > for
> > > the
> > > > > metadata fetch.  Fetching halfway through the expiration period
> seems
> > > more
> > > > > reasonable.  It also doesn't require us to create a new
> configuration
> > > key,
> > > > > which is nice.
> > > > >
> > > > > Another option is to just do the metadata fetch every
> > > metadata.max.age.ms,
> > > > > but not expire the topic until we can't fetch the metadata for 2 *
> > > > > metadata.max.age.ms.
> > > > >
> > > >
> > > > I'd expect two seconds to be reasonable in the common case. Keep in
> > mind
> > > > that this doesn't affect correctness, and a control operation
> returning
> > > > cached metadata should be on the order of milliseconds.
> > > >
> > >
> > > Hi Brian,
> > >
> > > Thanks again for the KIP.
> > >
> > > I think the issue here is not the common case, but the uncommon case
> > where
> > > the metadata fetch takes longer than expected.  In that case, we don't
> > want
> > > to be in the position of having our metadata expire because we waited
> too
> > > long to renew it.
> > >
> > > This is one reason why I think that the metadata expiration time should
> > be
> > > longer than the metadata refresh time.  In fact, it might be 

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-19 Thread Brian Byrne
Hi Deng,

Thanks for the feedback.

On Mon, Nov 18, 2019 at 6:56 PM deng ziming 
wrote:

> Hi, I reviewed the current code. The ProducerMetadata maintains an expiry
> threshold for every topic; every time we write to a topic, we set the
> expiry time to -1 to indicate it should be updated. This does work to
> reduce the size of the topic working set, but the producer will continue
> fetching metadata for these topics in every metadata request for the full
> expiry duration.
>

Indeed, you are correct; I terribly misread the code here. Fortunately this
was only a minor optimization in the KIP that's no longer necessary.


> We can improve the situation in two ways:
> 1. we maintain a refresh threshold for every topic, which is for example
> 0.8 * expiry_threshold, and when we send a `MetadataRequest` to brokers we
> just request unknownLeaderTopics + unknownPartitionTopics + topics that
> have reached the refresh threshold.
>

Right, this is similar to what I suggested, with a larger window on the
"staleness" that permits batching to an appropriate size (except that if
there are any unknown topics, you'd want to issue the request immediately).



> 2. we don't invoke KafkaProducer#waitOnMetadata when we call
> KafkaProducer#send, because we just send data to the RecordAccumulator, and
> before we send data to brokers we will invoke RecordAccumulator#ready(), so
> we only need waitOnMetadata to block when (number of topics that have
> reached the refresh threshold) > (number of all known topics) * 0.2.
>

For new (uncached) topics, one problem here is that we don't know which
partition to map a record to in the event that it has a key or custom
partitioner, so the RecordAccumulator wouldn't know which batch/broker it
belongs to. We'd need an intermediate record queue that subsequently moved the
records into RecordAccumulators once metadata resolution was complete. For
known topics, we don't currently block at all in waitOnMetadata.

The last major point of minimizing producer startup metadata RPCs may still
need to be improved, but this would be a large improvement on the current
situation.

Thanks,
Brian



> I think the above 2 ways are enough to solve the current problem.
>
> On Tue, Nov 19, 2019 at 3:20 AM Colin McCabe  wrote:
>
> > On Mon, Nov 18, 2019, at 10:05, Brian Byrne wrote:
> > > On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe 
> wrote:
> > >
> > > > Two seconds doesn't seem like a reasonable amount of time to leave
> for
> > the
> > > > metadata fetch.  Fetching halfway through the expiration period seems
> > more
> > > > reasonable.  It also doesn't require us to create a new configuration
> > key,
> > > > which is nice.
> > > >
> > > > Another option is to just do the metadata fetch every
> > metadata.max.age.ms,
> > > > but not expire the topic until we can't fetch the metadata for 2 *
> > > > metadata.max.age.ms.
> > > >
> > >
> > > I'd expect two seconds to be reasonable in the common case. Keep in
> mind
> > > that this doesn't affect correctness, and a control operation returning
> > > cached metadata should be on the order of milliseconds.
> > >
> >
> > Hi Brian,
> >
> > Thanks again for the KIP.
> >
> > I think the issue here is not the common case, but the uncommon case
> where
> > the metadata fetch takes longer than expected.  In that case, we don't
> want
> > to be in the position of having our metadata expire because we waited too
> > long to renew it.
> >
> > This is one reason why I think that the metadata expiration time should
> be
> > longer than the metadata refresh time.  In fact, it might be worth having
> > two separate configuration keys for these two values.  I could imagine a
> > user who is having trouble with metadata expiration wanting to increase
> the
> > metadata expiration time, but without increasing the metadata refresh
> > period.  In a sense, the metadata expiration time is like the ZK session
> > expiration time.  You might want to turn it up if the cluster is
> > experiencing load spikes.
> >
> > >
> > > But to the general
> > > point, defining the algorithm would mean having to enforce it with fair accuracy,
> > > whereas if the suggestion is that it'll be performed at a reasonable
> > time,
> > > it allows for batching and other optimizations. Perhaps I shouldn't be
> > > regarding what's defined in a KIP to be contractual in these cases, but
> > you
> > > could consider a first implementation to collect topics whose metadata
> > has
> > > exceeded (metadata.max.age.ms / 2), and sending the batch once a
> > > constituent topic's metadata is near the expiry, or a sufficient number
> > of
> > > topics have been collected (10? 100? 1000?).
> > >
> >
> > I'm concerned that if we change the metadata caching strategy without
> > discussing it first, it may improve certain workloads but make others
> > worse.  We need to be concrete about what the proposed strategy is so
> that
> > we can really evaluate it.
> >
> > >
> > >
> > > > We should be specific about what happens if the first few metadata
> 

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-18 Thread deng ziming
Hi, I reviewed the current code. The ProducerMetadata maintains an expiry
threshold for every topic; every time we write to a topic, we set the
expiry time to -1 to indicate it should be updated. This does work to
reduce the size of the topic working set, but the producer will continue
fetching metadata for these topics in every metadata request for the full
expiry duration.

We can improve the situation in two ways:
1. we maintain a refresh threshold for every topic, which is for example
0.8 * expiry_threshold, and when we send a `MetadataRequest` to brokers we
just request unknownLeaderTopics + unknownPartitionTopics + topics that
have reached the refresh threshold.
2. we don't invoke KafkaProducer#waitOnMetadata when we call
KafkaProducer#send, because we just send data to the RecordAccumulator, and
before we send data to brokers we will invoke RecordAccumulator#ready(), so
we only need waitOnMetadata to block when (number of topics that have
reached the refresh threshold) > (number of all known topics) * 0.2.

I think the above 2 ways are enough to solve the current problem.
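
A rough sketch of the selection logic in point 1, with assumed names
(lastRefreshMs, refreshThresholdMs) that are purely illustrative:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: choose which topics to include in the next
// MetadataRequest, per point 1 above.
class MetadataTopicSelector {
    Set<String> topicsToRequest(Set<String> unknownLeaderTopics,
                                Set<String> unknownPartitionTopics,
                                Map<String, Long> lastRefreshMs,
                                long refreshThresholdMs, // e.g. 0.8 * expiry threshold
                                long nowMs) {
        Set<String> topics = new HashSet<>(unknownLeaderTopics);
        topics.addAll(unknownPartitionTopics);
        // Add topics whose metadata is older than the refresh threshold.
        lastRefreshMs.forEach((topic, refreshedAtMs) -> {
            if (nowMs - refreshedAtMs >= refreshThresholdMs)
                topics.add(topic);
        });
        return topics;
    }
}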

On Tue, Nov 19, 2019 at 3:20 AM Colin McCabe  wrote:

> On Mon, Nov 18, 2019, at 10:05, Brian Byrne wrote:
> > On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe  wrote:
> >
> > > Two seconds doesn't seem like a reasonable amount of time to leave for
> the
> > > metadata fetch.  Fetching halfway through the expiration period seems
> more
> > > reasonable.  It also doesn't require us to create a new configuration
> key,
> > > which is nice.
> > >
> > > Another option is to just do the metadata fetch every
> metadata.max.age.ms,
> > > but not expire the topic until we can't fetch the metadata for 2 *
> > > metadata.max.age.ms.
> > >
> >
> > I'd expect two seconds to be reasonable in the common case. Keep in mind
> > that this doesn't affect correctness, and a control operation returning
> > cached metadata should be on the order of milliseconds.
> >
>
> Hi Brian,
>
> Thanks again for the KIP.
>
> I think the issue here is not the common case, but the uncommon case where
> the metadata fetch takes longer than expected.  In that case, we don't want
> to be in the position of having our metadata expire because we waited too
> long to renew it.
>
> This is one reason why I think that the metadata expiration time should be
> longer than the metadata refresh time.  In fact, it might be worth having
> two separate configuration keys for these two values.  I could imagine a
> user who is having trouble with metadata expiration wanting to increase the
> metadata expiration time, but without increasing the metadata refresh
> period.  In a sense, the metadata expiration time is like the ZK session
> expiration time.  You might want to turn it up if the cluster is
> experiencing load spikes.
>
> >
> > But to the general
> > point, defining the algorithm would mean having to enforce it with fair accuracy,
> > whereas if the suggestion is that it'll be performed at a reasonable
> time,
> > it allows for batching and other optimizations. Perhaps I shouldn't be
> > regarding what's defined in a KIP to be contractual in these cases, but
> you
> > could consider a first implementation to collect topics whose metadata
> has
> > exceeded (metadata.max.age.ms / 2), and sending the batch once a
> > constituent topic's metadata is near the expiry, or a sufficient number
> of
> > topics have been collected (10? 100? 1000?).
> >
>
> I'm concerned that if we change the metadata caching strategy without
> discussing it first, it may improve certain workloads but make others
> worse.  We need to be concrete about what the proposed strategy is so that
> we can really evaluate it.
>
> >
> >
> > > We should be specific about what happens if the first few metadata
> fetches
> > > fail.  Do we use exponential backoff to decide when to resend?  It
> seems
> > > like we really should, for all the usual reasons (reduce the load on
> > > brokers, ride out temporary service disruptions, etc.)  Maybe we could
> have
> > > an exponential retry backoff for each broker (in other words, we
> should try
> > > to contact a different broker before applying the backoff.)  I think
> this
> > > already sort of happens with the disconnect timeout, but we might need
> a
> > > more general solution.
> > >
> >
> > I don't plan to change this behavior. Currently it retries after a fixed
> > value of 'retry.backoff.ms' (defaults to 100 ms). It's possible that
> > different brokers are attempted, but I haven't dug into it.
> >
>
> I think it's critical to understand what the current behavior is before we
> try to change it.  The difference between retrying the same broker and
> trying a different one has a large impact on cluster load and
> latency.  For what it's worth, I believe the behavior is the second one,
> but it has been a while since I checked.  Let's figure this out.
>
> >
> > > Thanks for the clarification.  Fully asynchronous is the way to go, I
> > > agree.  I'm having trouble understanding how 

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-18 Thread Colin McCabe
On Mon, Nov 18, 2019, at 10:05, Brian Byrne wrote:
> On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe  wrote:
> 
> > Two seconds doesn't seem like a reasonable amount of time to leave for the
> > metadata fetch.  Fetching halfway through the expiration period seems more
> > reasonable.  It also doesn't require us to create a new configuration key,
> > which is nice.
> >
> > Another option is to just do the metadata fetch every metadata.max.age.ms,
> > but not expire the topic until we can't fetch the metadata for 2 *
> > metadata.max.age.ms.
> >
> 
> I'd expect two seconds to be reasonable in the common case. Keep in mind
> that this doesn't affect correctness, and a control operation returning
> cached metadata should be on the order of milliseconds.
>

Hi Brian,

Thanks again for the KIP.

I think the issue here is not the common case, but the uncommon case where the 
metadata fetch takes longer than expected.  In that case, we don't want to be 
in the position of having our metadata expire because we waited too long to 
renew it.

This is one reason why I think that the metadata expiration time should be 
longer than the metadata refresh time.  In fact, it might be worth having two 
separate configuration keys for these two values.  I could imagine a user who 
is having trouble with metadata expiration wanting to increase the metadata 
expiration time, but without increasing the metadata refresh period.  In a 
sense, the metadata expiration time is like the ZK session expiration time.  
You might want to turn it up if the cluster is experiencing load spikes.
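
For illustration, two separate knobs on the producer might look as follows,
where 'metadata.expiry.ms' is a made-up key name (only metadata.max.age.ms
exists today):

import java.util.Properties;

// Illustration of the two-knob idea: refresh cadence vs. cache expiry.
// "metadata.expiry.ms" is hypothetical, not a real producer config.
public class TwoKnobExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.max.age.ms", "300000"); // refresh roughly every 5 minutes
        props.put("metadata.expiry.ms", "600000");  // hypothetical: evict after 10 minutes unused
        System.out.println(props);
    }
}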

>
> But to the general
> point, defining the algorithm would mean having to enforce it with fair accuracy,
> whereas if the suggestion is that it'll be performed at a reasonable time,
> it allows for batching and other optimizations. Perhaps I shouldn't be
> regarding what's defined in a KIP to be contractual in these cases, but you
> could consider a first implementation to collect topics whose metadata has
> exceeded (metadata.max.age.ms / 2), and sending the batch once a
> constituent topic's metadata is near the expiry, or a sufficient number of
> topics have been collected (10? 100? 1000?).
>

I'm concerned that if we change the metadata caching strategy without 
discussing it first, it may improve certain workloads but make others worse.  
We need to be concrete about what the proposed strategy is so that we can 
really evaluate it.

> 
> 
> > We should be specific about what happens if the first few metadata fetches
> > fail.  Do we use exponential backoff to decide when to resend?  It seems
> > like we really should, for all the usual reasons (reduce the load on
> > brokers, ride out temporary service disruptions, etc.)  Maybe we could have
> > an exponential retry backoff for each broker (in other words, we should try
> > to contact a different broker before applying the backoff.)  I think this
> > already sort of happens with the disconnect timeout, but we might need a
> > more general solution.
> >
> 
> I don't plan to change this behavior. Currently it retries after a fixed
> value of 'retry.backoff.ms' (defaults to 100 ms). It's possible that
> different brokers are attempted, but I haven't dug into it.
> 

I think it's critical to understand what the current behavior is before we try 
to change it.  The difference between retrying the same broker and trying a 
different one has a large impact on cluster load and latency.  For what 
it's worth, I believe the behavior is the second one, but it has been a while 
since I checked.  Let's figure this out.

> 
> > Thanks for the clarification.  Fully asynchronous is the way to go, I
> > agree.  I'm having trouble understanding how timeouts are handled in the
> > KIP.  It seems like if we can't fetch the metadata within the designated
> > metadata timeout, the future / callback should receive a TimeoutException
> > right?  We do not want the send call to be deferred forever if metadata
> > can't be fetched.  Eventually it should fail if it can't be performed.
> >
> > I do think this is something that will have to be mentioned in the
> > compatibility section.  There is some code out there that is probably
> > prepared to handle a timeout exception from the send function, which may
> > need to be updated to check for a timeout from the future or callback.
> >
> 
> Correct, a timeout exception would be delivered in the future. Sure, I can
> add that note to the KIP.
> 

Thanks.

best,
Colin

> 
> 
> > It seems like this is an existing problem.  You may fire off a lot of send
> > calls that get blocked because the broker that is the leader for a certain
> > partition is not responding.  I'm not sure that we need to do anything
> > special here.  On the other hand, we could make the case for a generic "max
> > number of outstanding sends" configuration to prevent surprise OOMs in the
> > existing cases, plus the new one we're adding.  But this feels like a bit
> > of a scope expansion.
> >
> 
> Right, 

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-18 Thread Brian Byrne
On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe  wrote:

> Two seconds doesn't seem like a reasonable amount of time to leave for the
> metadata fetch.  Fetching halfway through the expiration period seems more
> reasonable.  It also doesn't require us to create a new configuration key,
> which is nice.
>
> Another option is to just do the metadata fetch every metadata.max.age.ms,
> but not expire the topic until we can't fetch the metadata for 2 *
> metadata.max.age.ms.
>

I'd expect two seconds to be reasonable in the common case. Keep in mind
that this doesn't affect correctness, and a control operation returning
cached metadata should be on the order of milliseconds. But to the general
point, defining the algorithm would mean having to enforce it with fair accuracy,
whereas if the suggestion is that it'll be performed at a reasonable time,
it allows for batching and other optimizations. Perhaps I shouldn't be
regarding what's defined in a KIP to be contractual in these cases, but you
could consider a first implementation to collect topics whose metadata has
exceeded (metadata.max.age.ms / 2), and sending the batch once a
constituent topic's metadata is near the expiry, or a sufficient number of
topics have been collected (10? 100? 1000?).
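
One possible shape for that first implementation, with all names and
thresholds assumed rather than specified by the KIP:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the heuristic described above: collect topics past half their
// metadata age, and send the batch once any topic is near expiry or the
// batch has grown large enough. All constants are assumptions.
class RefreshBatcher {
    static final int MAX_BATCH = 100; // 10? 100? 1000?
    final long maxAgeMs;              // metadata.max.age.ms
    final Set<String> batch = new HashSet<>();

    RefreshBatcher(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    // Returns true when a MetadataRequest for 'batch' should be sent now.
    boolean collect(Map<String, Long> lastRefreshMs, long nowMs) {
        boolean nearExpiry = false;
        for (Map.Entry<String, Long> e : lastRefreshMs.entrySet()) {
            long age = nowMs - e.getValue();
            if (age >= maxAgeMs / 2)
                batch.add(e.getKey());
            if (age >= maxAgeMs - maxAgeMs / 10) // within 10% of expiry
                nearExpiry = true;
        }
        return nearExpiry || batch.size() >= MAX_BATCH;
    }
}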


> We should be specific about what happens if the first few metadata fetches
> fail.  Do we use exponential backoff to decide when to resend?  It seems
> like we really should, for all the usual reasons (reduce the load on
> brokers, ride out temporary service disruptions, etc.)  Maybe we could have
> an exponential retry backoff for each broker (in other words, we should try
> to contact a different broker before applying the backoff.)  I think this
> already sort of happens with the disconnect timeout, but we might need a
> more general solution.
>

I don't plan to change this behavior. Currently it retries after a fixed
value of 'retry.backoff.ms' (defaults to 100 ms). It's possible that
different brokers are attempted, but I haven't dug into it.


> Thanks for the clarification.  Fully asynchronous is the way to go, I
> agree.  I'm having trouble understanding how timeouts are handled in the
> KIP.  It seems like if we can't fetch the metadata within the designated
> metadata timeout, the future / callback should receive a TimeoutException
> right?  We do not want the send call to be deferred forever if metadata
> can't be fetched.  Eventually it should fail if it can't be performed.
>
> I do think this is something that will have to be mentioned in the
> compatibility section.  There is some code out there that is probably
> prepared to handle a timeout exception from the send function, which may
> need to be updated to check for a timeout from the future or callback.
>

Correct, a timeout exception would be delivered in the future. Sure, I can
add that note to the KIP.



> It seems like this is an existing problem.  You may fire off a lot of send
> calls that get blocked because the broker that is the leader for a certain
> partition is not responding.  I'm not sure that we need to do anything
> special here.  On the other hand, we could make the case for a generic "max
> number of outstanding sends" configuration to prevent surprise OOMs in the
> existing cases, plus the new one we're adding.  But this feels like a bit
> of a scope expansion.
>

Right, this is an existing problem; however, the asynchronous send could
cause unexpected behavior. For example, if a client pinned
topics/partitions to individual send threads, then memory couldn't be
exhausted by a single topic since a blocking send would prevent further
records from being buffered on that topic. The compromise could be that we
only ever permit one outstanding record batch for a topic, which would keep
the code simple and wouldn't permit a single topic to consume all available
memory.
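
A minimal sketch of that compromise, using a hypothetical per-topic permit
(not actual producer code):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical: allow at most one outstanding metadata-pending batch per
// topic, so a single unresolved topic can't exhaust all buffer memory.
class PerTopicPermit {
    private final Map<String, Semaphore> permits = new ConcurrentHashMap<>();

    // Returns false if this topic already has an outstanding pending batch;
    // the caller can then block or fail fast instead of buffering more.
    boolean tryAcquire(String topic) {
        return permits.computeIfAbsent(topic, t -> new Semaphore(1)).tryAcquire();
    }

    // Called once the topic's metadata resolves and its batch is handed off.
    void release(String topic) {
        Semaphore s = permits.get(topic);
        if (s != null)
            s.release();
    }
}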



> They may be connected, but I'm not sure they should be the same.  Perhaps
> expiry should be 4x the typical fetch rate, for example.
>

That's true. You could also make the case for a faster expiry than refresh.
Makes sense to separate this out.



> Hmm are you sure this is an N^2 problem?  If you have T1 and T2, you
> fetch metadata for T1 and T2, right?  I guess you could argue that we often
> fetch metadata for partitions we don't care about, but that doesn't make it
> O(N^2).  Maybe there's something about the implementation that I'm missing.
>

My apologies, I left out the context. One issue the KIP is trying to
resolve is the metadata storm that's caused by producers starting up. In
the worst case, where the send call is only performed from a single thread
(i.e. no possible batching), fetching metadata for 1K topics will generate
1K RPCs, with payload 1+2+...+1K topics' metadata. Being smart about the
topics being refreshed would still generate 1K RPCs for 1 topic each, and
asynchronous behavior would permit batching (note steady-state refreshing
doesn't require the asynchronous behavior to batch).
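
To make the arithmetic concrete: the cumulative payload is 1 + 2 + ... + N =
N(N+1)/2 topic entries, so for N = 1,000 that's 500,500 entries across the
1K RPCs. A toy calculation:

// Toy illustration of the startup metadata storm described above: each new
// topic triggers an RPC whose payload covers all topics seen so far.
public class MetadataStormCost {
    public static void main(String[] args) {
        int topics = 1000;
        long totalEntries = 0;
        for (int i = 1; i <= topics; i++)
            totalEntries += i; // the i-th RPC carries i topics' metadata
        System.out.println(topics + " RPCs, " + totalEntries + " topic entries total");
        // Prints: 1000 RPCs, 500500 topic entries total
    }
}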



> In 

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-15 Thread Colin McCabe
On Tue, Nov 12, 2019, at 10:27, Brian Byrne wrote:
> Hi Colin,
> 
> Thanks for the feedback. I'm going to leave out some specifics in my
> response, since I'll go back to the KIP, revise it, and then post an update
> on the original discussion thread. I'll make two primary changes, (1)
> remove reference to expiry not being calculated appropriately, since this
> was determined to be a bug and is therefore being fixed aside from the KIP,
> and (2) be more specific with the details surrounding the actual
> implementation.
> 
> > Maybe I missed it, but there seemed to be some details missing here.  When
> > do we start the metadata fetch?  For example, if topic metadata expires
> > every 5 minutes, perhaps we should wait 4 minutes, then start fetching
> > the new metadata, which we would expect to arrive by the 5 minute
> > deadline.  Or perhaps we should start the fetch even earlier, around the
> > 2.5 minute mark.  In any case, there should be some discussion about what
> > the actual policy is.  Given that metadata.max.age.ms is configurable,
> > maybe that policy needs to be expressed in terms of a percentage of the
> > refresh period rather than in terms of an absolute delay.
> >
> 
> Sure, I can add this to the KIP, although I didn't want to make it too
> specific other than "at a reasonable time," which could practically be 2
> seconds before the expiry if it's reliable, or could be half the max age
> duration since we expect the cost to be negligible (with appropriate
> improvements).
>

Thanks again for tackling this, Brian.  I think it's important to be specific 
about what we're proposing here.

Two seconds doesn't seem like a reasonable amount of time to leave for the 
metadata fetch.  Fetching halfway through the expiration period seems more 
reasonable.  It also doesn't require us to create a new configuration key, 
which is nice.

Another option is to just do the metadata fetch every metadata.max.age.ms, but 
not expire the topic until we can't fetch the metadata for 2 * 
metadata.max.age.ms.

We should be specific about what happens if the first few metadata fetches 
fail.  Do we use exponential backoff to decide when to resend?  It seems like 
we really should, for all the usual reasons (reduce the load on brokers, ride 
out temporary service disruptions, etc.)  Maybe we could have an exponential 
retry backoff for each broker (in other words, we should try to contact a 
different broker before applying the backoff.)  I think this already sort of 
happens with the disconnect timeout, but we might need a more general solution.
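
A sketch of what per-broker exponential backoff could look like; this is
hypothetical, since the existing client simply retries after a fixed
retry.backoff.ms:

import java.util.HashMap;
import java.util.Map;

// Hypothetical per-broker exponential retry backoff: try a broker only if
// its backoff window has elapsed; double the window on each consecutive
// failure and reset it on success.
class BrokerBackoff {
    private static final long BASE_MS = 100; // cf. the retry.backoff.ms default
    private static final long MAX_MS = 10_000;

    private final Map<Integer, Long> backoffMs = new HashMap<>(); // brokerId -> current backoff
    private final Map<Integer, Long> notBefore = new HashMap<>(); // brokerId -> earliest retry time

    synchronized boolean canTry(int brokerId, long nowMs) {
        return nowMs >= notBefore.getOrDefault(brokerId, 0L);
    }

    synchronized void onFailure(int brokerId, long nowMs) {
        long current = backoffMs.getOrDefault(brokerId, 0L);
        long next = (current == 0) ? BASE_MS : Math.min(MAX_MS, current * 2);
        backoffMs.put(brokerId, next);
        notBefore.put(brokerId, nowMs + next);
    }

    synchronized void onSuccess(int brokerId) {
        backoffMs.remove(brokerId);
        notBefore.remove(brokerId);
    }
}

A client wanting to contact a different broker first would consult canTry()
across the cluster and only apply the backoff once every broker is inside
its window.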

> 
> > The KIP correctly points out that the current metadata fetching policy
> > causes us to "[block] in a function that's advertised as asynchronous."
> > However, the KIP doesn't seem to spell out whether we will continue to
> > block if metadata can't be found, or if this will be abolished.  Clearly,
> > starting the metadata fetch early will reduce blocking in the common case,
> > but will there still be blocking in the uncommon case where the early fetch
> > doesn't succeed in time?
> >
> 
> The goal is to be fully asynchronous for metadata resolution. Currently,
> the producer will block for a limited amount of time 'max.block.ms', which
> defaults to 60 seconds. If the metadata cannot be fetched during that
> period, a TimeoutException is thrown; however, the proposed change would
> return the KafkaProducer::send()'s resulting Future<> immediately, and that
> timeout would eventually be passed via the future. If the metadata fetch is
> slow, then that record will be enqueued for processing once the metadata is
> resolved in a manner that's invisible to the client.

Thanks for the clarification.  Fully asynchronous is the way to go, I agree.  
I'm having trouble understanding how timeouts are handled in the KIP.  It seems 
like if we can't fetch the metadata within the designated metadata timeout, the 
future / callback should receive a TimeoutException right?  We do not want the 
send call to be deferred forever if metadata can't be fetched.  Eventually it 
should fail if it can't be performed.

I do think this is something that will have to be mentioned in the 
compatibility section.  There is some code out there that is probably prepared 
to handle a timeout exception from the send function, which may need to be 
updated to check for a timeout from the future or callback.

> 
> There will still be blocking in the event of memory pressure, but there's
> no good way around that (other than to fail the request immediately). The
> KIP should elaborate on how to deal with memory pressure, since there's a
> higher likelihood of filling the buffers while metadata fetching is
> outstanding, which invites starvation. There are ways to handle this; I will
> expand on them in the KIP.

It seems like this is an existing problem.  You may fire off a lot of send 
calls that get blocked because the broker that is the leader for a certain 
partition is not responding.  I'm 

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-15 Thread Colin McCabe
On Tue, Nov 12, 2019, at 17:42, Guozhang Wang wrote:
> Sounds good to me, let's continue our voting process here.
> 
> Guozhang

Hi Brian & Guozhang,

I'm looking forward to getting this in.  But is the KIP complete yet?  Brian's 
earlier response said " I'm going to leave out some specifics in my response, 
since I'll go back to the KIP, revise it, and then post an update..."  It 
sounds like we should wait for those updates (if those updates have already 
happened, then let us know so we can recheck the wiki page.)

> 
> On Tue, Nov 12, 2019 at 12:10 PM Ismael Juma  wrote:
> 
> > This is not a bug fix, in my opinion. The existing behavior may be
> > confusing, but it was documented, so I assume it was intended.
> >

Yeah, I agree.  I think it will be nice to have this behavior documented in a 
KIP.  And there are user-visible behavior changes here, like removing blocking 
(a very welcome change!)

best,
Colin


> > Ismael
> >
> > On Mon, Nov 11, 2019, 2:47 PM Guozhang Wang  wrote:
> >
> > > Thanks for the update Brian, I think I agree with Colin that we should
> > > clarify on whether / how the blocking behavior due to metadata fetch
> > would
> > > be affected or not.
> > >
> > > About whether this needs to be voted as a KIP: to me the behavior change
> > is
> > > really a bug fix rather than a public contract change, since we are no
> > > longer incorrectly blocking a function declared as non-blocking, but if people feel
> > > that we should be more careful about such changes I'm also fine with
> > voting
> > > it as an official KIP too.
> > >
> > >
> > > Guozhang
> > >
> > > On Sat, Nov 9, 2019 at 12:50 AM Brian Byrne  wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Regarding metadata expiry, no access times other than the initial
> > > lookup[1]
> > > > are used for determining when to expire producer metadata. Therefore,
> > > > frequently used topics' metadata will be aged out and subsequently
> > > > refreshed (in a blocking manner) every five minutes, and infrequently
> > > used
> > > > topics will be retained for a minimum of five minutes and currently
> > > > refetched on every metadata update during that time period. The
> > sentence
> > > is
> > > > suggesting that we could reduce the expiry time to improve the latter
> > > > without affecting (rather slightly improving) the former.
> > > >
> > > > Keep in mind that in most all cases, I wouldn't anticipate much of a
> > > > difference with producer behavior, and the extra logic can be
> > implemented
> > > > to have insignificant cost. It's the large/dynamic topic corner cases
> > > that
> > > > we're trying to improve.
> > > >
> > > > It'd be convenient if the KIP is no longer necessary. You're right in
> > > that
> > > > there's no public API changes and the behavioral changes are entirely
> > > > internal. I'd be happy to continue the discussion around the KIP, but
> > > > unless otherwise objected, it can be retired.
> > > >
> > > > [1] Not entirely accurate, it's actually the first time when the client
> > > > calculates whether to retain the topic in its metadata.
> > > >
> > > > Thanks,
> > > > Brian
> > > >
> > > > On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang 
> > wrote:
> > > >
> > > > > Hello Brian,
> > > > >
> > > > > Could you elaborate a bit more on this sentence: "This logic can be
> > > made
> > > > > more intelligent by managing the expiry from when the topic was last
> > > > used,
> > > > > enabling the expiry duration to be reduced to improve cases where a
> > > large
> > > > > number of topics are touched intermittently." Not sure I fully
> > > understand
> > > > > the proposal.
> > > > >
> > > > > Also since now this KIP did not make any public API changes and the
> > > > > behavioral changes are not considered a public API contract (i.e. how
> > > we
> > > > > maintain the topic metadata in producer cache is never committed to
> > > > users),
> > > > > I wonder if we still need a KIP for the proposed change any more?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne 
> > > wrote:
> > > > >
> > > > > > Hello all,
> > > > > >
> > > > > > I'd like to propose a vote for a producer change to improve
> > producer
> > > > > > behavior when dealing with a large number of topics, in part by
> > > > reducing
> > > > > > the amount of metadata fetching performed.
> > > > > >
> > > > > > The full KIP is provided here:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> > > > > >
> > > > > > And the discussion thread:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> > > > > >
> > > > > > Thanks,
> > > > > > Brian
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-12 Thread Guozhang Wang
Sounds good to me, let's continue our voting process here.

Guozhang

On Tue, Nov 12, 2019 at 12:10 PM Ismael Juma  wrote:

> This is not a bug fix, in my opinion. The existing behavior may be
> confusing, but it was documented, so I assume it was intended.
>
> Ismael
>
> On Mon, Nov 11, 2019, 2:47 PM Guozhang Wang  wrote:
>
> > Thanks for the update Brian, I think I agree with Colin that we should
> > clarify on whether / how the blocking behavior due to metadata fetch
> would
> > be affected or not.
> >
> > About whether this needs to be voted as a KIP: to me the behavior change
> is
> > really a bug fix rather than a public contract change, since we are no
> > longer incorrectly blocking a function declared as non-blocking, but if people feel
> > that we should be more careful about such changes I'm also fine with
> voting
> > it as an official KIP too.
> >
> >
> > Guozhang
> >
> > On Sat, Nov 9, 2019 at 12:50 AM Brian Byrne  wrote:
> >
> > > Hi Guozhang,
> > >
> > > Regarding metadata expiry, no access times other than the initial
> > lookup[1]
> > > are used for determining when to expire producer metadata. Therefore,
> > > frequently used topics' metadata will be aged out and subsequently
> > > refreshed (in a blocking manner) every five minutes, and infrequently
> > used
> > > topics will be retained for a minimum of five minutes and currently
> > > refetched on every metadata update during that time period. The
> sentence
> > is
> > > suggesting that we could reduce the expiry time to improve the latter
> > > without affecting (rather slightly improving) the former.
> > >
> > > Keep in mind that in most all cases, I wouldn't anticipate much of a
> > > difference with producer behavior, and the extra logic can be
> implemented
> > > to have insignificant cost. It's the large/dynamic topic corner cases
> > that
> > > we're trying to improve.
> > >
> > > It'd be convenient if the KIP is no longer necessary. You're right in
> > that
> > > there's no public API changes and the behavioral changes are entirely
> > > internal. I'd be happy to continue the discussion around the KIP, but
> > > unless otherwise objected, it can be retired.
> > >
> > > [1] Not entirely accurate, it's actually the first time when the client
> > > calculates whether to retain the topic in its metadata.
> > >
> > > Thanks,
> > > Brian
> > >
> > > On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang 
> wrote:
> > >
> > > > Hello Brian,
> > > >
> > > > Could you elaborate a bit more on this sentence: "This logic can be
> > made
> > > > more intelligent by managing the expiry from when the topic was last
> > > used,
> > > > enabling the expiry duration to be reduced to improve cases where a
> > large
> > > > number of topics are touched intermittently." Not sure I fully
> > understand
> > > > the proposal.
> > > >
> > > > Also since now this KIP did not make any public API changes and the
> > > > behavioral changes are not considered a public API contract (i.e. how
> > we
> > > > maintain the topic metadata in producer cache is never committed to
> > > users),
> > > > I wonder if we still need a KIP for the proposed change any more?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne 
> > wrote:
> > > >
> > > > > Hello all,
> > > > >
> > > > > I'd like to propose a vote for a producer change to improve
> producer
> > > > > behavior when dealing with a large number of topics, in part by
> > > reducing
> > > > > the amount of metadata fetching performed.
> > > > >
> > > > > The full KIP is provided here:
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> > > > >
> > > > > And the discussion thread:
> > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> > > > >
> > > > > Thanks,
> > > > > Brian
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-12 Thread Brian Byrne
Hi Colin,

Thanks for the feedback. I'm going to leave out some specifics in my
response, since I'll go back to the KIP, revise it, and then post an update
on the original discussion thread. I'll make two primary changes, (1)
remove reference to expiry not being calculated appropriately, since this
was determined to be a bug and is therefore being fixed aside from the KIP,
and (2) be more specific with the details surrounding the actual
implementation.

> Maybe I missed it, but there seemed to be some details missing here.  When
> do we start the metadata fetch?  For example, if topic metadata expires
> every 5 minutes, perhaps we should wait 4 minutes, then start fetching
> the new metadata, which we would expect to arrive by the 5 minute
> deadline.  Or perhaps we should start the fetch even earlier, around the
> 2.5 minute mark.  In any case, there should be some discussion about what
> the actual policy is.  Given that metadata.max.age.ms is configurable,
> maybe that policy needs to be expressed in terms of a percentage of the
> refresh period rather than in terms of an absolute delay.
>

Sure, I can add this to the KIP, although I didn't want to make it too
specific other than "at a reasonable time," which could practically be 2
seconds before the expiry if it's reliable, or could be half the max age
duration since we expect the cost to be negligible (with appropriate
improvements).

> The KIP correctly points out that the current metadata fetching policy
> causes us to "[block] in a function that's advertised as asynchronous."
> However, the KIP doesn't seem to spell out whether we will continue to
> block if metadata can't be found, or if this will be abolished.  Clearly,
> starting the metadata fetch early will reduce blocking in the common case,
> but will there still be blocking in the uncommon case where the early fetch
> doesn't succeed in time?
>

The goal is to be fully asynchronous for metadata resolution. Currently,
the producer will block for a limited amount of time 'max.block.ms', which
defaults to 60 seconds. If the metadata cannot be fetched during that
period, a TimeoutException is thrown; however, the proposed change would
return the KafkaProducer::send()'s resulting Future<> immediately, and that
timeout would eventually be passed via the future. If the metadata fetch is
slow, then that record will be enqueued for processing once the metadata is
resolved in a manner that's invisible to the client.
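
For callers, the practical difference is where the TimeoutException
surfaces. A sketch of client code that handles both the current behavior
(thrown from send() after blocking) and the proposed one (delivered through
the returned future):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;

// Sketch: with the proposed change, metadata timeouts move from send()
// itself into the returned future, so callers should check both places.
class SendTimeoutHandling {
    void sendWithTimeoutCheck(KafkaProducer<String, String> producer,
                              ProducerRecord<String, String> record)
            throws InterruptedException {
        try {
            Future<RecordMetadata> future = producer.send(record);
            future.get(); // proposed: a metadata timeout arrives here
        } catch (TimeoutException e) {
            // current behavior: send() throws after blocking up to max.block.ms
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TimeoutException) {
                // proposed behavior: timeout delivered asynchronously via the future
            }
        }
    }
}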

There will still be blocking in the event of memory pressure, but there's
no good way around that (other than to fail the request immediately). The
KIP should elaborate on how to deal with memory pressure, since there's a
higher likelihood of filling the buffers while metadata fetching is
outstanding, which invites starvation. There are ways to handle this; I will
expand on them in the KIP.


> Can you clarify this part a bit?  It seems like we have a metadata
> expiration policy now for topics, and we will have one after this KIP, but
> they will be somewhat different.  But it's not clear to me what the
> differences are.
>

What the KIP is proposing is to expire topic metadata only if it hasn't
been accessed within the max age threshold. Currently,
'metadata.max.age.ms' determines how frequently to refresh potentially-stale
topic metadata, but expiry (that is, evicting a topic from the producer's
metadata cache entirely) is not connected to this value - it's hard-coded to
5 minutes. While these aren't exactly the same concepts, it was suggested
in the discussion thread to simplify things and make these values
connected.
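
A sketch of access-based expiry under that proposal, with illustrative
names (the actual ProducerMetadata internals differ):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Sketch: evict a topic from the metadata cache only when it hasn't been
// sent to within the max idle window, rather than on a fixed 5-minute timer.
class TopicExpiry {
    private final long maxIdleMs;
    private final Map<String, Long> lastUsedMs = new HashMap<>();

    TopicExpiry(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    void recordUse(String topic, long nowMs) {
        lastUsedMs.put(topic, nowMs); // refresh the idle clock on every send
    }

    void expireIdleTopics(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = lastUsedMs.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > maxIdleMs)
                it.remove(); // topic drops out of the working set
        }
    }
}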

> In general, if load is a problem, we should probably consider adding some
> kind of jitter on the client side.  There are definitely cases where people
> start up a lot of clients at the same time in parallel and there is a
> thundering herd problem with metadata updates.  Adding jitter would spread
> the load across time rather than creating a spike every 5 minutes in this
> case.
>

There's actually a deeper issue that jitter wouldn't fix (yet) - it's that
metadata for *all* active topics is fetched on every metadata request,
which is an N-squared problem today (first encountered topic T1 fetches
T1's metadata, second topic T2 fetches T1+T2's metadata, and so on). So if
the number of topics is large, these can be heavy RPCs - so much so that,
in degenerate cases, it was reported that clients were hitting their
configured timeout while awaiting the metadata fetch results. So tracking
when metadata was last refreshed isn't necessarily about the 5-minute
refresh (although in steady state, it could be), but rather how much
metadata to fetch when such a request is performed.

Once that's in place, jitter or some probabilistic fetching would help
spread the load.
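
A sketch of client-side jitter on the refresh schedule; the 0.8x-1.2x
spread is an arbitrary illustrative choice:

import java.util.concurrent.ThreadLocalRandom;

// Sketch: spread metadata refreshes over time by jittering each refresh
// interval around metadata.max.age.ms, avoiding thundering herds when many
// producers start at once.
class RefreshJitter {
    static long nextRefreshDelayMs(long maxAgeMs) {
        double factor = 0.8 + 0.4 * ThreadLocalRandom.current().nextDouble(); // 0.8x..1.2x
        return (long) (maxAgeMs * factor);
    }
}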

Thanks,
Brian

On Mon, Nov 11, 2019 at 11:47 AM Colin McCabe  wrote:

> Hi Brian,
>
> Thanks for the KIP.
>
> Starting the metadata fetch before we need the result is definitely a
> great idea.  This way, the 

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-11 Thread Ismael Juma
This is not a bug fix, in my opinion. The existing behavior may be
confusing, but it was documented, so I assume it was intended.

Ismael

On Mon, Nov 11, 2019, 2:47 PM Guozhang Wang  wrote:

> Thanks for the update Brian, I think I agree with Colin that we should
> clarify on whether / how the blocking behavior due to metadata fetch would
> be affected or not.
>
> About whether this needs to be voted as a KIP: to me the behavior change is
> really a bug fix rather than a public contract change, since we are no
> longer incorrectly blocking a function declared as non-blocking, but if people feel
> that we should be more careful about such changes I'm also fine with voting
> it as an official KIP too.
>
>
> Guozhang
>
> On Sat, Nov 9, 2019 at 12:50 AM Brian Byrne  wrote:
>
> > Hi Guozhang,
> >
> > Regarding metadata expiry, no access times other than the initial
> lookup[1]
> > are used for determining when to expire producer metadata. Therefore,
> > frequently used topics' metadata will be aged out and subsequently
> > refreshed (in a blocking manner) every five minutes, and infrequently
> used
> > topics will be retained for a minimum of five minutes and currently
> > refetched on every metadata update during that time period. The sentence
> is
> > suggesting that we could reduce the expiry time to improve the latter
> > without affecting (rather slightly improving) the former.
> >
> > Keep in mind that in most all cases, I wouldn't anticipate much of a
> > difference with producer behavior, and the extra logic can be implemented
> > to have insignificant cost. It's the large/dynamic topic corner cases
> that
> > we're trying to improve.
> >
> > It'd be convenient if the KIP is no longer necessary. You're right in
> that
> > there's no public API changes and the behavioral changes are entirely
> > internal. I'd be happy to continue the discussion around the KIP, but
> > unless otherwise objected, it can be retired.
> >
> > [1] Not entirely accurate, it's actually the first time when the client
> > calculates whether to retain the topic in its metadata.
> >
> > Thanks,
> > Brian
> >
> > On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang  wrote:
> >
> > > Hello Brian,
> > >
> > > Could you elaborate a bit more on this sentence: "This logic can be
> made
> > > more intelligent by managing the expiry from when the topic was last
> > used,
> > > enabling the expiry duration to be reduced to improve cases where a
> large
> > > number of topics are touched intermittently." Not sure I fully
> understand
> > > the proposal.
> > >
> > > Also since now this KIP did not make any public API changes and the
> > > behavioral changes are not considered a public API contract (i.e. how
> we
> > > maintain the topic metadata in producer cache is never committed to
> > users),
> > > I wonder if we still need a KIP for the proposed change any more?
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne 
> wrote:
> > >
> > > > Hello all,
> > > >
> > > > I'd like to propose a vote for a producer change to improve producer
> > > > behavior when dealing with a large number of topics, in part by
> > reducing
> > > > the amount of metadata fetching performed.
> > > >
> > > > The full KIP is provided here:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> > > >
> > > > And the discussion thread:
> > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> > > >
> > > > Thanks,
> > > > Brian
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-11 Thread Guozhang Wang
Thanks for the update Brian, I think I agree with Colin that we should
clarify on whether / how the blocking behavior due to metadata fetch would
be affected or not.

About whether this needs to be voted as a KIP: to me the behavior change is
really a bug fix rather than a public contract change, since we are no
longer incorrectly blocking a function declared as non-blocking, but if people feel
that we should be more careful about such changes I'm also fine with voting
it as an official KIP too.


Guozhang

On Sat, Nov 9, 2019 at 12:50 AM Brian Byrne  wrote:

> Hi Guozhang,
>
> Regarding metadata expiry, no access times other than the initial lookup[1]
> are used for determining when to expire producer metadata. Therefore,
> frequently used topics' metadata will be aged out and subsequently
> refreshed (in a blocking manner) every five minutes, and infrequently used
> topics will be retained for a minimum of five minutes and currently
> refetched on every metadata update during that time period. The sentence is
> suggesting that we could reduce the expiry time to improve the latter
> without affecting (rather slightly improving) the former.
>
> Keep in mind that in most all cases, I wouldn't anticipate much of a
> difference with producer behavior, and the extra logic can be implemented
> to have insignificant cost. It's the large/dynamic topic corner cases that
> we're trying to improve.
>
> It'd be convenient if the KIP is no longer necessary. You're right in that
> there's no public API changes and the behavioral changes are entirely
> internal. I'd be happy to continue the discussion around the KIP, but
> unless otherwise objected, it can be retired.
>
> [1] Not entirely accurate, it's actually the first time when the client
> calculates whether to retain the topic in its metadata.
>
> Thanks,
> Brian
>
> On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang  wrote:
>
> > Hello Brian,
> >
> > Could you elaborate a bit more on this sentence: "This logic can be made
> > more intelligent by managing the expiry from when the topic was last
> used,
> > enabling the expiry duration to be reduced to improve cases where a large
> > number of topics are touched intermittently." Not sure I fully understand
> > the proposal.
> >
> > Also since now this KIP did not make any public API changes and the
> > behavioral changes are not considered a public API contract (i.e. how we
> > maintain the topic metadata in producer cache is never committed to
> users),
> > I wonder if we still need a KIP for the proposed change any more?
> >
> >
> > Guozhang
> >
> > On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne  wrote:
> >
> > > Hello all,
> > >
> > > I'd like to propose a vote for a producer change to improve producer
> > > behavior when dealing with a large number of topics, in part by
> reducing
> > > the amount of metadata fetching performed.
> > >
> > > The full KIP is provided here:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> > >
> > > And the discussion thread:
> > >
> > >
> >
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> > >
> > > Thanks,
> > > Brian
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-11 Thread Colin McCabe
Hi Brian,

Thanks for the KIP.

Starting the metadata fetch before we need the result is definitely a great 
idea.  This way, the metadata fetch can be done in parallel with all the other 
work the producer is doing, rather than forcing the producer to come to a halt 
periodically while metadata is fetched.

Maybe I missed it, but there seemed to be some details missing here.  When do 
we start the metadata fetch?  For example, if topic metadata expires every 5 
minutes, perhaps we should wait 4 minutes, then start fetching the new 
metadata, which we would expect to arrive by the 5 minute deadline.  Or perhaps 
we should start the fetch even earlier, around the 2.5 minute mark.  In any 
case, there should be some discussion about what the actual policy is.  Given 
that metadata.max.age.ms is configurable, maybe that policy needs to be 
expressed in terms of a percentage of the refresh period rather than in terms 
of an absolute delay.

The KIP correctly points out that the current metadata fetching policy causes 
us to "[block] in a function that's advertised as asynchronous."  However, the 
KIP doesn't seem to spell out whether we will continue to block if metadata 
can't be found, or if this will be abolished.  Clearly, starting the metadata 
fetch early will reduce blocking in the common case, but will there still be 
blocking in the uncommon case where the early fetch doesn't succeed in time?

 > To address (2), the producer currently maintains an expiry threshold for 
 > every topic, which is used to remove a topic from the working set at a 
 > future time (currently hard-coded to 5 minutes, this should be modified to 
 > use metadata.max.age.ms). While this does work to reduce the size of the 
 > topic working set, the producer will continue fetching metadata for these 
 > topics in every metadata request for the full expiry duration. This logic 
 > can be made more intelligent by managing the expiry from when the topic 
 > was last used, enabling the expiry duration to be reduced to improve cases 
 > where a large number of topics are touched intermittently.

Can you clarify this part a bit?  It seems like we have a metadata expiration 
policy now for topics, and we will have one after this KIP, but they will be 
somewhat different.  But it's not clear to me what the differences are.

In general, if load is a problem, we should probably consider adding some kind 
of jitter on the client side.  There are definitely cases where people start up 
a lot of clients at the same time in parallel and there is a thundering herd 
problem with metadata updates.  Adding jitter would spread the load across time 
rather than creating a spike every 5 minutes in this case.

best,
Colin


On Fri, Nov 8, 2019, at 08:59, Ismael Juma wrote:
> I think this KIP affects when we block which is actually user visible
> behavior. Right?
> 
> Ismael
> 
> On Fri, Nov 8, 2019, 8:50 AM Brian Byrne  wrote:
> 
> > Hi Guozhang,
> >
> > Regarding metadata expiry, no access times other than the initial lookup[1]
> > are used for determining when to expire producer metadata. Therefore,
> > frequently used topics' metadata will be aged out and subsequently
> > refreshed (in a blocking manner) every five minutes, and infrequently used
> > topics will be retained for a minimum of five minutes and currently
> > refetched on every metadata update during that time period. The sentence is
> > suggesting that we could reduce the expiry time to improve the latter
> > without affecting (rather slightly improving) the former.
> >
> > Keep in mind that in most all cases, I wouldn't anticipate much of a
> > difference with producer behavior, and the extra logic can be implemented
> > to have insignificant cost. It's the large/dynamic topic corner cases that
> > we're trying to improve.
> >
> > It'd be convenient if the KIP is no longer necessary. You're right in that
> > there's no public API changes and the behavioral changes are entirely
> > internal. I'd be happy to continue the discussion around the KIP, but
> > unless otherwise objected, it can be retired.
> >
> > [1] Not entirely accurate, it's actually the first time when the client
> > calculates whether to retain the topic in its metadata.
> >
> > Thanks,
> > Brian
> >
> > On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang  wrote:
> >
> > > Hello Brian,
> > >
> > > Could you elaborate a bit more on this sentence: "This logic can be made
> > > more intelligent by managing the expiry from when the topic was last
> > used,
> > > enabling the expiry duration to be reduced to improve cases where a large
> > > number of topics are touched intermittently." Not sure I fully understand
> > > the proposal.
> > >
> > > Also since now this KIP did not make any public API changes and the
> > > behavioral changes are not considered a public API contract (i.e. how we
> > > maintain the topic metadata in producer cache is never committed to
> > users),
> > > I wonder if we still need a KIP for 

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-08 Thread Ismael Juma
I think this KIP affects when we block which is actually user visible
behavior. Right?

Ismael

On Fri, Nov 8, 2019, 8:50 AM Brian Byrne  wrote:

> Hi Guozhang,
>
> Regarding metadata expiry, no access times other than the initial lookup[1]
> are used for determining when to expire producer metadata. Therefore,
> frequently used topics' metadata will be aged out and subsequently
> refreshed (in a blocking manner) every five minutes, and infrequently used
> topics will be retained for a minimum of five minutes and currently
> refetched on every metadata update during that time period. The sentence is
> suggesting that we could reduce the expiry time to improve the latter
> without affecting (rather slightly improving) the former.
>
> Keep in mind that in most all cases, I wouldn't anticipate much of a
> difference with producer behavior, and the extra logic can be implemented
> to have insignificant cost. It's the large/dynamic topic corner cases that
> we're trying to improve.
>
> It'd be convenient if the KIP is no longer necessary. You're right in that
> there's no public API changes and the behavioral changes are entirely
> internal. I'd be happy to continue the discussion around the KIP, but
> unless otherwise objected, it can be retired.
>
> [1] Not entirely accurate: it's actually the first time the client
> calculates whether to retain the topic in its metadata.
>
> Thanks,
> Brian
>
> On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang  wrote:
>
> > Hello Brian,
> >
> > Could you elaborate a bit more on this sentence: "This logic can be made
> > more intelligent by managing the expiry from when the topic was last used,
> > enabling the expiry duration to be reduced to improve cases where a large
> > number of topics are touched intermittently." Not sure I fully understand
> > the proposal.
> >
> > Also, since this KIP no longer makes any public API changes, and the
> > behavioral changes are not considered a public API contract (i.e., how we
> > maintain the topic metadata in the producer cache is never committed to
> > users), I wonder whether we still need a KIP for the proposed change at all?
> >
> >
> > Guozhang
> >
> > On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne  wrote:
> >
> > > Hello all,
> > >
> > > I'd like to propose a vote for a producer change to improve producer
> > > behavior when dealing with a large number of topics, in part by
> reducing
> > > the amount of metadata fetching performed.
> > >
> > > The full KIP is provided here:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> > >
> > > And the discussion thread:
> > >
> > >
> >
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> > >
> > > Thanks,
> > > Brian
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-08 Thread Brian Byrne
Hi Guozhang,

Regarding metadata expiry, no access times other than the initial lookup[1]
are used for determining when to expire producer metadata. Therefore,
frequently used topics' metadata will be aged out and subsequently
refreshed (in a blocking manner) every five minutes, while infrequently used
topics will be retained for a minimum of five minutes and are currently
refetched on every metadata update during that period. The sentence is
suggesting that we could reduce the expiry time to improve the latter
without affecting (rather, slightly improving) the former.
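
As a rough illustration of what "expiry from last use" could look like,
here's a minimal sketch (hypothetical names, not the actual producer
internals):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical sketch: track last use per topic so that expiry is
    // based on last use rather than on the initial lookup.
    public class TopicExpiryTracker {
        // Illustrative expiry duration (current behavior is ~5 minutes).
        private static final long EXPIRY_MS = 5 * 60 * 1000L;
        private final Map<String, Long> lastUsedMs = new ConcurrentHashMap<>();

        // Invoked whenever a record is produced to the topic.
        public void recordUse(String topic, long nowMs) {
            lastUsedMs.put(topic, nowMs);
        }

        // Invoked on metadata updates to decide whether to drop a topic.
        public boolean shouldExpire(String topic, long nowMs) {
            Long last = lastUsedMs.get(topic);
            return last == null || nowMs - last > EXPIRY_MS;
        }
    }

Since frequently used topics keep refreshing their timestamps, the expiry
duration could then be lowered to benefit intermittently used topics
without penalizing the frequent ones.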

Keep in mind that in almost all cases, I wouldn't anticipate much of a
difference in producer behavior, and the extra logic can be implemented
at insignificant cost. It's the large/dynamic-topic corner cases that
we're trying to improve.

It'd be convenient if the KIP is no longer necessary. You're right that
there are no public API changes and the behavioral changes are entirely
internal. I'd be happy to continue the discussion around the KIP, but
unless anyone objects, it can be retired.

[1] Not entirely accurate: it's actually the first time the client
calculates whether to retain the topic in its metadata.

Thanks,
Brian

On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang  wrote:

> Hello Brian,
>
> Could you elaborate a bit more on this sentence: "This logic can be made
> more intelligent by managing the expiry from when the topic was last used,
> enabling the expiry duration to be reduced to improve cases where a large
> number of topics are touched intermittently." Not sure I fully understand
> the proposal.
>
> Also, since this KIP no longer makes any public API changes, and the
> behavioral changes are not considered a public API contract (i.e., how we
> maintain the topic metadata in the producer cache is never committed to
> users), I wonder whether we still need a KIP for the proposed change at all?
>
>
> Guozhang
>
> On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne  wrote:
>
> > Hello all,
> >
> > I'd like to propose a vote for a producer change to improve producer
> > behavior when dealing with a large number of topics, in part by reducing
> > the amount of metadata fetching performed.
> >
> > The full KIP is provided here:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> >
> > And the discussion thread:
> >
> >
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> >
> > Thanks,
> > Brian
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-07 Thread Guozhang Wang
Hello Brian,

Could you elaborate a bit more on this sentence: "This logic can be made
more intelligent by managing the expiry from when the topic was last used,
enabling the expiry duration to be reduced to improve cases where a large
number of topics are touched intermittently." Not sure I fully understand
the proposal.

Also, since this KIP no longer makes any public API changes, and the
behavioral changes are not considered a public API contract (i.e., how we
maintain the topic metadata in the producer cache is never committed to
users), I wonder whether we still need a KIP for the proposed change at all?


Guozhang

On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne  wrote:

> Hello all,
>
> I'd like to propose a vote for a producer change to improve producer
> behavior when dealing with a large number of topics, in part by reducing
> the amount of metadata fetching performed.
>
> The full KIP is provided here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
>
> And the discussion thread:
>
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
>
> Thanks,
> Brian
>


-- 
-- Guozhang


[VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-11-07 Thread Brian Byrne
Hello all,

I'd like to propose a vote for a producer change to improve producer
behavior when dealing with a large number of topics, in part by reducing
the amount of metadata fetching performed.

The full KIP is provided here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics

And the discussion thread:
https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E

Thanks,
Brian