Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-12-01 Thread Matthias J. Sax

Thanks for clarifying. Makes sense to me.

On 11/30/23 8:33 PM, Colt McNealy wrote:

Hi Matthias and everyone—

Some clarification is necessary just for posterity. It turns out that, on a
fresh standby task before we start polling for records, we wouldn't be able
to get the current end offset without a network call. This leaves us three
options:

A) Make it an Optional or use a sentinel value to mark that it's not
present.
B) Perform a network call to get the endOffset when it's not there.
C) Remove it.

Option A) seems like it could be a confusing API, especially because in the
strong majority of cases, the Optional would be empty. Option B) is
undesirable because of the performance considerations—if we're going to
make a network round trip, we might as well get some records back! That
leaves us with option C), which is the least-bad of all of them.

At LittleHorse we actually do care about the endOffset in the
onUpdateStart() method, and having it would be useful to us. However, the
work-around isn't horrible, because the endOffset will be passed into the
first call to onBatchLoaded() , which normally follows onUpdateStart()
within <100ms.

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Nov 30, 2023 at 4:43 PM Matthias J. Sax  wrote:


parameter is somewhat irrelevant to our use case


Sounds like a weird justification to change the KIP. Providing more
information is usually better than less, so it seems it won't hurt to
just keep it (seems useful in general to get the current end offset in
this callback) -- you can always ignore it, if it's not relevant for
your use case.


-Matthias


On 11/30/23 6:56 AM, Eduwer Camacaro wrote:

Hello everyone,

We have come to the conclusion, during our work on this KIP's
implementation, that the #onUpdateStart callback's "currentEndOffset"
parameter is somewhat irrelevant to our use case. When this callback is
invoked, I think this value is usually unknown. Our choice to delete this
parameter from the #onUpdateStart callback requires an update to the KIP.

Please feel free to review the PR and provide any comments you may have. :)
Thanks in advance

Edu-

On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax wrote:

Thanks. SGTM.

On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:

That all sounds good to me! Thanks for the KIP

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote:

Hi Sophie, Matthias, Bruno, and Eduwer,

Thanks for your patience as I have been scrambling to catch up after a week
of business travel (and a few days with no time to code). I'd like to tie
up some loose ends here, but in short, I don't think the KIP document
itself needs any changes (our internal implementation does, however).

1. In the interest of a) not changing the KIP after it's already out for a
vote, and b) making sure our English grammar is "correct", let's stick with
`onBatchLoaded()`. It is the Store that gets updated, not the Batch.

2. For me (and, thankfully, the community as well) adding a remote network
call at any point in this KIP is a non-starter. We'll ensure that
our implementation does not introduce one.

3. I really don't like changing API behavior, even if it's not documented
in the javadoc. As such, I am strongly against modifying the behavior of
endOffsets() on the consumer as some people may implicitly depend on the
contract.
3a. The Consumer#currentLag() method gives us exactly what we want without
a network call (current lag from a cache, from which we can compute the
offset).

4. I have no opinion about whether we should pass endOffset or currentLag
to the callback. Either one has the same exact information inside it. In
the interest of not changing the KIP after the vote has started, I'll leave
it as endOffset.

As such, I believe the KIP doesn't need any updates, nor has it been
updated since the vote started.

Would anyone else like to discuss something before the Otter Council
adjourns regarding this matter?

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:


Just want to checkpoint the current state of this KIP and make sure we're
on track to get it in to 3.7 (we still have a few weeks) -- looks like
there are two remaining open questions, both relating to the
middle/intermediate callback:

1. What to name it: seems like the primary candidates are onBatchLoaded and
onBatchUpdated (and maybe also onStandbyUpdated?)
2. What additional information can we pass in that would strike a good
balance between being helpful and impacting performance.

Regarding #1, I think all of the current options are reasonable enough that
we should just let Colt decide which he prefers. I personally think
#onBatchUpdated is fine -- Bruno does make a fair point but the truth is
that English grammar can be sticky and while it could be argued that it is
the store which is updated, not the batch, I feel that it is perfectly
clear what

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-11-30 Thread Colt McNealy
Hi Matthias and everyone—

Some clarification is necessary just for posterity. It turns out that, on a
fresh standby task before we start polling for records, we wouldn't be able
to get the current end offset without a network call. This leaves us three
options:

A) Make it an Optional or use a sentinel value to mark that it's not
present.
B) Perform a network call to get the endOffset when it's not there.
C) Remove it.

Option A) seems like it could be a confusing API, especially because in the
strong majority of cases, the Optional would be empty. Option B) is
undesirable because of the performance considerations—if we're going to
make a network round trip, we might as well get some records back! That
leaves us with option C), which is the least-bad of all of them.
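
For illustration, Option A might look roughly like the sketch below. This is
a minimal, hypothetical shape and not the actual Kafka Streams interface:
`StartListener`, its parameter list, and the store name `orders-store` are
assumptions made up for this example. It only shows how every caller would
have to branch on an `OptionalLong` that is empty in most cases.

```java
import java.util.OptionalLong;

public class OptionalEndOffsetSketch {

    // Hypothetical, simplified callback shape for Option A (not the real Kafka Streams API).
    interface StartListener {
        void onUpdateStart(String storeName, long startingOffset, OptionalLong currentEndOffset);
    }

    // Describes what a listener can say about the remaining work when the update starts.
    static String describe(long startingOffset, OptionalLong currentEndOffset) {
        if (currentEndOffset.isPresent()) {
            long remaining = currentEndOffset.getAsLong() - startingOffset;
            return "remaining=" + remaining;
        }
        // On a fresh standby task nothing has been polled yet, so this would be the usual branch.
        return "remaining=unknown";
    }

    public static void main(String[] args) {
        StartListener listener = (store, start, end) ->
                System.out.println(store + ": " + describe(start, end));

        listener.onUpdateStart("orders-store", 100L, OptionalLong.empty());
        listener.onUpdateStart("orders-store", 100L, OptionalLong.of(250L));
    }
}
```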

At LittleHorse we actually do care about the endOffset in the
onUpdateStart() method, and having it would be useful to us. However, the
work-around isn't horrible, because the endOffset will be passed into the
first call to onBatchLoaded() , which normally follows onUpdateStart()
within <100ms.
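
The work-around described above could be sketched as follows. The class
`FirstBatchEndOffsetTracker` and its simplified method signatures are
hypothetical (the real callbacks presumably carry more parameters); the only
idea taken from this thread is that the end offset becomes known on the first
onBatchLoaded() call rather than in onUpdateStart().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

public class FirstBatchEndOffsetTracker {

    // Starting offset per store, recorded in onUpdateStart (no end offset is available there).
    private final Map<String, Long> startingOffsets = new HashMap<>();
    // End offset per store as seen on the first onBatchLoaded call after an update starts.
    private final Map<String, Long> firstSeenEndOffsets = new HashMap<>();

    // Hypothetical simplified signature: the end offset is not passed in here.
    public void onUpdateStart(String storeName, long startingOffset) {
        startingOffsets.put(storeName, startingOffset);
        firstSeenEndOffsets.remove(storeName); // forget any value from an earlier update cycle
    }

    // Hypothetical simplified signature: only the end offset is modeled.
    public void onBatchLoaded(String storeName, long currentEndOffset) {
        firstSeenEndOffsets.putIfAbsent(storeName, currentEndOffset);
    }

    // Lag at the start of the update, available only once the first batch has been loaded.
    public OptionalLong initialLag(String storeName) {
        Long start = startingOffsets.get(storeName);
        Long end = firstSeenEndOffsets.get(storeName);
        if (start == null || end == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(end - start);
    }

    public static void main(String[] args) {
        FirstBatchEndOffsetTracker tracker = new FirstBatchEndOffsetTracker();
        tracker.onUpdateStart("orders-store", 100L);
        System.out.println(tracker.initialLag("orders-store")); // nothing known yet
        tracker.onBatchLoaded("orders-store", 250L);
        System.out.println(tracker.initialLag("orders-store")); // known after the first batch
    }
}
```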

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Nov 30, 2023 at 4:43 PM Matthias J. Sax wrote:

> > parameter is somewhat irrelevant to our use case
>
> Sounds like a weird justification to change the KIP. Providing more
> information is usually better than less, so it seems it won't hurt to
> just keep it (seems useful in general to get the current end offset in
> this callback) -- you can always ignore it, if it's not relevant for
> your use case.
>
>
> -Matthias
>
>
> On 11/30/23 6:56 AM, Eduwer Camacaro wrote:
> > Hello everyone,
> >
> > We have come to the conclusion, during our work on this KIP's
> > implementation, that the #onUpdateStart callback's "currentEndOffset"
> > parameter is somewhat irrelevant to our use case. When this callback is
> > invoked, I think this value is usually unknown. Our choice to delete this
> > parameter from the #onUpdateStart callback requires an update to the KIP.
> >
> > Please feel free to review the PR and provide any comments you may have. :)
> > Thanks in advance
> >
> > Edu-
> >
> > On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax wrote:
> >
> >> Thanks. SGTM.
> >>
> >> On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:
> >>> That all sounds good to me! Thanks for the KIP
> >>>
> >>> On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote:
> >>>
> >>>> Hi Sophie, Matthias, Bruno, and Eduwer,
> >>>>
> >>>> Thanks for your patience as I have been scrambling to catch up after a week
> >>>> of business travel (and a few days with no time to code). I'd like to tie
> >>>> up some loose ends here, but in short, I don't think the KIP document
> >>>> itself needs any changes (our internal implementation does, however).
> >>>>
> >>>> 1. In the interest of a) not changing the KIP after it's already out for a
> >>>> vote, and b) making sure our English grammar is "correct", let's stick with
> >>>> `onBatchLoaded()`. It is the Store that gets updated, not the Batch.
> >>>>
> >>>> 2. For me (and, thankfully, the community as well) adding a remote network
> >>>> call at any point in this KIP is a non-starter. We'll ensure that
> >>>> our implementation does not introduce one.
> >>>>
> >>>> 3. I really don't like changing API behavior, even if it's not documented
> >>>> in the javadoc. As such, I am strongly against modifying the behavior of
> >>>> endOffsets() on the consumer as some people may implicitly depend on the
> >>>> contract.
> >>>> 3a. The Consumer#currentLag() method gives us exactly what we want without
> >>>> a network call (current lag from a cache, from which we can compute the
> >>>> offset).
> >>>>
> >>>> 4. I have no opinion about whether we should pass endOffset or currentLag
> >>>> to the callback. Either one has the same exact information inside it. In
> >>>> the interest of not changing the KIP after the vote has started, I'll leave
> >>>> it as endOffset.
> >>>>
> >>>> As such, I believe the KIP doesn't need any updates, nor has it been
> >>>> updated since the vote started.
> >>>>
> >>>> Would anyone else like to discuss something before the Otter Council
> >>>> adjourns regarding this matter?
> >>>>
> >>>> Cheers,
> >>>> Colt McNealy
> >>>>
> >>>> *Founder, LittleHorse.dev*
> >>>>
> >>>>
> >>>> On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <
> >>>> sop...@responsive.dev>
> >>>> wrote:
> >>>>
> >>>>> Just want to checkpoint the current state of this KIP and make sure we're
> >>>>> on track to get it in to 3.7 (we still have a few weeks) -- looks like
> >>>>> there are two remaining open questions, both relating to the
> >>>>> middle/intermediate callback:
> >>>>>
> >>>>> 1. What to name it: seems like the primary candidates are onBatchLoaded and
> >>>>> onBatchUpdated (and maybe also onStandbyUpdated?)
> >>>>> 2. What additional information can we pass in that would strike a good
> >>>>> balance between

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-11-30 Thread Matthias J. Sax

parameter is somewhat irrelevant to our use case


Sounds like a weird justification to change the KIP. Providing more 
information is usually better than less, so it seems it won't hurt to 
just keep it (seems useful in general to get the current end offset in 
this callback) -- you can always ignore it, if it's not relevant for 
your use case.



-Matthias


On 11/30/23 6:56 AM, Eduwer Camacaro wrote:

Hello everyone,

We have come to the conclusion, during our work on this KIP's
implementation, that the #onUpdateStart callback's "currentEndOffset"
parameter is somewhat irrelevant to our use case. When this callback is
invoked, I think this value is usually unknown. Our choice to delete this
parameter from the #onUpdateStart callback requires an update to the KIP.

Please feel free to review the PR and provide any comments you may have. :)
Thanks in advance

Edu-

On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax  wrote:


Thanks. SGTM.

On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:

That all sounds good to me! Thanks for the KIP

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote:

Hi Sophie, Matthias, Bruno, and Eduwer,

Thanks for your patience as I have been scrambling to catch up after a week
of business travel (and a few days with no time to code). I'd like to tie
up some loose ends here, but in short, I don't think the KIP document
itself needs any changes (our internal implementation does, however).

1. In the interest of a) not changing the KIP after it's already out for a
vote, and b) making sure our English grammar is "correct", let's stick with
`onBatchLoaded()`. It is the Store that gets updated, not the Batch.

2. For me (and, thankfully, the community as well) adding a remote network
call at any point in this KIP is a non-starter. We'll ensure that
our implementation does not introduce one.

3. I really don't like changing API behavior, even if it's not documented
in the javadoc. As such, I am strongly against modifying the behavior of
endOffsets() on the consumer as some people may implicitly depend on the
contract.
3a. The Consumer#currentLag() method gives us exactly what we want without
a network call (current lag from a cache, from which we can compute the
offset).

4. I have no opinion about whether we should pass endOffset or currentLag
to the callback. Either one has the same exact information inside it. In
the interest of not changing the KIP after the vote has started, I'll leave
it as endOffset.

As such, I believe the KIP doesn't need any updates, nor has it been
updated since the vote started.

Would anyone else like to discuss something before the Otter Council
adjourns regarding this matter?

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:

Just want to checkpoint the current state of this KIP and make sure we're
on track to get it in to 3.7 (we still have a few weeks) -- looks like
there are two remaining open questions, both relating to the
middle/intermediate callback:

1. What to name it: seems like the primary candidates are onBatchLoaded and
onBatchUpdated (and maybe also onStandbyUpdated?)
2. What additional information can we pass in that would strike a good
balance between being helpful and impacting performance.

Regarding #1, I think all of the current options are reasonable enough that
we should just let Colt decide which he prefers. I personally think
#onBatchUpdated is fine -- Bruno does make a fair point but the truth is
that English grammar can be sticky and while it could be argued that it is
the store which is updated, not the batch, I feel that it is perfectly
clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
at all. That's just my two cents in case it helps, but again, whatever
makes sense to you Colt is fine

When it comes to #2 -- as much as I would love to dig into the Consumer
client lore and see if we can modify existing APIs or add new ones in order
to get the desired offset metadata in an efficient way, I think we're
starting to go down a rabbit hole that is going to expand the scope way
beyond what Colt thought he was signing up for. I would advocate to focus
on just the basic feature for now and drop the end-offset from the
callback. Once we have a standby listener it will be easy to expand on with
a followup KIP if/when we find an efficient way to add additional useful
information. I think it will also become more clear what is and isn't
useful after more people get to using it in the real world

Colt/Eduwer: how necessary is receiving the end offset during a batch
update to your own application use case?

Also, for those who really do need to check the current end offset, I
believe in theory you should be able to use the KafkaStreams#metrics API to
get the current lag and/or end offset for the changelog -- it's possible
this does not represent the most up-to-date end offset (I'm

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-11-30 Thread Eduwer Camacaro
Hello everyone,

We have come to the conclusion, during our work on this KIP's
implementation, that the #onUpdateStart callback's "currentEndOffset"
parameter is somewhat irrelevant to our use case. When this callback is
invoked, I think this value is usually unknown. Our choice to delete this
parameter from the #onUpdateStart callback requires an update to the KIP.

Please feel free to review the PR and provide any comments you may have. :)
Thanks in advance

Edu-

On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax wrote:

> Thanks. SGTM.
>
> On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:
> > That all sounds good to me! Thanks for the KIP
> >
> > On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote:
> >
> >> Hi Sophie, Matthias, Bruno, and Eduwer,
> >>
> >> Thanks for your patience as I have been scrambling to catch up after a week
> >> of business travel (and a few days with no time to code). I'd like to tie
> >> up some loose ends here, but in short, I don't think the KIP document
> >> itself needs any changes (our internal implementation does, however).
> >>
> >> 1. In the interest of a) not changing the KIP after it's already out for a
> >> vote, and b) making sure our English grammar is "correct", let's stick with
> >> `onBatchLoaded()`. It is the Store that gets updated, not the Batch.
> >>
> >> 2. For me (and, thankfully, the community as well) adding a remote network
> >> call at any point in this KIP is a non-starter. We'll ensure that
> >> our implementation does not introduce one.
> >>
> >> 3. I really don't like changing API behavior, even if it's not documented
> >> in the javadoc. As such, I am strongly against modifying the behavior of
> >> endOffsets() on the consumer as some people may implicitly depend on the
> >> contract.
> >> 3a. The Consumer#currentLag() method gives us exactly what we want without
> >> a network call (current lag from a cache, from which we can compute the
> >> offset).
> >>
> >> 4. I have no opinion about whether we should pass endOffset or currentLag
> >> to the callback. Either one has the same exact information inside it. In
> >> the interest of not changing the KIP after the vote has started, I'll leave
> >> it as endOffset.
> >>
> >> As such, I believe the KIP doesn't need any updates, nor has it been
> >> updated since the vote started.
> >>
> >> Would anyone else like to discuss something before the Otter Council
> >> adjourns regarding this matter?
> >>
> >> Cheers,
> >> Colt McNealy
> >>
> >> *Founder, LittleHorse.dev*
> >>
> >>
> >> On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <
> >> sop...@responsive.dev>
> >> wrote:
> >>
> >>> Just want to checkpoint the current state of this KIP and make sure we're
> >>> on track to get it in to 3.7 (we still have a few weeks) -- looks like
> >>> there are two remaining open questions, both relating to the
> >>> middle/intermediate callback:
> >>>
> >>> 1. What to name it: seems like the primary candidates are onBatchLoaded and
> >>> onBatchUpdated (and maybe also onStandbyUpdated?)
> >>> 2. What additional information can we pass in that would strike a good
> >>> balance between being helpful and impacting performance.
> >>>
> >>> Regarding #1, I think all of the current options are reasonable enough that
> >>> we should just let Colt decide which he prefers. I personally think
> >>> #onBatchUpdated is fine -- Bruno does make a fair point but the truth is
> >>> that English grammar can be sticky and while it could be argued that it is
> >>> the store which is updated, not the batch, I feel that it is perfectly
> >>> clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
> >>> at all. That's just my two cents in case it helps, but again, whatever
> >>> makes sense to you Colt is fine
> >>>
> >>> When it comes to #2 -- as much as I would love to dig into the Consumer
> >>> client lore and see if we can modify existing APIs or add new ones in order
> >>> to get the desired offset metadata in an efficient way, I think we're
> >>> starting to go down a rabbit hole that is going to expand the scope way
> >>> beyond what Colt thought he was signing up for. I would advocate to focus
> >>> on just the basic feature for now and drop the end-offset from the
> >>> callback. Once we have a standby listener it will be easy to expand on with
> >>> a followup KIP if/when we find an efficient way to add additional useful
> >>> information. I think it will also become more clear what is and isn't
> >>> useful after more people get to using it in the real world
> >>>
> >>> Colt/Eduwer: how necessary is receiving the end offset during a batch
> >>> update to your own application use case?
> >>>
> >>> Also, for those who really do need to check the current end offset, I
> >>> believe in theory you should be able to use the KafkaStreams#metrics API to
> >>> get the current lag and/or end offset for the changelog -- it's possible
> >>> this does

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-26 Thread Matthias J. Sax

Thanks. SGTM.

On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:

That all sounds good to me! Thanks for the KIP

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy  wrote:


Hi Sophie, Matthias, Bruno, and Eduwer—

Thanks for your patience as I have been scrambling to catch up after a week
of business travel (and a few days with no time to code). I'd like to tie
up some loose ends here, but in short, I don't think the KIP document
itself needs any changes (our internal implementation does, however).

1. In the interest of a) not changing the KIP after it's already out for a
vote, and b) making sure our English grammar is "correct", let's stick with
`onBatchLoaded()`. It is the Store that gets updated, not the Batch.

2. For me (and, thankfully, the community as well) adding a remote network
call at any point in this KIP is a non-starter. We'll ensure that
our implementation does not introduce one.

3. I really don't like changing API behavior, even if it's not documented
in the javadoc. As such, I am strongly against modifying the behavior of
endOffsets() on the consumer as some people may implicitly depend on the
contract.
3a. The Consumer#currentLag() method gives us exactly what we want without
a network call (current lag from a cache, from which we can compute the
offset).

4. I have no opinion about whether we should pass endOffset or currentLag
to the callback. Either one has the same exact information inside it. In
the interest of not changing the KIP after the vote has started, I'll leave
it as endOffset.

As such, I believe the KIP doesn't need any updates, nor has it been
updated since the vote started.

Would anyone else like to discuss something before the Otter Council
adjourns regarding this matter?

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:


Just want to checkpoint the current state of this KIP and make sure we're
on track to get it in to 3.7 (we still have a few weeks)  -- looks like
there are two remaining open questions, both relating to the
middle/intermediate callback:

1. What to name it: seems like the primary candidates are onBatchLoaded and
onBatchUpdated (and maybe also onStandbyUpdated?)
2. What additional information can we pass in that would strike a good
balance between being helpful and impacting performance.

Regarding #1, I think all of the current options are reasonable enough that
we should just let Colt decide which he prefers. I personally think
#onBatchUpdated is fine -- Bruno does make a fair point but the truth is
that English grammar can be sticky and while it could be argued that it is
the store which is updated, not the batch, I feel that it is perfectly
clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
at all. That's just my two cents in case it helps, but again, whatever
makes sense to you Colt is fine

When it comes to #2 -- as much as I would love to dig into the Consumer
client lore and see if we can modify existing APIs or add new ones in order
to get the desired offset metadata in an efficient way, I think we're
starting to go down a rabbit hole that is going to expand the scope way
beyond what Colt thought he was signing up for. I would advocate to focus
on just the basic feature for now and drop the end-offset from the
callback. Once we have a standby listener it will be easy to expand on with
a followup KIP if/when we find an efficient way to add additional useful
information. I think it will also become more clear what is and isn't
useful after more people get to using it in the real world

Colt/Eduwer: how necessary is receiving the end offset during a batch
update to your own application use case?

Also, for those who really do need to check the current end offset, I
believe in theory you should be able to use the KafkaStreams#metrics API to
get the current lag and/or end offset for the changelog -- it's possible
this does not represent the most up-to-date end offset (I'm not sure it
does or does not), but it should be close enough to be reliable and useful
for the purpose of monitoring -- I mean it is a metric, after all.

Hope this helps -- in the end, it's up to you (Colt) to decide what you
want to bring in scope or not. We still have more than 3 weeks until the
KIP freeze as currently proposed, so in theory you could even implement
this KIP without the end offset and then do a followup KIP to add the end
offset within the same release, ie without any deprecations. There are
plenty of paths forward here, so don't let us drag this out forever if you
know what you want

Cheers,
Sophie

On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax wrote:

Forgot one thing:

We could also pass `currentLag()` into `onBatchLoaded()` instead of
end-offset.


-Matthias

On 10/20/23 10:56 AM, Matthias J. Sax wrote:

Thanks for digging into this Bruno.

The JavaDoc on the consumer does not say anything specific about
`endOffset`

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-25 Thread Sophie Blee-Goldman
That all sounds good to me! Thanks for the KIP

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote:

> Hi Sophie, Matthias, Bruno, and Eduwer,
>
> Thanks for your patience as I have been scrambling to catch up after a week
> of business travel (and a few days with no time to code). I'd like to tie
> up some loose ends here, but in short, I don't think the KIP document
> itself needs any changes (our internal implementation does, however).
>
> 1. In the interest of a) not changing the KIP after it's already out for a
> vote, and b) making sure our English grammar is "correct", let's stick with
> `onBatchLoaded()`. It is the Store that gets updated, not the Batch.
>
> 2. For me (and, thankfully, the community as well) adding a remote network
> call at any point in this KIP is a non-starter. We'll ensure that
> our implementation does not introduce one.
>
> 3. I really don't like changing API behavior, even if it's not documented
> in the javadoc. As such, I am strongly against modifying the behavior of
> endOffsets() on the consumer as some people may implicitly depend on the
> contract.
> 3a. The Consumer#currentLag() method gives us exactly what we want without
> a network call (current lag from a cache, from which we can compute the
> offset).
>
> 4. I have no opinion about whether we should pass endOffset or currentLag
> to the callback. Either one has the same exact information inside it. In
> the interest of not changing the KIP after the vote has started, I'll leave
> it as endOffset.
>
> As such, I believe the KIP doesn't need any updates, nor has it been
> updated since the vote started.
>
> Would anyone else like to discuss something before the Otter Council
> adjourns regarding this matter?
>
> Cheers,
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <
> sop...@responsive.dev>
> wrote:
>
> > Just want to checkpoint the current state of this KIP and make sure we're
> > on track to get it in to 3.7 (we still have a few weeks) -- looks like
> > there are two remaining open questions, both relating to the
> > middle/intermediate callback:
> >
> > 1. What to name it: seems like the primary candidates are onBatchLoaded and
> > onBatchUpdated (and maybe also onStandbyUpdated?)
> > 2. What additional information can we pass in that would strike a good
> > balance between being helpful and impacting performance.
> >
> > Regarding #1, I think all of the current options are reasonable enough that
> > we should just let Colt decide which he prefers. I personally think
> > #onBatchUpdated is fine -- Bruno does make a fair point but the truth is
> > that English grammar can be sticky and while it could be argued that it is
> > the store which is updated, not the batch, I feel that it is perfectly
> > clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
> > at all. That's just my two cents in case it helps, but again, whatever
> > makes sense to you Colt is fine
> >
> > When it comes to #2 -- as much as I would love to dig into the Consumer
> > client lore and see if we can modify existing APIs or add new ones in order
> > to get the desired offset metadata in an efficient way, I think we're
> > starting to go down a rabbit hole that is going to expand the scope way
> > beyond what Colt thought he was signing up for. I would advocate to focus
> > on just the basic feature for now and drop the end-offset from the
> > callback. Once we have a standby listener it will be easy to expand on with
> > a followup KIP if/when we find an efficient way to add additional useful
> > information. I think it will also become more clear what is and isn't
> > useful after more people get to using it in the real world
> >
> > Colt/Eduwer: how necessary is receiving the end offset during a batch
> > update to your own application use case?
> >
> > Also, for those who really do need to check the current end offset, I
> > believe in theory you should be able to use the KafkaStreams#metrics API to
> > get the current lag and/or end offset for the changelog -- it's possible
> > this does not represent the most up-to-date end offset (I'm not sure it
> > does or does not), but it should be close enough to be reliable and useful
> > for the purpose of monitoring -- I mean it is a metric, after all.
> >
> > Hope this helps -- in the end, it's up to you (Colt) to decide what you
> > want to bring in scope or not. We still have more than 3 weeks until the
> > KIP freeze as currently proposed, so in theory you could even implement
> > this KIP without the end offset and then do a followup KIP to add the end
> > offset within the same release, ie without any deprecations. There are
> > plenty of paths forward here, so don't let us drag this out forever if you
> > know what you want
> >
> > Cheers,
> > Sophie
> >
> > On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax wrote:
> >
> > > Forgot one thing:
> > >
> > > We could also pass

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-25 Thread Colt McNealy
Hi Sophie, Matthias, Bruno, and Eduwer—

Thanks for your patience as I have been scrambling to catch up after a week
of business travel (and a few days with no time to code). I'd like to tie
up some loose ends here, but in short, I don't think the KIP document
itself needs any changes (our internal implementation does, however).

1. In the interest of a) not changing the KIP after it's already out for a
vote, and b) making sure our English grammar is "correct", let's stick with
`onBatchLoaded()`. It is the Store that gets updated, not the Batch.

2. For me (and, thankfully, the community as well) adding a remote network
call at any point in this KIP is a non-starter. We'll ensure that
our implementation does not introduce one.

3. I really don't like changing API behavior, even if it's not documented
in the javadoc. As such, I am strongly against modifying the behavior of
endOffsets() on the consumer as some people may implicitly depend on the
contract.
3a. The Consumer#currentLag() method gives us exactly what we want without
a network call (current lag from a cache, from which we can compute the
offset).

4. I have no opinion about whether we should pass endOffset or currentLag
to the callback. Either one has the same exact information inside it. In
the interest of not changing the KIP after the vote has started, I'll leave
it as endOffset.
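
To make point 3a concrete, here is a minimal sketch of deriving an end offset
from a cached lag. It assumes lag is defined as the end offset minus the
consumer's current position, and it takes both values as plain arguments so
that it runs without a Kafka dependency. The class and method names are
hypothetical. In a real consumer the lag would presumably come from
Consumer#currentLag(), which returns an OptionalLong that is empty when the
lag is not yet known.

```java
import java.util.OptionalLong;

public class EndOffsetFromLag {

    // Assumes lag = endOffset - position, so endOffset = position + lag.
    // Returns empty when no lag has been cached yet.
    static OptionalLong endOffset(long position, OptionalLong currentLag) {
        if (!currentLag.isPresent()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(position + currentLag.getAsLong());
    }

    public static void main(String[] args) {
        // Lag is known: the end offset can be computed locally, with no network call.
        System.out.println(endOffset(40L, OptionalLong.of(2L)));
        // Lag is unknown (for example, before the first fetch): nothing to compute.
        System.out.println(endOffset(40L, OptionalLong.empty()));
    }
}
```

Because either value can be derived from the other under that assumption,
passing endOffset or currentLag to the callback carries the same information,
as point 4 says.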

As such, I believe the KIP doesn't need any updates, nor has it been
updated since the vote started.

Would anyone else like to discuss something before the Otter Council
adjourns regarding this matter?

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman wrote:

> Just want to checkpoint the current state of this KIP and make sure we're
> on track to get it in to 3.7 (we still have a few weeks)  -- looks like
> there are two remaining open questions, both relating to the
> middle/intermediate callback:
>
> 1. What to name it: seems like the primary candidates are onBatchLoaded and
> onBatchUpdated (and maybe also onStandbyUpdated?)
> 2. What additional information can we pass in that would strike a good
> balance between being helpful and impacting performance.
>
> Regarding #1, I think all of the current options are reasonable enough that
> we should just let Colt decide which he prefers. I personally think
> #onBatchUpdated is fine -- Bruno does make a fair point but the truth is
> that English grammar can be sticky and while it could be argued that it is
> the store which is updated, not the batch, I feel that it is perfectly
> clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
> at all. That's just my two cents in case it helps, but again, whatever
> makes sense to you Colt is fine
>
> When it comes to #2 -- as much as I would love to dig into the Consumer
> client lore and see if we can modify existing APIs or add new ones in order
> to get the desired offset metadata in an efficient way, I think we're
> starting to go down a rabbit hole that is going to expand the scope way
> beyond what Colt thought he was signing up for. I would advocate to focus
> on just the basic feature for now and drop the end-offset from the
> callback. Once we have a standby listener it will be easy to expand on with
> a followup KIP if/when we find an efficient way to add additional useful
> information. I think it will also become more clear what is and isn't
> useful after more people get to using it in the real world
>
> Colt/Eduwer: how necessary is receiving the end offset during a batch
> update to your own application use case?
>
> Also, for those who really do need to check the current end offset, I
> believe in theory you should be able to use the KafkaStreams#metrics API to
> get the current lag and/or end offset for the changelog -- it's possible
> this does not represent the most up-to-date end offset (I'm not sure it
> does or does not), but it should be close enough to be reliable and useful
> for the purpose of monitoring -- I mean it is a metric, after all.
>
> Hope this helps -- in the end, it's up to you (Colt) to decide what you
> want to bring in scope or not. We still have more than 3 weeks until the
> KIP freeze as currently proposed, so in theory you could even implement
> this KIP without the end offset and then do a followup KIP to add the end
> offset within the same release, ie without any deprecations. There are
> plenty of paths forward here, so don't let us drag this out forever if you
> know what you want
>
> Cheers,
> Sophie
>
> On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax  wrote:
>
> > Forgot one thing:
> >
> > We could also pass `currentLag()` into `onBatchLoaded()` instead of
> > end-offset.
> >
> >
> > -Matthias
> >
> > On 10/20/23 10:56 AM, Matthias J. Sax wrote:
> > > Thanks for digging into this Bruno.
> > >
> > > The JavaDoc on the consumer does not say anything specific about
> > > `endOffset` guarantees:
> > >
> > >> Get the end offsets for the given partitions. 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-23 Thread Sophie Blee-Goldman
Just want to checkpoint the current state of this KIP and make sure we're
on track to get it in to 3.7 (we still have a few weeks)  -- looks like
there are two remaining open questions, both relating to the
middle/intermediate callback:

1. What to name it: seems like the primary candidates are onBatchLoaded and
onBatchUpdated (and maybe also onStandbyUpdated?)
2. What additional information can we pass in that would strike a good
balance between being helpful and impacting performance.

Regarding #1, I think all of the current options are reasonable enough that
we should just let Colt decide which he prefers. I personally think
#onBatchUpdated is fine -- Bruno does make a fair point but the truth is
that English grammar can be sticky and while it could be argued that it is
the store which is updated, not the batch, I feel that it is perfectly
clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
at all. That's just my two cents in case it helps, but again, whatever
makes sense to you Colt is fine

When it comes to #2 -- as much as I would love to dig into the Consumer
client lore and see if we can modify existing APIs or add new ones in order
to get the desired offset metadata in an efficient way, I think we're
starting to go down a rabbit hole that is going to expand the scope way
beyond what Colt thought he was signing up for. I would advocate to focus
on just the basic feature for now and drop the end-offset from the
callback. Once we have a standby listener it will be easy to expand on with
a followup KIP if/when we find an efficient way to add additional useful
information. I think it will also become more clear what is and isn't
useful after more people get to using it in the real world

Colt/Eduwer: how necessary is receiving the end offset during a batch
update to your own application use case?

Also, for those who really do need to check the current end offset, I
believe in theory you should be able to use the KafkaStreams#metrics API to
get the current lag and/or end offset for the changelog -- it's possible
this does not represent the most up-to-date end offset (I'm not sure
whether it does), but it should be close enough to be reliable and useful
for the purpose of monitoring -- I mean it is a metric, after all.

Hope this helps -- in the end, it's up to you (Colt) to decide what you
want to bring in scope or not. We still have more than 3 weeks until the
KIP freeze as currently proposed, so in theory you could even implement
this KIP without the end offset and then do a followup KIP to add the end
offset within the same release, i.e. without any deprecations. There are
plenty of paths forward here, so don't let us drag this out forever if you
know what you want.

Cheers,
Sophie

On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax  wrote:

> Forgot one thing:
>
> We could also pass `currentLag()` into `onBatchLoaded()` instead of
> end-offset.
>
>
> -Matthias
>
> On 10/20/23 10:56 AM, Matthias J. Sax wrote:
> > Thanks for digging into this Bruno.
> >
> > The JavaDoc on the consumer does not say anything specific about
> > `endOffset` guarantees:
> >
> >> Get the end offsets for the given partitions. In the default {@code
> >> read_uncommitted} isolation level, the end
> >> offset is the high watermark (that is, the offset of the last
> >> successfully replicated message plus one). For
> >> {@code read_committed} consumers, the end offset is the last stable
> >> offset (LSO), which is the minimum of
> >> the high watermark and the smallest offset of any open transaction.
> >> Finally, if the partition has never been
> >> written to, the end offset is 0.
> >
> > Thus, I actually believe that it would be ok to change the
> > implementation and serve the answer from the `TopicPartitionState`?
> >
> > Another idea would be, to use `currentLag()` in combination with
> > `position()` (or the offset of the last read record) to compute the
> > end-offset on the fly?
> >
> >
> > -Matthias
> >
> > On 10/20/23 4:00 AM, Bruno Cadonna wrote:
> >> Hi,
> >>
> >> Matthias is correct that the end offsets are stored somewhere in the
> >> metadata of the consumer. More precisely, they are stored in the
> >> `TopicPartitionState`. However, I could not find public API on the
> >> consumer other than currentLag() that uses the stored end offsets. If
> >> I understand the code correctly, method endOffsets() always triggers a
> >> remote call.
> >>
> >> I am a bit concerned about doing remote calls every commit.interval.ms
> >> (by default 200ms under EOS). At the moment the remote calls are only
> >> issued if an optimization for KTables is turned on where changelog
> >> topics are replaced with the input topic of the KTable. The current
> >> remote calls retrieve all committed offsets of the group at once. If I
> >> understand correctly, that is one single remote call. Remote calls for
> >> getting end offsets of changelog topics -- as I understand you are
> >> planning to issue -- will probably 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-20 Thread Matthias J. Sax

Forgot one thing:

We could also pass `currentLag()` into `onBatchLoaded()` instead of
end-offset.



-Matthias

On 10/20/23 10:56 AM, Matthias J. Sax wrote:

Thanks for digging into this Bruno.

The JavaDoc on the consumer does not say anything specific about 
`endOffset` guarantees:


> Get the end offsets for the given partitions. In the default {@code
> read_uncommitted} isolation level, the end offset is the high watermark
> (that is, the offset of the last successfully replicated message plus one).
> For {@code read_committed} consumers, the end offset is the last stable
> offset (LSO), which is the minimum of the high watermark and the smallest
> offset of any open transaction. Finally, if the partition has never been
> written to, the end offset is 0.


Thus, I actually believe that it would be ok to change the 
implementation and serve the answer from the `TopicPartitionState`?


Another idea would be, to use `currentLag()` in combination with 
`position()` (or the offset of the last read record) to compute the 
end-offset on the fly?



-Matthias

On 10/20/23 4:00 AM, Bruno Cadonna wrote:

Hi,

Matthias is correct that the end offsets are stored somewhere in the 
metadata of the consumer. More precisely, they are stored in the 
`TopicPartitionState`. However, I could not find public API on the 
consumer other than currentLag() that uses the stored end offsets. If 
I understand the code correctly, method endOffsets() always triggers a
remote call.


I am a bit concerned about doing remote calls every commit.interval.ms 
(by default 200ms under EOS). At the moment the remote calls are only 
issued if an optimization for KTables is turned on where changelog 
topics are replaced with the input topic of the KTable. The current 
remote calls retrieve all committed offsets of the group at once. If I 
understand correctly, that is one single remote call. Remote calls for 
getting end offsets of changelog topics -- as I understand you are 
planning to issue -- will probably result in multiple remote calls to 
multiple leaders of the changelog topic partitions.


Please correct me if I misunderstood anything of the above.

If my understanding is correct, I propose to modify the consumer in 
such a way to get the end offset from the locally stored metadata 
whenever possible as part of the implementation of this KIP. I do not 
know what the implications are of such a change of the consumer and if 
a KIP is needed for it. Maybe, endOffsets() guarantees to return the 
freshest end offsets possible, which would not be satisfied with the 
modification.


Regarding the naming, I do not completely agree with Matthias. While 
the pattern might be consistent with onBatchUpdated, what is the 
meaning of onBatchUpdated? Is the batch updated? The names 
onBatchLoaded or onBatchWritten or onBatchAdded are more clear IMO.
With "restore" the pattern works better. If I restore a batch of 
records in a state, the records are not there although they should be 
there and I add them. If I update a batch of records in a state, this
sounds like the batch of records is in the state and I modify the
existing records within the state. That is clearly not the meaning of 
the event for which the listener should be called.


Best,
Bruno



On 10/19/23 2:12 AM, Matthias J. Sax wrote:

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall,
except for `onBatchLoaded`; I would prefer `onBatchUpdated`. It aligns
better with everything else:

  - it's an update-listener, not loaded-listener
  - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`,
    `onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
  - `StandbyUpdateListener` should have `onUpdateStart`,
    `onUpdateSuspended` and `onBatchUpdated` to be equally consistent
    (using "loaded" breaks the pattern)



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch 
response. (We exploit this behavior to track end-offsets for input 
topic with regard to `max.task.idle.ms` without overhead -- it was 
also a concern when we did the corresponding KIP how we could track 
lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly 
to get this information from the restore-consumer 
(`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from
internal metadata cache) for free, and pass into the listener.


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low impact
on performance or ideally zero impact; unfortunately, I don't see a way to
have zero impact on performance. However, we can leverage 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-20 Thread Matthias J. Sax

Thanks for digging into this Bruno.

The JavaDoc on the consumer does not say anything specific about 
`endOffset` guarantees:



> Get the end offsets for the given partitions. In the default {@code
> read_uncommitted} isolation level, the end offset is the high watermark
> (that is, the offset of the last successfully replicated message plus one).
> For {@code read_committed} consumers, the end offset is the last stable
> offset (LSO), which is the minimum of the high watermark and the smallest
> offset of any open transaction. Finally, if the partition has never been
> written to, the end offset is 0.


Thus, I actually believe that it would be ok to change the 
implementation and serve the answer from the `TopicPartitionState`?


Another idea would be to use `currentLag()` in combination with
`position()` (or the offset of the last read record) to compute the
end-offset on the fly?
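
The `currentLag()` plus `position()` idea can be sketched roughly as follows.
This is only an illustration, assuming the cached lag is measured against the
same end offset that `endOffsets()` would report. `EndOffsetSketch` and
`endOffsetFromLag` are hypothetical names, and the two arguments stand in for
the values a caller would get from `Consumer#position(...)` and
`Consumer#currentLag(...)`, so the snippet runs without a Kafka dependency.

```java
import java.util.OptionalLong;

final class EndOffsetSketch {
    private EndOffsetSketch() {}

    /**
     * Derives an end offset from locally known values only.
     *
     * @param position   next offset the consumer will read (stand-in for Consumer#position)
     * @param currentLag locally cached lag (stand-in for Consumer#currentLag); empty if unknown
     * @return position + lag, or empty when no lag has been cached yet
     */
    static OptionalLong endOffsetFromLag(final long position, final OptionalLong currentLag) {
        if (!currentLag.isPresent()) {
            // Nothing cached yet, e.g. before the first fetch response arrived.
            return OptionalLong.empty();
        }
        return OptionalLong.of(position + currentLag.getAsLong());
    }

    public static void main(final String[] args) {
        System.out.println(endOffsetFromLag(120L, OptionalLong.of(30L)));
        System.out.println(endOffsetFromLag(0L, OptionalLong.empty()));
    }
}
```

An empty result means no lag is cached for the partition yet, so a caller
would have to decide whether to skip the value or obtain it some other way.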



-Matthias

On 10/20/23 4:00 AM, Bruno Cadonna wrote:

Hi,

Matthias is correct that the end offsets are stored somewhere in the 
metadata of the consumer. More precisely, they are stored in the 
`TopicPartitionState`. However, I could not find public API on the 
consumer other than currentLag() that uses the stored end offsets. If I 
understand the code correctly, method endOffsets() always triggers a
remote call.


I am a bit concerned about doing remote calls every commit.interval.ms 
(by default 200ms under EOS). At the moment the remote calls are only 
issued if an optimization for KTables is turned on where changelog 
topics are replaced with the input topic of the KTable. The current 
remote calls retrieve all committed offsets of the group at once. If I 
understand correctly, that is one single remote call. Remote calls for 
getting end offsets of changelog topics -- as I understand you are 
planning to issue -- will probably result in multiple remote calls to 
multiple leaders of the changelog topic partitions.


Please correct me if I misunderstood anything of the above.

If my understanding is correct, I propose to modify the consumer in such 
a way to get the end offset from the locally stored metadata whenever 
possible as part of the implementation of this KIP. I do not know what 
the implications are of such a change of the consumer and if a KIP is 
needed for it. Maybe, endOffsets() guarantees to return the freshest end 
offsets possible, which would not be satisfied with the modification.


Regarding the naming, I do not completely agree with Matthias. While the 
pattern might be consistent with onBatchUpdated, what is the meaning of 
onBatchUpdated? Is the batch updated? The names onBatchLoaded or 
onBatchWritten or onBatchAdded are more clear IMO.
With "restore" the pattern works better. If I restore a batch of records 
in a state, the records are not there although they should be there and 
I add them. If I update a batch of records in a state, this sounds like
the batch of records is in the state and I modify the existing records 
within the state. That is clearly not the meaning of the event for which 
the listener should be called.


Best,
Bruno



On 10/19/23 2:12 AM, Matthias J. Sax wrote:

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall,
except for `onBatchLoaded`; I would prefer `onBatchUpdated`. It aligns
better with everything else:

  - it's an update-listener, not loaded-listener
  - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`,
    `onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
  - `StandbyUpdateListener` should have `onUpdateStart`,
    `onUpdateSuspended` and `onBatchUpdated` to be equally consistent
    (using "loaded" breaks the pattern)



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch 
response. (We exploit this behavior to track end-offsets for input 
topic with regard to `max.task.idle.ms` without overhead -- it was 
also a concern when we did the corresponding KIP how we could track 
lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly to 
get this information from the restore-consumer 
(`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from
internal metadata cache) for free, and pass into the listener.


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low impact
on performance or ideally zero impact; unfortunately, I don't see a way to
have zero impact on performance. However, we can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
client to ask for these "endOffset"s. As far as I can understand, this update

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-20 Thread Bruno Cadonna

Hi,

Matthias is correct that the end offsets are stored somewhere in the 
metadata of the consumer. More precisely, they are stored in the 
`TopicPartitionState`. However, I could not find a public API on the
consumer other than currentLag() that uses the stored end offsets. If I
understand the code correctly, method endOffsets() always triggers a
remote call.


I am a bit concerned about doing remote calls every commit.interval.ms 
(by default 200ms under EOS). At the moment the remote calls are only 
issued if an optimization for KTables is turned on where changelog 
topics are replaced with the input topic of the KTable. The current 
remote calls retrieve all committed offsets of the group at once. If I 
understand correctly, that is one single remote call. Remote calls for 
getting end offsets of changelog topics -- as I understand you are 
planning to issue -- will probably result in multiple remote calls to 
multiple leaders of the changelog topic partitions.


Please correct me if I misunderstood anything of the above.

If my understanding is correct, I propose to modify the consumer in such 
a way to get the end offset from the locally stored metadata whenever 
possible as part of the implementation of this KIP. I do not know what 
the implications are of such a change of the consumer and if a KIP is 
needed for it. Maybe, endOffsets() guarantees to return the freshest end 
offsets possible, which would not be satisfied with the modification.


Regarding the naming, I do not completely agree with Matthias. While the 
pattern might be consistent with onBatchUpdated, what is the meaning of 
onBatchUpdated? Is the batch updated? The names onBatchLoaded or 
onBatchWritten or onBatchAdded are more clear IMO.
With "restore" the pattern works better. If I restore a batch of records 
in a state, the records are not there although they should be there and 
I add them. If I update a batch of records in a state, this sounds like
the batch of records is in the state and I modify the existing records 
within the state. That is clearly not the meaning of the event for which 
the listener should be called.


Best,
Bruno



On 10/19/23 2:12 AM, Matthias J. Sax wrote:

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall,
except for `onBatchLoaded`; I would prefer `onBatchUpdated`. It aligns
better with everything else:

  - it's an update-listener, not loaded-listener
  - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`,
    `onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
  - `StandbyUpdateListener` should have `onUpdateStart`,
    `onUpdateSuspended` and `onBatchUpdated` to be equally consistent
    (using "loaded" breaks the pattern)



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch response. 
(We exploit this behavior to track end-offsets for input topic with 
regard to `max.task.idle.ms` without overhead -- it was also a concern 
when we did the corresponding KIP how we could track lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly to 
get this information from the restore-consumer 
(`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from
internal metadata cache) for free, and pass into the listener.


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low impact
on performance or ideally zero impact; unfortunately, I don't see a way to
have zero impact on performance. However, we can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
client to ask for these "endOffset"s. As far as I can understand, this update
is done periodically using the "commit.interval.ms" configuration. I
believe this option will force us to invoke StandbyUpdateListener once this
interval is reached.

On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna  wrote:


Thanks for the KIP, Colt and Eduwer,

Are you sure there is also not a significant performance impact for
passing into the callback `currentEndOffset`?

I am asking because the comment here:

https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129

says that the end-offset is only updated once for standby tasks whose
changelog topic is not piggy-backed on input topics. I could also not
find the update of end-offset for those standbys.


Best,
Bruno

On 10/16/23 10:55 AM, Lucas Brutschy wrote:

Hi all,

it's a nice improvement! I don't have anything to add on top of the
previous comments, just came 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-18 Thread Matthias J. Sax

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall,
except for `onBatchLoaded`; I would prefer `onBatchUpdated`. It aligns
better with everything else:

 - it's an update-listener, not loaded-listener
 - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`,
   `onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
 - `StandbyUpdateListener` should have `onUpdateStart`,
   `onUpdateSuspended` and `onBatchUpdated` to be equally consistent
   (using "loaded" breaks the pattern)
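
To make the naming pattern concrete, here is a rough sketch of how the three
callbacks would read side by side. It is not the KIP's interface: the
parameter lists are simplified assumptions, `StandbyUpdateListenerSketch`,
`RecordingStandbyListener` and `StandbyListenerDemo` are hypothetical names,
and the middle callback uses the `onBatchUpdated` spelling suggested here
(the name was still under discussion). The `SuspendReason` values are the
ones mentioned elsewhere in this thread.

```java
import java.util.ArrayList;
import java.util.List;

final class StandbyListenerDemo {
    // Drives one hypothetical standby lifecycle: start, two batches, promotion.
    static List<String> run() {
        final RecordingStandbyListener listener = new RecordingStandbyListener();
        listener.onUpdateStart("orders-store", 0L);
        listener.onBatchUpdated("orders-store", 10L, 10L);
        listener.onBatchUpdated("orders-store", 25L, 15L);
        listener.onUpdateSuspended("orders-store", 25L,
            StandbyUpdateListenerSketch.SuspendReason.PROMOTED);
        return listener.events;
    }

    public static void main(final String[] args) {
        run().forEach(System.out::println);
    }
}

// Hypothetical, simplified listener shape; the parameters are assumptions.
interface StandbyUpdateListenerSketch {
    enum SuspendReason { MIGRATED, PROMOTED }

    void onUpdateStart(String storeName, long startingOffset);

    void onBatchUpdated(String storeName, long batchEndOffset, long batchSize);

    void onUpdateSuspended(String storeName, long storeOffset, SuspendReason reason);
}

// Records every callback it receives so the call order can be inspected.
final class RecordingStandbyListener implements StandbyUpdateListenerSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void onUpdateStart(final String storeName, final long startingOffset) {
        events.add("start " + storeName + " @" + startingOffset);
    }

    @Override
    public void onBatchUpdated(final String storeName, final long batchEndOffset,
                               final long batchSize) {
        events.add("batch " + storeName + " @" + batchEndOffset + " +" + batchSize);
    }

    @Override
    public void onUpdateSuspended(final String storeName, final long storeOffset,
                                  final SuspendReason reason) {
        events.add("suspended " + storeName + " @" + storeOffset + " " + reason);
    }
}
```

Read this way, the start/batch/suspended trio mirrors the restore listener's
start/batch/end shape, which is the consistency argument made in the list.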



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch response. 
(We exploit this behavior to track end-offsets for input topic with 
regard to `max.task.idle.ms` without overhead -- it was also a concern 
when we did the corresponding KIP how we could track lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly to 
get this information from the restore-consumer 
(`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from
internal metadata cache) for free, and pass into the listener.


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low impact
on performance or ideally zero impact; unfortunately, I don't see a way to
have zero impact on performance. However, we can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
client to ask for these "endOffset"s. As far as I can understand, this update
is done periodically using the "commit.interval.ms" configuration. I
believe this option will force us to invoke StandbyUpdateListener once this
interval is reached.

On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna  wrote:


Thanks for the KIP, Colt and Eduwer,

Are you sure there is also not a significant performance impact for
passing into the callback `currentEndOffset`?

I am asking because the comment here:

https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129

says that the end-offset is only updated once for standby tasks whose
changelog topic is not piggy-backed on input topics. I could also not
find the update of end-offset for those standbys.


Best,
Bruno

On 10/16/23 10:55 AM, Lucas Brutschy wrote:

Hi all,

it's a nice improvement! I don't have anything to add on top of the
previous comments, just came here to say that it seems to me consensus
has been reached and the result looks good to me.

Thanks Colt and Eduwer!
Lucas

On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy wrote:


Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy

*Founder, LittleHorse.dev*


On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <guozhang.wang...@gmail.com> wrote:


Thanks for the summary, that looks good to me.

Guozhang

On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy wrote:

Hello there!

Thanks everyone for the comments. There's a lot of back-and-forth going on,
so I'll do my best to summarize what everyone's said in TLDR format:

1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly
for the other methods.
2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
3. Remove the `earliestOffset` parameter for performance reasons.

If that's all fine with everyone, I'll update the KIP and we -- well, mostly
Edu (: -- will open a PR.

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <edu...@littlehorse.io> wrote:

Hello everyone,

Thanks for all your feedback for this KIP!

I think that the key to choosing proper names for this API is understanding
the terms used inside the StoreChangelogReader. Currently, this class has
two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
using StandbyUpdateListener for the interface fits better on these terms.
Same applies for onUpdateStart/Suspended.

StoreChangelogReader uses "the same mechanism" for active task restoration
and standby task updates, but this is an implementation detail. Under
normal circumstances (no rebalances or task migrations), the changelog
reader will be in STANDBY_UPDATING, which means it will be updating standby
tasks as long as there are new records in the changelog topic. That's why I
prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
align with StateRestoreListener, but either one is fine.

Edu

On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com> wrote:


Hello Colt,

Thanks for writing the KIP! I have read through the updated KIP and

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-17 Thread Eduwer Camacaro
Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low impact
on performance or ideally zero impact; unfortunately, I don't see a way to
have zero impact on performance. However, we can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
client to ask for these "endOffset"s. As far as I can understand, this update
is done periodically using the "commit.interval.ms" configuration. I
believe this option will force us to invoke StandbyUpdateListener once this
interval is reached.
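
As a rough illustration of the periodic approach described above, the sketch
below refreshes a cached end offset at most once per interval and serves the
cached value in between. It is not the actual
#maybeUpdateLimitOffsetsForStandbyChangelogs logic: `PeriodicEndOffsetCache`
is a hypothetical class, the `LongSupplier` stands in for the admin-client
lookup, and the interval argument plays the role of "commit.interval.ms".

```java
import java.util.function.LongSupplier;

final class PeriodicEndOffsetDemo {
    public static void main(final String[] args) {
        final long[] lookups = {0L};
        // Fake remote lookup: the reported end offset grows by 100 on every call.
        final PeriodicEndOffsetCache cache =
            new PeriodicEndOffsetCache(() -> ++lookups[0] * 100L, 200L);

        System.out.println(cache.endOffset(0L));    // first call triggers a lookup
        System.out.println(cache.endOffset(150L));  // within the interval: cached value
        System.out.println(cache.endOffset(200L));  // interval elapsed: looks up again
        System.out.println("lookups = " + lookups[0]);
    }
}

// Hypothetical helper: caches an end offset and refreshes it once per interval.
final class PeriodicEndOffsetCache {
    private final LongSupplier remoteLookup;  // stand-in for the admin-client call
    private final long refreshIntervalMs;     // plays the role of commit.interval.ms
    private long cachedEndOffset;
    private long lastRefreshMs;
    private boolean initialized;

    PeriodicEndOffsetCache(final LongSupplier remoteLookup, final long refreshIntervalMs) {
        this.remoteLookup = remoteLookup;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Returns the cached end offset, refreshing it only when the interval has elapsed.
    long endOffset(final long nowMs) {
        if (!initialized || nowMs - lastRefreshMs >= refreshIntervalMs) {
            cachedEndOffset = remoteLookup.getAsLong();
            lastRefreshMs = nowMs;
            initialized = true;
        }
        return cachedEndOffset;
    }
}
```

The trade-off is that a listener fed from such a cache would see an end
offset that can be up to one interval old.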

On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna  wrote:

> Thanks for the KIP, Colt and Eduwer,
>
> Are you sure there is also not a significant performance impact for
> passing into the callback `currentEndOffset`?
>
> I am asking because the comment here:
>
> https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129
>
> says that the end-offset is only updated once for standby tasks whose
> changelog topic is not piggy-backed on input topics. I could also not
> find the update of end-offset for those standbys.
>
>
> Best,
> Bruno
>
> On 10/16/23 10:55 AM, Lucas Brutschy wrote:
> > Hi all,
> >
> > it's a nice improvement! I don't have anything to add on top of the
> > previous comments, just came here to say that it seems to me consensus
> > has been reached and the result looks good to me.
> >
> > Thanks Colt and Eduwer!
> > Lucas
> >
> > On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy wrote:
> >>
> >> Thanks, Guozhang. I've updated the KIP and will start a vote.
> >>
> >> Colt McNealy
> >>
> >> *Founder, LittleHorse.dev*
> >>
> >>
> >> On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <guozhang.wang...@gmail.com> wrote:
> >>
> >>> Thanks for the summary, that looks good to me.
> >>>
> >>> Guozhang
> >>>
> >>> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy 
> wrote:
> 
>  Hello there!
> 
>  Thanks everyone for the comments. There's a lot of back-and-forth
> going
> >>> on,
>  so I'll do my best to summarize what everyone's said in TLDR format:
> 
>  1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do
> >>> similarly
>  for the other methods.
>  2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
>  3. Remove the `earliestOffset` parameter for performance reasons.
> 
>  If that's all fine with everyone, I'll update the KIP and we—well,
> mostly
>  Edu (:  —will open a PR.
> 
>  Cheers,
>  Colt McNealy
> 
>  *Founder, LittleHorse.dev*
> 
> 
>  On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <
> edu...@littlehorse.io>
>  wrote:
> 
> > Hello everyone,
> >
> > Thanks for all your feedback for this KIP!
> >
> > I think that the key to choosing proper names for this API is
> >>> understanding
> > the terms used inside the StoreChangelogReader. Currently, this class
> >>> has
> > two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my
> >>> opinion,
> > using StandbyUpdateListener for the interface fits better on these
> >>> terms.
> > Same applies for onUpdateStart/Suspended.
> >
> > StoreChangelogReader uses "the same mechanism" for active task
> >>> restoration
> > and standby task updates, but this is an implementation detail. Under
> > normal circumstances (no rebalances or task migrations), the
> changelog
> > reader will be in STANDBY_UPDATING, which means it will be updating
> >>> standby
> > tasks as long as there are new records in the changelog topic. That's
> >>> why I
> > prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't
> >>> 100%
> > align with StateRestoreListener, but either one is fine.
> >
> > Edu
> >
> > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <
> >>> guozhang.wang...@gmail.com>
> > wrote:
> >
> >> Hello Colt,
> >>
> >> Thanks for writing the KIP! I have read through the updated KIP and
> >> overall it looks great. I only have minor naming comments (well,
> >> isn't naming the least boring stuff to discuss, and the one that takes
> >> most of the time for KIPs :P):
> >>
> >> 1. I tend to agree with Sophie regarding whether or not to include
> >> "Standby" in the functions of "onStandbyUpdateStart/Suspended",
> since
> >> it is also more consistent with the functions of
> >> "StateRestoreListener" where we do not name it as
> >> "onStateRestoreState" etc.
> >>
> >> 2. I know in community discussions we sometimes say "a standby is
> >> promoted to active", but in the official code / java docs we did not
> >> have a term of "promotion", since what the code does is really
> >>> recycle
> >> the 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-16 Thread Bruno Cadonna

Thanks for the KIP, Colt and Eduwer,

Are you sure there is also not a significant performance impact for 
passing into the callback `currentEndOffset`?


I am asking because the comment here: 
https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129


says that the end-offset is only updated once for standby tasks whose 
changelog topic is not piggy-backed on input topics. I could also not 
find the update of end-offset for those standbys.



Best,
Bruno

On 10/16/23 10:55 AM, Lucas Brutschy wrote:

Hi all,

it's a nice improvement! I don't have anything to add on top of the
previous comments, just came here to say that it seems to me consensus
has been reached and the result looks good to me.

Thanks Colt and Eduwer!
Lucas

On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy  wrote:


Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy

*Founder, LittleHorse.dev*


On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang 
wrote:


Thanks for the summary, that looks good to me.

Guozhang

On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy  wrote:


Hello there!

Thanks everyone for the comments. There's a lot of back-and-forth going on,
so I'll do my best to summarize what everyone's said in TLDR format:

1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly
for the other methods.
2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
3. Remove the `earliestOffset` parameter for performance reasons.

If that's all fine with everyone, I'll update the KIP and we—well, mostly
Edu (:  —will open a PR.

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro 
wrote:


Hello everyone,

Thanks for all your feedback for this KIP!

I think that the key to choosing proper names for this API is understanding
the terms used inside the StoreChangelogReader. Currently, this class has
two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
using StandbyUpdateListener for the interface fits better on these terms.
Same applies for onUpdateStart/Suspended.

StoreChangelogReader uses "the same mechanism" for active task restoration
and standby task updates, but this is an implementation detail. Under
normal circumstances (no rebalances or task migrations), the changelog
reader will be in STANDBY_UPDATING, which means it will be updating standby
tasks as long as there are new records in the changelog topic. That's why I
prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
align with StateRestoreListener, but either one is fine.

Edu

On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com>
wrote:


Hello Colt,

Thanks for writing the KIP! I have read through the updated KIP and
overall it looks great. I only have minor naming comments (well,
isn't naming the least boring stuff to discuss, and the one that takes
most of the time for KIPs :P):

1. I tend to agree with Sophie regarding whether or not to include
"Standby" in the functions of "onStandbyUpdateStart/Suspended", since
it is also more consistent with the functions of
"StateRestoreListener" where we do not name it as
"onStateRestoreState" etc.

2. I know in community discussions we sometimes say "a standby is
promoted to active", but in the official code / java docs we did not
have a term of "promotion", since what the code does is really recycle
the task (while keeping its state stores open), and create a new
active task that takes in the recycled state stores and just changing
the other fields like task type etc. After thinking about this for a
bit, I tend to feel that "promoted" is indeed a better name for user
facing purposes while "recycle" is more of a technical detail inside
the code and could be abstracted away from users. So I feel keeping
the name "PROMOTED" is fine.

3. Regarding "earliestOffset", it does feel like we cannot always
avoid another call to the Kafka API. And on the other hand, I also
tend to think that such bookkeeping may be better done at the app
level than from the Streams' public API level. I.e. the app could keep
a "first ever starting offset" per "topic-partition-store" key, and
when we have rolling restart and hence some standby task keeps
"jumping" from one client to another via task assignment, the app
would update this value just once when it finds the
"topic-partition-store" was never triggered before. What do you
think?

4. I do not have a strong opinion either, but what about "onBatchUpdated"?



Guozhang

On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy wrote:


Sophie,

Thank you very much for such a detailed review of the KIP. It might
actually be longer than the original KIP in the first place!

1. Ack'ed and fixed.

2. Correct, this is a confusing passage and requires context:

One thing on our list of TODO's regarding reliability is to determine how


Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-16 Thread Lucas Brutschy
Hi all,

it's a nice improvement! I don't have anything to add on top of the
previous comments, just came here to say that it seems to me consensus
has been reached and the result looks good to me.

Thanks Colt and Eduwer!
Lucas

On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy  wrote:
>
> Thanks, Guozhang. I've updated the KIP and will start a vote.
>
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang 
> wrote:
>
> > Thanks for the summary, that looks good to me.
> >
> > Guozhang
> >
> > On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy  wrote:
> > >
> > > Hello there!
> > >
> > > Thanks everyone for the comments. There's a lot of back-and-forth going
> > on,
> > > so I'll do my best to summarize what everyone's said in TLDR format:
> > >
> > > 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do
> > similarly
> > > for the other methods.
> > > 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> > > 3. Remove the `earliestOffset` parameter for performance reasons.
> > >
> > > If that's all fine with everyone, I'll update the KIP and we—well, mostly
> > > Edu (:  —will open a PR.
> > >
> > > Cheers,
> > > Colt McNealy
> > >
> > > *Founder, LittleHorse.dev*
> > >
> > >
> > > On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro 
> > > wrote:
> > >
> > > > Hello everyone,
> > > >
> > > > Thanks for all your feedback for this KIP!
> > > >
> > > > I think that the key to choosing proper names for this API is
> > understanding
> > > > the terms used inside the StoreChangelogReader. Currently, this class
> > has
> > > > two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my
> > opinion,
> > > > using StandbyUpdateListener for the interface fits better on these
> > terms.
> > > > Same applies for onUpdateStart/Suspended.
> > > >
> > > > StoreChangelogReader uses "the same mechanism" for active task
> > restoration
> > > > and standby task updates, but this is an implementation detail. Under
> > > > normal circumstances (no rebalances or task migrations), the changelog
> > > > reader will be in STANDBY_UPDATING, which means it will be updating
> > standby
> > > > tasks as long as there are new records in the changelog topic. That's
> > why I
> > > > prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't
> > 100%
> > > > align with StateRestoreListener, but either one is fine.
> > > >
> > > > Edu
> > > >
> > > > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <
> > guozhang.wang...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Colt,
> > > > >
> > > > > Thanks for writing the KIP! I have read through the updated KIP and
> > > > > overall it looks great. I only have minor naming comments (well,
> > > > > > isn't naming the least boring stuff to discuss, and the one that takes
> > > > > > most of the time for KIPs :P):
> > > > >
> > > > > 1. I tend to agree with Sophie regarding whether or not to include
> > > > > "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > > > > it is also more consistent with the functions of
> > > > > "StateRestoreListener" where we do not name it as
> > > > > "onStateRestoreState" etc.
> > > > >
> > > > > 2. I know in community discussions we sometimes say "a standby is
> > > > > promoted to active", but in the official code / java docs we did not
> > > > > have a term of "promotion", since what the code does is really
> > recycle
> > > > > the task (while keeping its state stores open), and create a new
> > > > > active task that takes in the recycled state stores and just changing
> > > > > the other fields like task type etc. After thinking about this for a
> > > > > bit, I tend to feel that "promoted" is indeed a better name for user
> > > > > facing purposes while "recycle" is more of a technical detail inside
> > > > > the code and could be abstracted away from users. So I feel keeping
> > > > > the name "PROMOTED" is fine.
> > > > >
> > > > > 3. Regarding "earliestOffset", it does feel like we cannot always
> > > > > avoid another call to the Kafka API. And on the other hand, I also
> > > > > tend to think that such bookkeeping may be better done at the app
> > > > > level than from the Streams' public API level. I.e. the app could
> > keep
> > > > > > a "first ever starting offset" per "topic-partition-store" key, and
> > > > > > when we have rolling restart and hence some standby task keeps
> > > > > > "jumping" from one client to another via task assignment, the app
> > > > > > would update this value just once when it finds the
> > > > > > "topic-partition-store" was never triggered before. What do you
> > > > > think?
> > > > >
> > > > > 4. I do not have a strong opinion either, but what about
> > > > "onBatchUpdated" ?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy 
> > > > wrote:
> > > > > >
> > > > > > > Sophie,
> > > > > >
> > > > > > Thank you very much for such a detailed review of the KIP. It might
> > > > > > actually be 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-15 Thread Colt McNealy
Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy

*Founder, LittleHorse.dev*


On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang 
wrote:

> Thanks for the summary, that looks good to me.
>
> Guozhang
>
> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy  wrote:
> >
> > Hello there!
> >
> > Thanks everyone for the comments. There's a lot of back-and-forth going
> on,
> > so I'll do my best to summarize what everyone's said in TLDR format:
> >
> > 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do
> similarly
> > for the other methods.
> > 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> > 3. Remove the `earliestOffset` parameter for performance reasons.
> >
> > If that's all fine with everyone, I'll update the KIP and we—well, mostly
> > Edu (:  —will open a PR.
> >
> > Cheers,
> > Colt McNealy
> >
> > *Founder, LittleHorse.dev*
> >
> >
> > On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro 
> > wrote:
> >
> > > Hello everyone,
> > >
> > > Thanks for all your feedback for this KIP!
> > >
> > > I think that the key to choosing proper names for this API is
> understanding
> > > the terms used inside the StoreChangelogReader. Currently, this class
> has
> > > two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my
> opinion,
> > > using StandbyUpdateListener for the interface fits better on these
> terms.
> > > Same applies for onUpdateStart/Suspended.
> > >
> > > StoreChangelogReader uses "the same mechanism" for active task
> restoration
> > > and standby task updates, but this is an implementation detail. Under
> > > normal circumstances (no rebalances or task migrations), the changelog
> > > reader will be in STANDBY_UPDATING, which means it will be updating
> standby
> > > tasks as long as there are new records in the changelog topic. That's
> why I
> > > prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't
> 100%
> > > align with StateRestoreListener, but either one is fine.
> > >
> > > Edu
> > >
> > > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <
> guozhang.wang...@gmail.com>
> > > wrote:
> > >
> > > > Hello Colt,
> > > >
> > > > Thanks for writing the KIP! I have read through the updated KIP and
> > > > overall it looks great. I only have minor naming comments (well,
> > > > > isn't naming the least boring stuff to discuss, and the one that takes
> > > > > most of the time for KIPs :P):
> > > >
> > > > 1. I tend to agree with Sophie regarding whether or not to include
> > > > "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > > > it is also more consistent with the functions of
> > > > "StateRestoreListener" where we do not name it as
> > > > "onStateRestoreState" etc.
> > > >
> > > > 2. I know in community discussions we sometimes say "a standby is
> > > > promoted to active", but in the official code / java docs we did not
> > > > have a term of "promotion", since what the code does is really
> recycle
> > > > the task (while keeping its state stores open), and create a new
> > > > active task that takes in the recycled state stores and just changing
> > > > the other fields like task type etc. After thinking about this for a
> > > > bit, I tend to feel that "promoted" is indeed a better name for user
> > > > facing purposes while "recycle" is more of a technical detail inside
> > > > the code and could be abstracted away from users. So I feel keeping
> > > > the name "PROMOTED" is fine.
> > > >
> > > > 3. Regarding "earliestOffset", it does feel like we cannot always
> > > > avoid another call to the Kafka API. And on the other hand, I also
> > > > tend to think that such bookkeeping may be better done at the app
> > > > level than from the Streams' public API level. I.e. the app could
> keep
> > > > > a "first ever starting offset" per "topic-partition-store" key, and
> > > > > when we have rolling restart and hence some standby task keeps
> > > > > "jumping" from one client to another via task assignment, the app
> > > > > would update this value just once when it finds the
> > > > > "topic-partition-store" was never triggered before. What do you
> > > > think?
> > > >
> > > > 4. I do not have a strong opinion either, but what about
> > > "onBatchUpdated" ?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy 
> > > wrote:
> > > > >
> > > > > > Sophie,
> > > > >
> > > > > Thank you very much for such a detailed review of the KIP. It might
> > > > > actually be longer than the original KIP in the first place!
> > > > >
> > > > > 1. Ack'ed and fixed.
> > > > >
> > > > > 2. Correct, this is a confusing passage and requires context:
> > > > >
> > > > > One thing on our list of TODO's regarding reliability is to
> determine
> > > how
> > > > > to configure `session.timeout.ms`. In our Kubernetes Environment,
> an
> > > > > instance of our Streams App can be terminated, restarted, and get
> back
> > > > into
> > > > > the "RUNNING" Streams state in about 20 seconds. We have two
> 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-14 Thread Guozhang Wang
Thanks for the summary, that looks good to me.

Guozhang

On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy  wrote:
>
> Hello there!
>
> Thanks everyone for the comments. There's a lot of back-and-forth going on,
> so I'll do my best to summarize what everyone's said in TLDR format:
>
> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do similarly
> for the other methods.
> 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> 3. Remove the `earliestOffset` parameter for performance reasons.
>
> If that's all fine with everyone, I'll update the KIP and we—well, mostly
> Edu (:  —will open a PR.
>
> Cheers,
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro 
> wrote:
>
> > Hello everyone,
> >
> > Thanks for all your feedback for this KIP!
> >
> > I think that the key to choosing proper names for this API is understanding
> > the terms used inside the StoreChangelogReader. Currently, this class has
> > two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
> > using StandbyUpdateListener for the interface fits better on these terms.
> > Same applies for onUpdateStart/Suspended.
> >
> > StoreChangelogReader uses "the same mechanism" for active task restoration
> > and standby task updates, but this is an implementation detail. Under
> > normal circumstances (no rebalances or task migrations), the changelog
> > reader will be in STANDBY_UPDATING, which means it will be updating standby
> > tasks as long as there are new records in the changelog topic. That's why I
> > prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
> > align with StateRestoreListener, but either one is fine.
> >
> > Edu
> >
> > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang 
> > wrote:
> >
> > > Hello Colt,
> > >
> > > Thanks for writing the KIP! I have read through the updated KIP and
> > > overall it looks great. I only have minor naming comments (well,
> > > > isn't naming the least boring stuff to discuss, and the one that takes
> > > > most of the time for KIPs :P):
> > >
> > > 1. I tend to agree with Sophie regarding whether or not to include
> > > "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > > it is also more consistent with the functions of
> > > "StateRestoreListener" where we do not name it as
> > > "onStateRestoreState" etc.
> > >
> > > 2. I know in community discussions we sometimes say "a standby is
> > > promoted to active", but in the official code / java docs we did not
> > > have a term of "promotion", since what the code does is really recycle
> > > the task (while keeping its state stores open), and create a new
> > > active task that takes in the recycled state stores and just changing
> > > the other fields like task type etc. After thinking about this for a
> > > bit, I tend to feel that "promoted" is indeed a better name for user
> > > facing purposes while "recycle" is more of a technical detail inside
> > > the code and could be abstracted away from users. So I feel keeping
> > > the name "PROMOTED" is fine.
> > >
> > > 3. Regarding "earliestOffset", it does feel like we cannot always
> > > avoid another call to the Kafka API. And on the other hand, I also
> > > tend to think that such bookkeeping may be better done at the app
> > > level than from the Streams' public API level. I.e. the app could keep
> > > > a "first ever starting offset" per "topic-partition-store" key, and
> > > > when we have rolling restart and hence some standby task keeps
> > > > "jumping" from one client to another via task assignment, the app
> > > > would update this value just once when it finds the
> > > > "topic-partition-store" was never triggered before. What do you
> > > think?
> > >
> > > 4. I do not have a strong opinion either, but what about
> > "onBatchUpdated" ?
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy 
> > wrote:
> > > >
> > > > > Sophie,
> > > >
> > > > Thank you very much for such a detailed review of the KIP. It might
> > > > actually be longer than the original KIP in the first place!
> > > >
> > > > 1. Ack'ed and fixed.
> > > >
> > > > 2. Correct, this is a confusing passage and requires context:
> > > >
> > > > One thing on our list of TODO's regarding reliability is to determine
> > how
> > > > to configure `session.timeout.ms`. In our Kubernetes Environment, an
> > > > instance of our Streams App can be terminated, restarted, and get back
> > > into
> > > > the "RUNNING" Streams state in about 20 seconds. We have two options
> > > here:
> > > > a) set session.timeout.ms to 30 seconds or so, and deal with 20
> > seconds
> > > of
> > > > unavailability for affected partitions, but avoid shuffling Tasks; or
> > b)
> > > > set session.timeout.ms to a low value, such as 6 seconds (
> > > > heartbeat.interval.ms of 2000), and reduce the unavailability window
> > > during
> > > > a rolling bounce but incur an "extra" rebalance. There are several
> > > > 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-13 Thread Colt McNealy
Hello there!

Thanks everyone for the comments. There's a lot of back-and-forth going on,
so I'll do my best to summarize what everyone's said in TLDR format:

1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do similarly
for the other methods.
2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
3. Remove the `earliestOffset` parameter for performance reasons.
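
To make point 3 concrete: with `earliestOffset` gone, an app that still wants
that number could track it itself, along the lines Guozhang suggested. Below is
a minimal sketch, not the final KIP API. It assumes simplified signatures (a
plain String instead of a TopicPartition, and only two of the callbacks), and
the `FirstOffsetTracker` class and `firstOffset()` helper are made up for the
example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StandbyListenerSketch {

    // The two suspend reasons kept in point 2.
    enum SuspendReason { PROMOTED, MIGRATED }

    // Hypothetical, simplified listener using the renamed methods from point 1.
    // The real interface would take a TopicPartition rather than a String.
    interface StandbyUpdateListener {
        void onUpdateStart(String topicPartition, String storeName, long startingOffset);

        void onUpdateSuspended(String topicPartition, String storeName,
                               long storeOffset, SuspendReason reason);
    }

    // App-level bookkeeping: remembers the first starting offset this
    // process has seen for each "topic-partition-store" key.
    static class FirstOffsetTracker implements StandbyUpdateListener {
        private final Map<String, Long> firstOffsets = new ConcurrentHashMap<>();

        private static String key(String topicPartition, String storeName) {
            return topicPartition + "-" + storeName;
        }

        @Override
        public void onUpdateStart(String topicPartition, String storeName, long startingOffset) {
            // putIfAbsent keeps the value recorded by the first call for this key.
            firstOffsets.putIfAbsent(key(topicPartition, storeName), startingOffset);
        }

        @Override
        public void onUpdateSuspended(String topicPartition, String storeName,
                                      long storeOffset, SuspendReason reason) {
            System.out.println(key(topicPartition, storeName) + " suspended at offset "
                    + storeOffset + " (" + reason + ")");
        }

        // Returns null if the key was never seen by this process.
        Long firstOffset(String topicPartition, String storeName) {
            return firstOffsets.get(key(topicPartition, storeName));
        }
    }

    public static void main(String[] args) {
        FirstOffsetTracker tracker = new FirstOffsetTracker();
        tracker.onUpdateStart("app-store-changelog-0", "my-store", 42L);
        tracker.onUpdateSuspended("app-store-changelog-0", "my-store", 100L, SuspendReason.MIGRATED);
        // The standby comes back later; the first recorded offset is kept.
        tracker.onUpdateStart("app-store-changelog-0", "my-store", 100L);
        System.out.println(tracker.firstOffset("app-store-changelog-0", "my-store")); // prints 42
    }
}
```

Note that this map only lives inside one process. If the standby "jumps" to a
different client, that client starts with an empty map, so a real
implementation would need to keep the value somewhere shared.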

If that's all fine with everyone, I'll update the KIP and we—well, mostly
Edu (:  —will open a PR.

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro 
wrote:

> Hello everyone,
>
> Thanks for all your feedback for this KIP!
>
> I think that the key to choosing proper names for this API is understanding
> the terms used inside the StoreChangelogReader. Currently, this class has
> two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
> using StandbyUpdateListener for the interface fits better on these terms.
> Same applies for onUpdateStart/Suspended.
>
> StoreChangelogReader uses "the same mechanism" for active task restoration
> and standby task updates, but this is an implementation detail. Under
> normal circumstances (no rebalances or task migrations), the changelog
> reader will be in STANDBY_UPDATING, which means it will be updating standby
> tasks as long as there are new records in the changelog topic. That's why I
> prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
> align with StateRestoreListener, but either one is fine.
>
> Edu
>
> On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang 
> wrote:
>
> > Hello Colt,
> >
> > Thanks for writing the KIP! I have read through the updated KIP and
> > overall it looks great. I only have minor naming comments (well,
> > > isn't naming the least boring stuff to discuss, and the one that takes
> > > most of the time for KIPs :P):
> >
> > 1. I tend to agree with Sophie regarding whether or not to include
> > "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > it is also more consistent with the functions of
> > "StateRestoreListener" where we do not name it as
> > "onStateRestoreState" etc.
> >
> > 2. I know in community discussions we sometimes say "a standby is
> > promoted to active", but in the official code / java docs we did not
> > have a term of "promotion", since what the code does is really recycle
> > the task (while keeping its state stores open), and create a new
> > active task that takes in the recycled state stores and just changing
> > the other fields like task type etc. After thinking about this for a
> > bit, I tend to feel that "promoted" is indeed a better name for user
> > facing purposes while "recycle" is more of a technical detail inside
> > the code and could be abstracted away from users. So I feel keeping
> > the name "PROMOTED" is fine.
> >
> > 3. Regarding "earliestOffset", it does feel like we cannot always
> > avoid another call to the Kafka API. And on the other hand, I also
> > tend to think that such bookkeeping may be better done at the app
> > level than from the Streams' public API level. I.e. the app could keep
> > > a "first ever starting offset" per "topic-partition-store" key, and
> > > when we have rolling restart and hence some standby task keeps
> > > "jumping" from one client to another via task assignment, the app
> > > would update this value just once when it finds the
> > > "topic-partition-store" was never triggered before. What do you
> > think?
> >
> > 4. I do not have a strong opinion either, but what about
> "onBatchUpdated" ?
> >
> >
> > Guozhang
> >
> > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy 
> wrote:
> > >
> > > > Sophie,
> > >
> > > Thank you very much for such a detailed review of the KIP. It might
> > > actually be longer than the original KIP in the first place!
> > >
> > > 1. Ack'ed and fixed.
> > >
> > > 2. Correct, this is a confusing passage and requires context:
> > >
> > > One thing on our list of TODO's regarding reliability is to determine
> how
> > > to configure `session.timeout.ms`. In our Kubernetes Environment, an
> > > instance of our Streams App can be terminated, restarted, and get back
> > into
> > > the "RUNNING" Streams state in about 20 seconds. We have two options
> > here:
> > > a) set session.timeout.ms to 30 seconds or so, and deal with 20
> seconds
> > of
> > > unavailability for affected partitions, but avoid shuffling Tasks; or
> b)
> > > set session.timeout.ms to a low value, such as 6 seconds (
> > > heartbeat.interval.ms of 2000), and reduce the unavailability window
> > during
> > > a rolling bounce but incur an "extra" rebalance. There are several
> > > different costs to a rebalance, including the shuffling of standby
> tasks.
> > > JMX metrics are not fine-grained enough to give us an accurate picture
> of
> > > what's going on with the whole Standby Task Shuffle Dance. I
> hypothesize
> > > that the Standby Update Listener might help us clarify just how the
> > > shuffling actually (not 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-13 Thread Eduwer Camacaro
Hello everyone,

Thanks for all your feedback for this KIP!

I think that the key to choosing proper names for this API is understanding
the terms used inside the StoreChangelogReader. Currently, this class has
two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
using StandbyUpdateListener for the interface fits better on these terms.
Same applies for onUpdateStart/Suspended.

StoreChangelogReader uses "the same mechanism" for active task restoration
and standby task updates, but this is an implementation detail. Under
normal circumstances (no rebalances or task migrations), the changelog
reader will be in STANDBY_UPDATING, which means it will be updating standby
tasks as long as there are new records in the changelog topic. That's why I
prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
align with StateRestoreListener, but either one is fine.
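
As a toy illustration of the naming argument (this is not the actual
StoreChangelogReader code), the sketch below maps each reader state to the
per-batch callback it would drive. The `ReaderState` enum and the
`batchCallbackFor` helper are invented for the example, and `onStandbyUpdated`
is only the name preferred above, not a settled method name.

```java
class ChangelogReaderStateSketch {

    // The two states named in the message above.
    enum ReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

    // Hypothetical helper: which per-batch callback a given state would drive.
    static String batchCallbackFor(ReaderState state) {
        switch (state) {
            case ACTIVE_RESTORING:
                // Existing listener for active task restoration.
                return "StateRestoreListener#onBatchRestored";
            case STANDBY_UPDATING:
                // Proposed listener for standby updates.
                return "StandbyUpdateListener#onStandbyUpdated";
            default:
                throw new IllegalArgumentException("unknown state: " + state);
        }
    }

    public static void main(String[] args) {
        for (ReaderState state : ReaderState.values()) {
            System.out.println(state + " -> " + batchCallbackFor(state));
        }
    }
}
```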

Edu

On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang 
wrote:

> Hello Colt,
>
> Thanks for writing the KIP! I have read through the updated KIP and
> overall it looks great. I only have minor naming comments (well,
> isn't naming the least boring stuff to discuss, and the one that takes
> most of the time for KIPs :P):
>
> 1. I tend to agree with Sophie regarding whether or not to include
> "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> it is also more consistent with the functions of
> "StateRestoreListener" where we do not name it as
> "onStateRestoreState" etc.
>
> 2. I know in community discussions we sometimes say "a standby is
> promoted to active", but in the official code / java docs we did not
> have a term of "promotion", since what the code does is really recycle
> the task (while keeping its state stores open), and create a new
> active task that takes in the recycled state stores and just changing
> the other fields like task type etc. After thinking about this for a
> bit, I tend to feel that "promoted" is indeed a better name for user
> facing purposes while "recycle" is more of a technical detail inside
> the code and could be abstracted away from users. So I feel keeping
> the name "PROMOTED" is fine.
>
> 3. Regarding "earliestOffset", it does feel like we cannot always
> avoid another call to the Kafka API. And on the other hand, I also
> tend to think that such bookkeeping may be better done at the app
> level than from the Streams' public API level. I.e. the app could keep
> a "first ever starting offset" per "topic-partition-store" key, and
> when we have rolling restart and hence some standby task keeps
> "jumping" from one client to another via task assignment, the app
> would update this value just once when it finds the
> "topic-partition-store" was never triggered before. What do you
> think?
>
> 4. I do not have a strong opinion either, but what about "onBatchUpdated" ?
>
>
> Guozhang
>
> On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy  wrote:
> >
> > Sophie,
> >
> > Thank you very much for such a detailed review of the KIP. It might
> > actually be longer than the original KIP in the first place!
> >
> > 1. Ack'ed and fixed.
> >
> > 2. Correct, this is a confusing passage and requires context:
> >
> > One thing on our list of TODO's regarding reliability is to determine how
> > to configure `session.timeout.ms`. In our Kubernetes Environment, an
> > instance of our Streams App can be terminated, restarted, and get back
> into
> > the "RUNNING" Streams state in about 20 seconds. We have two options
> here:
> > a) set session.timeout.ms to 30 seconds or so, and deal with 20 seconds
> of
> > unavailability for affected partitions, but avoid shuffling Tasks; or b)
> > set session.timeout.ms to a low value, such as 6 seconds (
> > heartbeat.interval.ms of 2000), and reduce the unavailability window
> during
> > a rolling bounce but incur an "extra" rebalance. There are several
> > different costs to a rebalance, including the shuffling of standby tasks.
> > JMX metrics are not fine-grained enough to give us an accurate picture of
> > what's going on with the whole Standby Task Shuffle Dance. I hypothesize
> > that the Standby Update Listener might help us clarify just how the
> > shuffling actually (not theoretically) works, which will help us make a
> > more informed decision about the session timeout config.
> >
> > If you think this is worth putting in the KIP, I'll polish it and do so;
> > else, I'll remove the current half-baked explanation.
> >
> > 3. Overall, I agree with this. In our app, each Task has only one Store
> to
> > reduce the number of changelog partitions, so I sometimes forget the
> > distinction between the two concepts, as reflected in the KIP (:
> >
> > 3a. I don't like the word "Restore" here, since Restoration refers to an
> > Active Task getting caught up in preparation to resume processing.
> > `StandbyUpdateListener` is fine by me; I have updated the KIP. I am a
> > native Python speaker so I do prefer shorter names anyways (:
> >
> > 3b1. +1 to 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-13 Thread Guozhang Wang
Hello Colt,

Thanks for writing the KIP! I have read through the updated KIP and
overall it looks great. I only have minor naming comments (well,
isn't naming the least boring stuff to discuss, and the thing that takes
most of the time for KIPs :P):

1. I tend to agree with Sophie regarding whether or not to include
"Standby" in the functions of "onStandbyUpdateStart/Suspended", since
it is also more consistent with the functions of
"StateRestoreListener" where we do not name it as
"onStateRestoreStart" etc.

2. I know in community discussions we sometimes say "a standby is
promoted to active", but in the official code / java docs we did not
have a term of "promotion", since what the code does is really recycle
the task (while keeping its state stores open), and create a new
active task that takes in the recycled state stores and just changes
the other fields like task type etc. After thinking about this for a
bit, I tend to feel that "promoted" is indeed a better name for user
facing purposes while "recycle" is more of a technical detail inside
the code and could be abstracted away from users. So I feel keeping
the name "PROMOTED" is fine.

3. Regarding "earliestOffset", it does feel like we cannot always
avoid another call to the Kafka API. And on the other hand, I also
tend to think that such bookkeeping may be better done at the app
level than from the Streams' public API level. I.e. the app could keep
a "first ever starting offset" per "topic-partition-store" key, and
when we have a rolling restart and hence some standby task keeps
"jumping" from one client to another via task assignment, the app
would update this value just once, when it finds the
"topic-partition-store" was never triggered before. What do you
think?

4. I do not have a strong opinion either, but what about "onBatchUpdated" ?
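
A rough sketch of the app-level bookkeeping described in point 3, assuming an
in-memory map is good enough for illustration. `FirstOffsetTracker` and
`recordIfFirst` are made-up names, not part of Kafka Streams. Since a standby
can move from one client to another, a real app would presumably need to keep
this map somewhere shared rather than inside a single JVM.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical app-level helper; nothing here is a Kafka Streams API.
class FirstOffsetTracker {
    // Key: "topic-partition-store", value: first starting offset ever seen.
    private final Map<String, Long> firstOffsets = new ConcurrentHashMap<>();

    private static String key(String topic, int partition, String storeName) {
        return topic + "-" + partition + "-" + storeName;
    }

    /**
     * Stores startingOffset only if this key was never seen before,
     * and returns the value that is remembered for the key.
     */
    long recordIfFirst(String topic, int partition, String storeName, long startingOffset) {
        Long previous = firstOffsets.putIfAbsent(key(topic, partition, storeName), startingOffset);
        return previous == null ? startingOffset : previous;
    }
}
```

The app would call `recordIfFirst(...)` from whatever "update start" callback
the listener ends up exposing; later calls for the same key leave the stored
value untouched.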


Guozhang

On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy  wrote:
>
> Sophie,
>
> Thank you very much for such a detailed review of the KIP. It might
> actually be longer than the original KIP in the first place!
>
> 1. Ack'ed and fixed.
>
> 2. Correct, this is a confusing passage and requires context:
>
> One thing on our list of TODO's regarding reliability is to determine how
> to configure `session.timeout.ms`. In our Kubernetes Environment, an
> instance of our Streams App can be terminated, restarted, and get back into
> the "RUNNING" Streams state in about 20 seconds. We have two options here:
> a) set session.timeout.ms to 30 seconds or so, and deal with 20 seconds of
> unavailability for affected partitions, but avoid shuffling Tasks; or b)
> set session.timeout.ms to a low value, such as 6 seconds (
> heartbeat.interval.ms of 2000), and reduce the unavailability window during
> a rolling bounce but incur an "extra" rebalance. There are several
> different costs to a rebalance, including the shuffling of standby tasks.
> JMX metrics are not fine-grained enough to give us an accurate picture of
> what's going on with the whole Standby Task Shuffle Dance. I hypothesize
> that the Standby Update Listener might help us clarify just how the
> shuffling actually (not theoretically) works, which will help us make a
> more informed decision about the session timeout config.
>
> If you think this is worth putting in the KIP, I'll polish it and do so;
> else, I'll remove the current half-baked explanation.
>
> 3. Overall, I agree with this. In our app, each Task has only one Store to
> reduce the number of changelog partitions, so I sometimes forget the
> distinction between the two concepts, as reflected in the KIP (:
>
> 3a. I don't like the word "Restore" here, since Restoration refers to an
> Active Task getting caught up in preparation to resume processing.
> `StandbyUpdateListener` is fine by me; I have updated the KIP. I am a
> native Python speaker so I do prefer shorter names anyways (:
>
> 3b1. +1 to removing the word 'Task'.
>
> 3b2. I like `onUpdateStart()`, but with your permission I'd prefer
> `onStandbyUpdateStart()` which matches the name of the Interface
> "StandbyUpdateListener". (the python part of me hates this, however)
>
> 3b3. Going back to question 2), `earliestOffset` was intended to allow us
> to more easily calculate the amount of state _already loaded_ in the store
> by subtracting (startingOffset - earliestOffset). This would help us see
> how much inefficiency is introduced in a rolling restart—if we end up going
> from a situation with an up-to-date standby before the restart, and then
> after the whole restart, the Task is shuffled onto an instance where there
> is no previous state, then that is expensive. However, if the final
> shuffling results in the Task back on an instance with a lot of pre-built
> state, it's not expensive.
>
> If a call over the network is required to determine the earliestOffset,
> then this is a "hard no-go" for me, and we will remove it (I'll have to
> check with Eduwer as he is close to having a working implementation). I
> think we can probably determine 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-11 Thread Colt McNealy
Sophie,

Thank you very much for such a detailed review of the KIP. It might
actually be longer than the original KIP in the first place!

1. Ack'ed and fixed.

2. Correct, this is a confusing passage and requires context:

One thing on our list of TODO's regarding reliability is to determine how
to configure `session.timeout.ms`. In our Kubernetes Environment, an
instance of our Streams App can be terminated, restarted, and get back into
the "RUNNING" Streams state in about 20 seconds. We have two options here:
a) set session.timeout.ms to 30 seconds or so, and deal with 20 seconds of
unavailability for affected partitions, but avoid shuffling Tasks; or b)
set session.timeout.ms to a low value, such as 6 seconds (
heartbeat.interval.ms of 2000), and reduce the unavailability window during
a rolling bounce but incur an "extra" rebalance. There are several
different costs to a rebalance, including the shuffling of standby tasks.
JMX metrics are not fine-grained enough to give us an accurate picture of
what's going on with the whole Standby Task Shuffle Dance. I hypothesize
that the Standby Update Listener might help us clarify just how the
shuffling actually (not theoretically) works, which will help us make a
more informed decision about the session timeout config.

If you think this is worth putting in the KIP, I'll polish it and do so;
else, I'll remove the current half-baked explanation.
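
To make options a) and b) concrete, here is a minimal sketch using plain
`java.util.Properties`. The class and method names are hypothetical; only the
two config keys and the values come from the paragraph above, and how the
properties are passed to the Streams app is left out.

```java
import java.util.Properties;

// Hypothetical holder for the two candidate settings discussed above.
class SessionTimeoutOptions {

    // Option a): the session timeout outlasts the ~20 second restart, so the
    // restarted instance rejoins before its tasks are shuffled.
    static Properties tolerateRestart() {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "30000");
        return props;
    }

    // Option b): a short session timeout, which shrinks the unavailability
    // window but incurs the "extra" rebalance.
    static Properties failFast() {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "6000");
        props.setProperty("heartbeat.interval.ms", "2000");
        return props;
    }
}
```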

3. Overall, I agree with this. In our app, each Task has only one Store to
reduce the number of changelog partitions, so I sometimes forget the
distinction between the two concepts, as reflected in the KIP (:

3a. I don't like the word "Restore" here, since Restoration refers to an
Active Task getting caught up in preparation to resume processing.
`StandbyUpdateListener` is fine by me; I have updated the KIP. I am a
native Python speaker so I do prefer shorter names anyways (:

3b1. +1 to removing the word 'Task'.

3b2. I like `onUpdateStart()`, but with your permission I'd prefer
`onStandbyUpdateStart()` which matches the name of the Interface
"StandbyUpdateListener". (the python part of me hates this, however)

3b3. Going back to question 2), `earliestOffset` was intended to allow us
to more easily calculate the amount of state _already loaded_ in the store
by subtracting (startingOffset - earliestOffset). This would help us see
how much inefficiency is introduced in a rolling restart—if we end up going
from a situation with an up-to-date standby before the restart, and then
after the whole restart, the Task is shuffled onto an instance where there
is no previous state, then that is expensive. However, if the final
shuffling results in the Task back on an instance with a lot of pre-built
state, it's not expensive.

If a call over the network is required to determine the earliestOffset,
then this is a "hard no-go" for me, and we will remove it (I'll have to
check with Eduwer as he is close to having a working implementation). I
think we can probably determine what we wanted to see in a different
way, but it will take more thinking. If `earliestOffset` is confusing,
perhaps rename it to `earliestChangelogOffset`?

`startingOffset` is easy to remove as it can be determined from the first
call to `onBatch{Restored/Updated/Processed/Loaded}()`.

Anyways, I've updated the JavaDoc in the interface; hopefully it's more
clear. Awaiting further instructions here.
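
For reference, the subtraction described in 3b3 amounts to something like the
sketch below. `StandbyProgress` and `alreadyLoaded` are hypothetical names, and
clamping at zero is an assumption added for the case where the starting offset
falls before the earliest offset. The result is an offset distance, which only
approximates a record count on a compacted changelog.

```java
// Hypothetical helper; not part of the proposed listener interface.
class StandbyProgress {

    /**
     * Offset distance already present in the local store when the standby
     * starts updating: startingOffset - earliestOffset, never negative.
     */
    static long alreadyLoaded(long earliestOffset, long startingOffset) {
        return Math.max(0L, startingOffset - earliestOffset);
    }
}
```

A value near zero after a rolling restart would point to the expensive case
(the Task landed on an instance with no previous state); a large value would
point to the cheap one.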

3c. Good point; after thinking, my preference is `onBatchLoaded()` ->
`onBatchUpdated()` -> `onBatchProcessed()` -> `onBatchRestored()`. I am
less fond of "processed" because when I was first learning Streams I
mistakenly thought that standby tasks actually processed the input topic
rather than loaded from the changelog. I'll defer to you here.

3d. +1 to `onUpdateSuspended()`, or better yet
`onStandbyUpdateSuspended()`. Will check about the implementation of
keeping track of the number of records loaded.

4a. I think this might be best in a separate KIP, especially given that
this is my and Eduwer's first time contributing to Kafka (so we want to
minimize the blast radius).

4b. I might respectfully (and timidly) push back here: RECYCLED for an
Active Task is a bit confusing to me. DEMOTED and MIGRATED make sense from
the standpoint of an Active Task, but recycling to me sounds like throwing
stuff away, such that the resources (i.e. disk space) can be used by a
separate Task. As an alternative, rather than trying to reuse the same enum,
maybe rename it to `StandbySuspendReason` to avoid naming conflicts with
`ActiveSuspendReason`? However, I could be convinced to rename PROMOTED ->
RECYCLED, especially if Eduwer agrees.

TLDR:

T1. Agreed, will remove the word "Task" as it's incorrect.
T2. Will update to `onStandbyUpdateStart()`
T3. Awaiting further instructions on earliestOffset and startingOffset.
T4. I don't like `onBatchProcessed()` too much, perhaps `onBatchLoaded()`?
T5. Will update to `onStandbyUpdateSuspended()`
T6. Thoughts on renaming SuspendReason to 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-11 Thread Sophie Blee-Goldman
Hey Colt! Thanks for the KIP -- this will be a great addition to Streams, I
can't believe we've gone so long without this.

Overall the proposal makes sense, but I had a handful of fairly minor
questions and suggestions/requests:

1. Seems like the last sentence in the 2nd paragraph of the Motivation
section is cut off and incomplete -- "want to be able to know " what
exactly?

2. This isn't that important since the motivation as a whole is clear to me
and convincing enough, but I'm not quite sure I understand the example at
the end of the Motivation section. How are standby tasks (and the ability
to hook into and monitor their status) related to the session.timeout.ms
config?

3. To help both old and new users of Kafka Streams understand this new
restore listener and its purpose/semantics, can we try to name the class and
callbacks in a way that's more consistent with the active task restore
listener?

3a. StandbyTaskUpdateListener:
The existing restore listener is called StateRestoreListener, so the new
one could be called something like StandbyStateRestoreListener, although
we typically refer to standby tasks as "processing" rather than "restoring"
records -- ie restoration is a term for active task state specifically. I
actually like the original suggestion if we just drop the "Task" part of the
name, ie StandbyUpdateListener. I think either that or StandbyRestoreListener
would be fine and probably the two best options.
Also, this probably goes without saying but any change to the name of this
class should of course be reflected in the KafkaStreams#setXXX API as well.

3b. #onTaskCreated
I know the "start" callback feels a bit different for the standby task
updater vs an active task beginning restoration, but I think we should try
to keep the various callbacks aligned to their active restore listener
counterpart. We can/should just replace the term "restore" with "update"
for the callback method names the same way we do for the class name, which
in this case would give us #onUpdateStart. Personally I like this better,
but it's ultimately up to you. However, I would push back against anything
that includes the word "Task" (eg #onTaskCreated) as the listener
is actually not scoped to the task itself but instead to the individual
state store(s). This is the main reason I would prefer calling it something
like #onUpdateStart, which keeps the focus on the store being updated
rather than the task that just happens to own this store.
One last thing on this callback -- do we really need both the
`earliestOffset` and `startingOffset`? I feel like this might be more
confusing than it is helpful (tbh even I'm not completely sure I know what
the earliestOffset is supposed to represent). More importantly, is this all
information that is already available and able to be passed in to the
callback by Streams? I haven't checked on this but it feels like the
earliestOffset is likely to require a remote call, either by the embedded
consumer or via the admin client. If so, the ROI on including this parameter
seems quite low (if not outright negative).

3c. #onBatchRestored
If we opt to use the term "update" in place of "restore" elsewhere, then we
should consider doing so here as well. What do you think about
#onBatchUpdated, or even #onBatchProcessed?
I'm actually not super concerned about this particular API, and honestly I
think we can use restore or update interchangeably here, so if you
don't like any of the suggested names (and no one can think of anything
better), I would just stick with #onBatchRestored. In this case,
it kind of makes the most sense.

3d. #onTaskSuspended
Along the same lines as 3b above, #onUpdateSuspended or just
#onRestoreSuspended probably makes more sense for this callback. Also,
I notice the StateRestoreListener passes in the total number of records
restored to its #onRestoreSuspended. Assuming we already track
that information in Streams and have it readily available to pass in at
whatever point we would be invoking this callback, that might be a
useful parameter for the standby listener to have as well.

4. I totally love the SuspendReason thing, just two notes/requests:

4a. Feel free to push back against adding onto the scope of this KIP, but
it would be great to expand the active state restore listener with this
SuspendReason enum as well. It would be really useful for both variants of
restore listener

4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for standby
tasks it means basically the same thing, the point is that active
tasks can also be recycled into standbys through the same mechanism. This
way they can share the SuspendReason enum -- not that it's
necessary for them to share, I just think it would be a good idea to keep
the two restore listeners aligned to the highest degree possible.
I was actually considering proposing a short KIP with a new
RecyclingListener (or something) specifically for this exact kind of thing,
since we
currently have