Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Thanks for clarifying. Makes sense to me. On 11/30/23 8:33 PM, Colt McNealy wrote: Hi Matthias and everyone— Some clarification is necessary just for posterity. It turns out that, on a fresh standby task before we start polling for records, we wouldn't be able to get the current end offset without a network call. This leaves us three options: A) Make it an Optional or use a sentinel value to mark that it's not present. B) Perform a network call to get the endOffset when it's not there. C) Remove it. Option A) seems like it could be a confusing API, especially because in the strong majority of cases, the Optional would be empty. Option B) is undesirable because of the performance considerations—if we're going to make a network round trip, we might as well get some records back! That leaves us with option C), which is the least-bad of all of them. At LittleHorse we actually do care about the endOffset in the onUpdateStart() method, and having it would be useful to us. However, the work-around isn't horrible, because the endOffset will be passed into the first call to onBatchLoaded() , which normally follows onUpdateStart() within <100ms. Thanks, Colt McNealy *Founder, LittleHorse.dev* On Thu, Nov 30, 2023 at 4:43 PM Matthias J. Sax wrote: parameter is somewhat irrelevant to our use case Sounds like a weird justification to change the KIP. Providing more information is usually better than less, so it seems it won't hurt to just keep it (seems useful in general to get the current end offset in this callback) -- you can always ignore it, if it's not relevant for your use case. -Matthias On 11/30/23 6:56 AM, Eduwer Camacaro wrote: Hello everyone, We have come to the conclusion, during our work on this KIP's implementation, that the #onUpdateStart callback's "currentEndOffset" parameter is somewhat irrelevant to our use case. When this callback is invoked, I think this value is usually unknown. 
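The workaround in Colt's message quoted above (read the end offset from the first onBatchLoaded() call, because onUpdateStart() no longer carries it) can be sketched roughly as follows. This is only an illustration: `StandbyListenerSketch` and `EndOffsetTracker` are hypothetical names, partitions are plain strings, and the parameter lists are assumptions rather than the actual Kafka Streams interface.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the standby listener discussed in this thread.
// The method names come from the discussion; the parameter lists are assumptions.
interface StandbyListenerSketch {
    void onUpdateStart(String changelogPartition, String storeName, long startingOffset);

    void onBatchLoaded(String changelogPartition, String storeName,
                       long batchEndOffset, long currentEndOffset);
}

// Remembers the most recent end offset seen per changelog partition.
final class EndOffsetTracker implements StandbyListenerSketch {
    private final Map<String, Long> lastEndOffsets = new ConcurrentHashMap<>();

    @Override
    public void onUpdateStart(String changelogPartition, String storeName, long startingOffset) {
        // No end offset is available at this point, so drop any stale value.
        lastEndOffsets.remove(changelogPartition);
    }

    @Override
    public void onBatchLoaded(String changelogPartition, String storeName,
                              long batchEndOffset, long currentEndOffset) {
        // The first batch after onUpdateStart() supplies the end offset.
        lastEndOffsets.put(changelogPartition, currentEndOffset);
    }

    // Empty until the first batch after onUpdateStart() has been loaded.
    OptionalLong lastKnownEndOffset(String changelogPartition) {
        Long value = lastEndOffsets.get(changelogPartition);
        return value == null ? OptionalLong.empty() : OptionalLong.of(value);
    }

    public static void main(String[] args) {
        EndOffsetTracker tracker = new EndOffsetTracker();
        tracker.onUpdateStart("store-changelog-0", "store", 10L);
        System.out.println(tracker.lastKnownEndOffset("store-changelog-0").isPresent());
        tracker.onBatchLoaded("store-changelog-0", "store", 20L, 100L);
        System.out.println(tracker.lastKnownEndOffset("store-changelog-0").getAsLong());
    }
}
```

The tracker returns an empty value between onUpdateStart() and the first batch, which makes the short window in which the end offset is unknown explicit to the caller.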
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Hi Matthias and everyone,

Some clarification is necessary just for posterity. It turns out that, on a fresh standby task before we start polling for records, we wouldn't be able to get the current end offset without a network call. This leaves us three options:

A) Make it an Optional or use a sentinel value to mark that it's not present.
B) Perform a network call to get the endOffset when it's not there.
C) Remove it.

Option A) seems like it could be a confusing API, especially because in the strong majority of cases, the Optional would be empty.

Option B) is undesirable because of the performance considerations: if we're going to make a network round trip, we might as well get some records back!

That leaves us with option C), which is the least-bad of all of them.

At LittleHorse we actually do care about the endOffset in the onUpdateStart() method, and having it would be useful to us. However, the work-around isn't horrible, because the endOffset will be passed into the first call to onBatchLoaded(), which normally follows onUpdateStart() within <100ms.

Thanks,
Colt McNealy
*Founder, LittleHorse.dev*

On Thu, Nov 30, 2023 at 4:43 PM Matthias J. Sax wrote:

> > parameter is somewhat irrelevant to our use case
>
> Sounds like a weird justification to change the KIP. Providing more information is usually better than less, so it seems it won't hurt to just keep it (seems useful in general to get the current end offset in this callback) -- you can always ignore it, if it's not relevant for your use case.
>
> -Matthias
>
> On 11/30/23 6:56 AM, Eduwer Camacaro wrote:
> > Hello everyone,
> >
> > We have come to the conclusion, during our work on this KIP's implementation, that the #onUpdateStart callback's "currentEndOffset" parameter is somewhat irrelevant to our use case. When this callback is invoked, I think this value is usually unknown. Our choice to delete this parameter from the #onUpdateStart callback requires an update to the KIP.
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
parameter is somewhat irrelevant to our use case Sounds like a weird justification to change the KIP. Providing more information is usually better than less, so it seems it won't hurt to just keep it (seems useful in general to get the current end offset in this callback) -- you can always ignore it, if it's not relevant for your use case. -Matthias On 11/30/23 6:56 AM, Eduwer Camacaro wrote: Hello everyone, We have come to the conclusion, during our work on this KIP's implementation, that the #onUpdateStart callback's "currentEndOffset" parameter is somewhat irrelevant to our use case. When this callback is invoked, I think this value is usually unknown. Our choice to delete this parameter from the #onUpdateStart callback requires an update to the KIP. Please feel free to review the PR and provide any comments you may have. :) Thanks in advance Edu- On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax wrote: Thanks. SGTM. On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote: That all sounds good to me! Thanks for the KIP On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote: Hi Sophie, Matthias, Bruno, and Eduwer— Thanks for your patience as I have been scrambling to catch up after a week of business travel (and a few days with no time to code). I'd like to tie up some loose ends here, but in short, I don't think the KIP document itself needs any changes (our internal implementation does, however). 1. In the interest of a) not changing the KIP after it's already out for a vote, and b) making sure our English grammar is "correct", let's stick with 'onBatchLoaded()`. It is the Store that gets updated, not the Batch. 2. For me (and, thankfully, the community as well) adding a remote network call at any point in this KIP is a non-starter. We'll ensure that our implementation does not introduce one. 3. I really don't like changing API behavior, even if it's not documented in the javadoc. 
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Hello everyone,

We have come to the conclusion, during our work on this KIP's implementation, that the #onUpdateStart callback's "currentEndOffset" parameter is somewhat irrelevant to our use case. When this callback is invoked, I think this value is usually unknown. Our choice to delete this parameter from the #onUpdateStart callback requires an update to the KIP.

Please feel free to review the PR and provide any comments you may have. :)

Thanks in advance

Edu-

On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax wrote:

> Thanks. SGTM.
>
> On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:
> > That all sounds good to me! Thanks for the KIP
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Thanks. SGTM. On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote: That all sounds good to me! Thanks for the KIP On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote: Hi Sophie, Matthias, Bruno, and Eduwer— Thanks for your patience as I have been scrambling to catch up after a week of business travel (and a few days with no time to code). I'd like to tie up some loose ends here, but in short, I don't think the KIP document itself needs any changes (our internal implementation does, however). 1. In the interest of a) not changing the KIP after it's already out for a vote, and b) making sure our English grammar is "correct", let's stick with 'onBatchLoaded()`. It is the Store that gets updated, not the Batch. 2. For me (and, thankfully, the community as well) adding a remote network call at any point in this KIP is a non-starter. We'll ensure that our implementation does not introduce one. 3. I really don't like changing API behavior, even if it's not documented in the javadoc. As such, I am strongly against modifying the behavior of endOffsets() on the consumer as some people may implicitly depend on the contract. 3a. The Consumer#currentLag() method gives us exactly what we want without a network call (current lag from a cache, from which we can compute the offset). 4. I have no opinion about whether we should pass endOffset or currentLag to the callback. Either one has the same exact information inside it. In the interest of not changing the KIP after the vote has started, I'll leave it as endOffset. As such, I believe the KIP doesn't need any updates, nor has it been updated since the vote started. Would anyone else like to discuss something before the Otter Council adjourns regarding this matter? 
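Point 3a in Colt's quoted message notes that the end offset can be computed from the cached lag, and Matthias suggests passing `currentLag()` instead of the end offset. A minimal sketch of that arithmetic follows, assuming lag is defined as the end offset minus the consumer's current position. `LagToEndOffset` and `endOffsetFromLag` are hypothetical names, and the lag is modeled as an `OptionalLong` because the cached value may be absent; nothing here calls the Kafka client.

```java
import java.util.OptionalLong;

final class LagToEndOffset {
    // Assumption: lag = endOffset - position, hence endOffset = position + lag.
    // An absent lag (nothing cached yet) yields an absent end offset.
    static OptionalLong endOffsetFromLag(long position, OptionalLong cachedLag) {
        if (cachedLag.isPresent()) {
            return OptionalLong.of(position + cachedLag.getAsLong());
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Position 40 with a cached lag of 60 implies an end offset of 100.
        System.out.println(endOffsetFromLag(40L, OptionalLong.of(60L)));
        // With no cached lag, the end offset stays unknown.
        System.out.println(endOffsetFromLag(40L, OptionalLong.empty()));
    }
}
```

Returning an empty value when no lag is cached avoids both the sentinel value and the extra network call that the thread rejects.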
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
That all sounds good to me! Thanks for the KIP

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote:

> Hi Sophie, Matthias, Bruno, and Eduwer,
>
> Thanks for your patience as I have been scrambling to catch up after a week of business travel (and a few days with no time to code). I'd like to tie up some loose ends here, but in short, I don't think the KIP document itself needs any changes (our internal implementation does, however).
>
> 1. In the interest of a) not changing the KIP after it's already out for a vote, and b) making sure our English grammar is "correct", let's stick with `onBatchLoaded()`. It is the Store that gets updated, not the Batch.
>
> 2. For me (and, thankfully, the community as well) adding a remote network call at any point in this KIP is a non-starter. We'll ensure that our implementation does not introduce one.
>
> 3. I really don't like changing API behavior, even if it's not documented in the javadoc. As such, I am strongly against modifying the behavior of endOffsets() on the consumer as some people may implicitly depend on the contract.
>
> 3a. The Consumer#currentLag() method gives us exactly what we want without a network call (current lag from a cache, from which we can compute the offset).
>
> 4. I have no opinion about whether we should pass endOffset or currentLag to the callback. Either one has the same exact information inside it. In the interest of not changing the KIP after the vote has started, I'll leave it as endOffset.
>
> As such, I believe the KIP doesn't need any updates, nor has it been updated since the vote started.
>
> Would anyone else like to discuss something before the Otter Council adjourns regarding this matter?
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote:

Hi Sophie, Matthias, Bruno, and Eduwer,

Thanks for your patience as I have been scrambling to catch up after a week of business travel (and a few days with no time to code). I'd like to tie up some loose ends here, but in short, I don't think the KIP document itself needs any changes (our internal implementation does, however).

1. In the interest of a) not changing the KIP after it's already out for a vote, and b) making sure our English grammar is "correct", let's stick with `onBatchLoaded()`. It is the Store that gets updated, not the Batch.
2. For me (and, thankfully, the community as well) adding a remote network call at any point in this KIP is a non-starter. We'll ensure that our implementation does not introduce one.
3. I really don't like changing API behavior, even if it's not documented in the javadoc. As such, I am strongly against modifying the behavior of endOffsets() on the consumer as some people may implicitly depend on the contract.
3a. The Consumer#currentLag() method gives us exactly what we want without a network call (current lag from a cache, from which we can compute the offset).
4. I have no opinion about whether we should pass endOffset or currentLag to the callback. Either one has the same exact information inside it. In the interest of not changing the KIP after the vote has started, I'll leave it as endOffset.

As such, I believe the KIP doesn't need any updates, nor has it been updated since the vote started.

Would anyone else like to discuss something before the Otter Council adjourns regarding this matter?

Cheers,
Colt McNealy
*Founder, LittleHorse.dev*
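As a rough illustration of how an application might consume the callbacks discussed in this thread, here is a minimal, self-contained sketch. The nested `Listener` interface is a hypothetical, simplified stand-in (plain strings instead of Kafka types, and parameter lists that are assumptions rather than the KIP's final signatures); only the method names and the `SuspendReason` values are taken from the discussion. Treating `currentEndOffset - batchEndOffset` as the standby's lag is likewise an assumption made for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StandbyLagSketch {

    // Hypothetical, simplified stand-in for the listener discussed in this thread.
    // Parameter lists are assumptions made for illustration only.
    interface Listener {
        enum SuspendReason { PROMOTED, MIGRATED }

        void onUpdateStart(String changelogPartition, String storeName, long startingOffset);

        void onBatchLoaded(String changelogPartition, String storeName,
                           long batchEndOffset, long batchSize, long currentEndOffset);

        void onUpdateSuspended(String changelogPartition, String storeName,
                               long storeOffset, long currentEndOffset, SuspendReason reason);
    }

    // Tracks an approximate lag per (changelog partition, store) pair.
    static final class LagTracker implements Listener {
        private final Map<String, Long> lagByStore = new ConcurrentHashMap<>();

        private static String key(String changelogPartition, String storeName) {
            return changelogPartition + "/" + storeName;
        }

        @Override
        public void onUpdateStart(String changelogPartition, String storeName, long startingOffset) {
            // Nothing to record until the first batch reports an end offset.
        }

        @Override
        public void onBatchLoaded(String changelogPartition, String storeName,
                                  long batchEndOffset, long batchSize, long currentEndOffset) {
            // Assumption: lag is the distance from the batch end to the known end offset.
            long lag = Math.max(0L, currentEndOffset - batchEndOffset);
            lagByStore.put(key(changelogPartition, storeName), lag);
        }

        @Override
        public void onUpdateSuspended(String changelogPartition, String storeName,
                                      long storeOffset, long currentEndOffset, SuspendReason reason) {
            // The standby is no longer updated on this instance, so drop its entry.
            lagByStore.remove(key(changelogPartition, storeName));
        }

        // Returns null when no batch has been seen for this store.
        Long lagFor(String changelogPartition, String storeName) {
            return lagByStore.get(key(changelogPartition, storeName));
        }
    }

    public static void main(String[] args) {
        LagTracker tracker = new LagTracker();
        tracker.onUpdateStart("orders-changelog-0", "orders", 100L);
        tracker.onBatchLoaded("orders-changelog-0", "orders", 150L, 50L, 180L);
        System.out.println(tracker.lagFor("orders-changelog-0", "orders"));
    }
}
```

Whether the callback carries an end offset or a lag value, the tracker only needs one of the two, which is why either choice would work for this kind of monitoring.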
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman wrote:

Just want to checkpoint the current state of this KIP and make sure we're on track to get it into 3.7 (we still have a few weeks) -- looks like there are two remaining open questions, both relating to the middle/intermediate callback:

1. What to name it: seems like the primary candidates are onBatchLoaded and onBatchUpdated (and maybe also onStandbyUpdated?)
2. What additional information can we pass in that would strike a good balance between being helpful and impacting performance.

Regarding #1, I think all of the current options are reasonable enough that we should just let Colt decide which he prefers. I personally think #onBatchUpdated is fine -- Bruno does make a fair point but the truth is that English grammar can be sticky and while it could be argued that it is the store which is updated, not the batch, I feel that it is perfectly clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird at all. That's just my two cents in case it helps, but again, whatever makes sense to you Colt is fine.

When it comes to #2 -- as much as I would love to dig into the Consumer client lore and see if we can modify existing APIs or add new ones in order to get the desired offset metadata in an efficient way, I think we're starting to go down a rabbit hole that is going to expand the scope way beyond what Colt thought he was signing up for. I would advocate to focus on just the basic feature for now and drop the end-offset from the callback. Once we have a standby listener it will be easy to expand on with a followup KIP if/when we find an efficient way to add additional useful information. I think it will also become more clear what is and isn't useful after more people get to using it in the real world.

Colt/Eduwer: how necessary is receiving the end offset during a batch update to your own application use case?

Also, for those who really do need to check the current end offset, I believe in theory you should be able to use the KafkaStreams#metrics API to get the current lag and/or end offset for the changelog -- it's possible this does not represent the most up-to-date end offset (I'm not sure whether it does or does not), but it should be close enough to be reliable and useful for the purpose of monitoring -- I mean it is a metric, after all.

Hope this helps -- in the end, it's up to you (Colt) to decide what you want to bring in scope or not. We still have more than 3 weeks until the KIP freeze as currently proposed, so in theory you could even implement this KIP without the end offset and then do a followup KIP to add the end offset within the same release, i.e. without any deprecations. There are plenty of paths forward here, so don't let us drag this out forever if you know what you want.

Cheers,
Sophie
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax wrote:

Forgot one thing:

We could also pass `currentLag()` into `onBatchLoaded()` instead of end-offset.

-Matthias
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
On 10/20/23 10:56 AM, Matthias J. Sax wrote:

Thanks for digging into this, Bruno.

The JavaDoc on the consumer does not say anything specific about `endOffset` guarantees:

> Get the end offsets for the given partitions. In the default {@code read_uncommitted} isolation level, the end
> offset is the high watermark (that is, the offset of the last successfully replicated message plus one). For
> {@code read_committed} consumers, the end offset is the last stable offset (LSO), which is the minimum of
> the high watermark and the smallest offset of any open transaction. Finally, if the partition has never been
> written to, the end offset is 0.

Thus, I actually believe that it would be ok to change the implementation and serve the answer from the `TopicPartitionState`?

Another idea would be to use `currentLag()` in combination with `position()` (or the offset of the last read record) to compute the end-offset on the fly?

-Matthias
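The idea of deriving the end offset from `currentLag()` and `position()` comes down to one addition. The following is a minimal sketch, assuming the two inputs were obtained from `Consumer#position(TopicPartition)` (a `long`) and `Consumer#currentLag(TopicPartition)` (an `OptionalLong` that is empty while the lag is not yet known). The class and method names are hypothetical, and the consumer itself is left out so the example stays self-contained.

```java
import java.util.OptionalLong;

final class EndOffsetFromLag {

    // position: the next offset the consumer will read for the partition.
    // lag: the locally cached lag for the partition; empty when not yet known.
    // Returns position + lag, or empty when the lag is unknown (no remote call is made).
    static OptionalLong endOffset(long position, OptionalLong lag) {
        if (!lag.isPresent()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(position + lag.getAsLong());
    }

    public static void main(String[] args) {
        // Lag is known: the end offset can be computed locally.
        System.out.println(endOffset(120L, OptionalLong.of(30L)));
        // Lag is not known yet, e.g. before the first fetch response arrives.
        System.out.println(endOffset(0L, OptionalLong.empty()));
    }
}
```

Returning an empty `OptionalLong` rather than falling back to a network request keeps the no-remote-call property that the thread cares about; the caller decides how to handle the unknown case.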
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
On 10/20/23 4:00 AM, Bruno Cadonna wrote:

Hi,

Matthias is correct that the end offsets are stored somewhere in the metadata of the consumer. More precisely, they are stored in the `TopicPartitionState`. However, I could not find public API on the consumer other than currentLag() that uses the stored end offsets. If I understand the code correctly, method endOffsets() always triggers a remote call.

I am a bit concerned about doing remote calls every commit.interval.ms (by default 200ms under EOS). At the moment the remote calls are only issued if an optimization for KTables is turned on where changelog topics are replaced with the input topic of the KTable. The current remote calls retrieve all committed offsets of the group at once. If I understand correctly, that is one single remote call. Remote calls for getting end offsets of changelog topics -- as I understand you are planning to issue -- will probably result in multiple remote calls to multiple leaders of the changelog topic partitions.

Please correct me if I misunderstood anything of the above.

If my understanding is correct, I propose to modify the consumer in such a way to get the end offset from the locally stored metadata whenever possible as part of the implementation of this KIP. I do not know what the implications are of such a change of the consumer and if a KIP is needed for it. Maybe endOffsets() guarantees to return the freshest end offsets possible, which would not be satisfied with the modification.

Regarding the naming, I do not completely agree with Matthias. While the pattern might be consistent with onBatchUpdated, what is the meaning of onBatchUpdated? Is the batch updated? The names onBatchLoaded or onBatchWritten or onBatchAdded are more clear IMO. With "restore" the pattern works better. If I restore a batch of records in a state, the records are not there although they should be there and I add them. If I update a batch of records in a state, this sounds like the batch of records is in the state and I modify the existing records within the state. That is clearly not the meaning of the event for which the listener should be called.

Best,
Bruno
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
On 10/19/23 2:12 AM, Matthias J. Sax wrote:

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall, except `onBatchLoaded`, and would prefer `onBatchUpdated`. It better aligns to everything else:

- it's an update-listener, not loaded-listener
- `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`, `onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
- `StandbyUpdateListener` should have `onUpdateStart`, `onUpdateSuspended` and `onBatchUpdated` to be equally consistent (using "loaded" breaks the pattern)

About the end-offset question: I am relatively sure that the consumer gets the latest end-offset as attached metadata in every fetch response. (We exploit this behavior to track end-offsets for input topic with regard to `max.task.idle.ms` without overhead -- it was also a concern when we did the corresponding KIP how we could track lag with no overhead). Thus, I believe we would "just" need to modify the code accordingly to get this information from the restore-consumer (`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from internal metadata cache) for free, and pass into the listener.

Please double check / verify this claim and keep me honest about it.

-Matthias
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using the admin client in order to know this "endOffset" and that will have an impact on performance. We can either find a solution that has a low impact on performance or ideally zero impact; unfortunately, I don't see a way to have zero impact on performance. However, we can leverage the existing #maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin client to ask for these "endOffset"s. As far as I understand, this update is done periodically using the "commit.interval.ms" configuration. I believe this option will force us to invoke StandbyUpdateListener once this interval is reached.
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Thanks for the KIP, Colt and Eduwer,

Are you sure there is also not a significant performance impact for passing into the callback `currentEndOffset`?

I am asking because the comment here: https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129 says that the end-offset is only updated once for standby tasks whose changelog topic is not piggy-backed on input topics. I could also not find the update of end-offset for those standbys.

Best,
Bruno
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Hi all,

it's a nice improvement! I don't have anything to add on top of the previous comments, just came here to say that it seems to me consensus has been reached and the result looks good to me.

Thanks Colt and Eduwer!
Lucas
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy
*Founder, LittleHorse.dev*
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Thanks for the summary, that looks good to me.

Guozhang
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Hello there!

Thanks everyone for the comments. There's a lot of back-and-forth going on, so I'll do my best to summarize what everyone's said in TLDR format:

1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly for the other methods.
2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
3. Remove the `earliestOffset` parameter for performance reasons.

If that's all fine with everyone, I'll update the KIP and we (well, mostly Edu (: ) will open a PR.

Cheers,
Colt McNealy
*Founder, LittleHorse.dev*
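For readers skimming the thread, here is a rough sketch of what the listener could look like after the three changes summarized above. It is only an illustration: `StandbyUpdateListenerSketch` and `RecordingStandbyListener` are made-up names, the parameter lists are assumptions (plain strings and ints instead of Kafka types, so that it compiles without any dependency), and the KIP document remains the reference for the real signatures. Only `onUpdateStart`, `onUpdateSuspended`, `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED` are taken from the discussion.

```java
import java.util.ArrayList;
import java.util.List;

// Small demo; declared first so the file can also be run directly.
class StandbyListenerDemo {
    public static void main(String[] args) {
        RecordingStandbyListener listener = new RecordingStandbyListener();
        listener.onUpdateStart("orders-changelog", 0, "orders", 42L);
        listener.onUpdateSuspended("orders-changelog", 0, "orders", 100L,
                StandbyUpdateListenerSketch.SuspendReason.PROMOTED);
        listener.events.forEach(System.out::println);
    }
}

// Hypothetical, simplified stand-in for the listener discussed in this thread.
// The parameter lists are assumptions, not the KIP's actual signatures.
interface StandbyUpdateListenerSketch {
    // Both reasons are kept, per point 2 of the summary.
    enum SuspendReason { MIGRATED, PROMOTED }

    // Renamed from onStandbyUpdateStart(); there is no earliestOffset
    // parameter, since it was dropped for performance reasons.
    void onUpdateStart(String changelogTopic, int partition,
                       String storeName, long startingOffset);

    void onUpdateSuspended(String changelogTopic, int partition,
                           String storeName, long storeOffset,
                           SuspendReason reason);
}

// Example implementation that simply records each callback as a string.
class RecordingStandbyListener implements StandbyUpdateListenerSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void onUpdateStart(String changelogTopic, int partition,
                              String storeName, long startingOffset) {
        events.add("start " + storeName + " " + changelogTopic + "-" + partition
                + " @" + startingOffset);
    }

    @Override
    public void onUpdateSuspended(String changelogTopic, int partition,
                                  String storeName, long storeOffset,
                                  SuspendReason reason) {
        events.add("suspended " + storeName + " " + changelogTopic + "-" + partition
                + " @" + storeOffset + " reason=" + reason);
    }
}
```

A real implementation would more likely export metrics than collect strings; the recording version is used here only because it is easy to inspect.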
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Hello everyone,

Thanks for all your feedback for this KIP!

I think that the key to choosing proper names for this API is understanding the terms used inside the StoreChangelogReader. Currently, this class has two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion, using StandbyUpdateListener for the interface fits better on these terms. Same applies for onUpdateStart/Suspended.

StoreChangelogReader uses "the same mechanism" for active task restoration and standby task updates, but this is an implementation detail. Under normal circumstances (no rebalances or task migrations), the changelog reader will be in STANDBY_UPDATING, which means it will be updating standby tasks as long as there are new records in the changelog topic. That's why I prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100% align with StateRestoreListener, but either one is fine.

Edu
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Hello Colt, Thanks for writing the KIP! I have read through the updated KIP and overall it looks great. I only have minor naming comments (well, aren't naming the least boring stuff to discuss and that takes the most of the time for KIPs :P): 1. I tend to agree with Sophie regarding whether or not to include "Standby" in the functions of "onStandbyUpdateStart/Suspended", since it is also more consistent with the functions of "StateRestoreListener" where we do not name it as "onStateRestoreState" etc. 2. I know in community discussions we sometimes say "a standby is promoted to active", but in the official code / java docs we did not have a term of "promotion", since what the code does is really recycle the task (while keeping its state stores open), and create a new active task that takes in the recycled state stores and just changing the other fields like task type etc. After thinking about this for a bit, I tend to feel that "promoted" is indeed a better name for user facing purposes while "recycle" is more of a technical detail inside the code and could be abstracted away from users. So I feel keeping the name "PROMOTED" is fine. 3. Regarding "earliestOffset", it does feel like we cannot always avoid another call to the Kafka API. And on the other hand, I also tend to think that such bookkeeping may be better done at the app level than from the Streams' public API level. I.e. the app could keep a "first ever starting offset" per "topic-partition-store" key, and a when we have rolling restart and hence some standby task keeps "jumping" from one client to another via task assignment, the app would update this value just one when it finds the ""topic-partition-store" was never triggered before. What do you think? 4. I do not have a strong opinion either, but what about "onBatchUpdated" ? Guozhang On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy wrote: > > Sohpie— > > Thank you very much for such a detailed review of the KIP. 
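Guozhang's point 3 (app-level bookkeeping of a "first ever starting offset" per "topic-partition-store" key) could be sketched roughly as below. This is only an illustration under stated assumptions: `FirstOffsetTracker`, `recordStart`, and `alreadyLoaded` are hypothetical names that appear nowhere in the KIP or in Kafka Streams, and the map lives in a single process, so tracking a standby that "jumps" between clients would need some shared storage that is not shown here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical app-level helper; not part of Kafka Streams or of KIP-988.
final class FirstOffsetTracker {
    // Key: "topic-partition-store"; value: the first starting offset the app observed.
    private final Map<String, Long> firstOffsets = new ConcurrentHashMap<>();

    private static String key(String topic, int partition, String store) {
        return topic + "-" + partition + "-" + store;
    }

    // Stores startingOffset only if the key was never seen before,
    // and returns the remembered first offset either way.
    long recordStart(String topic, int partition, String store, long startingOffset) {
        Long previous = firstOffsets.putIfAbsent(key(topic, partition, store), startingOffset);
        return previous == null ? startingOffset : previous;
    }

    // Offsets between the remembered first offset and the given starting offset,
    // used here as a rough stand-in for (startingOffset - earliestOffset).
    // Returns 0 for a key that was never recorded.
    long alreadyLoaded(String topic, int partition, String store, long startingOffset) {
        Long first = firstOffsets.get(key(topic, partition, store));
        return first == null ? 0L : startingOffset - first;
    }
}
```

An application would presumably call `recordStart` from whatever "update start" callback the listener ends up exposing, which keeps the bookkeeping out of the Streams public API as suggested.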
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Sophie,

Thank you very much for such a detailed review of the KIP. It might actually be longer than the original KIP in the first place!

1. Ack'ed and fixed.

2. Correct, this is a confusing passage and requires context:

One thing on our list of TODOs regarding reliability is to determine how to configure `session.timeout.ms`. In our Kubernetes environment, an instance of our Streams App can be terminated, restarted, and get back into the "RUNNING" Streams state in about 20 seconds. We have two options here: a) set session.timeout.ms to 30 seconds or so, and deal with 20 seconds of unavailability for affected partitions, but avoid shuffling Tasks; or b) set session.timeout.ms to a low value, such as 6 seconds (with heartbeat.interval.ms of 2000), and reduce the unavailability window during a rolling bounce but incur an "extra" rebalance. There are several different costs to a rebalance, including the shuffling of standby tasks. JMX metrics are not fine-grained enough to give us an accurate picture of what's going on with the whole Standby Task Shuffle Dance. I hypothesize that the Standby Update Listener might help us clarify just how the shuffling actually (not theoretically) works, which will help us make a more informed decision about the session timeout config.

If you think this is worth putting in the KIP, I'll polish it and do so; else, I'll remove the current half-baked explanation.

3. Overall, I agree with this. In our app, each Task has only one Store to reduce the number of changelog partitions, so I sometimes forget the distinction between the two concepts, as reflected in the KIP (:

3a. I don't like the word "Restore" here, since Restoration refers to an Active Task getting caught up in preparation to resume processing. `StandbyUpdateListener` is fine by me; I have updated the KIP. I am a native Python speaker so I do prefer shorter names anyways (:

3b1. +1 to removing the word 'Task'.

3b2. I like `onUpdateStart()`, but with your permission I'd prefer `onStandbyUpdateStart()`, which matches the name of the Interface "StandbyUpdateListener". (The Python part of me hates this, however.)

3b3. Going back to question 2), `earliestOffset` was intended to allow us to more easily calculate the amount of state _already loaded_ in the store by computing (startingOffset - earliestOffset). This would help us see how much inefficiency is introduced in a rolling restart: if we go from a situation with an up-to-date standby before the restart to one where, after the whole restart, the Task is shuffled onto an instance with no previous state, then that is expensive. However, if the final shuffling results in the Task back on an instance with a lot of pre-built state, it's not expensive.

If a call over the network is required to determine the earliestOffset, then this is a "hard no-go" for me, and we will remove it (I'll have to check with Eduwer as he is close to having a working implementation). I think we can probably determine what we wanted to see in a different way, but it will take more thinking. If `earliestOffset` is confusing, perhaps rename it to `earliestChangelogOffset`?

`startingOffset` is easy to remove as it can be determined from the first call to `onBatch{Restored/Updated/Processed/Loaded}()`.

Anyways, I've updated the JavaDoc in the interface; hopefully it's clearer. Awaiting further instructions here.

3c. Good point; after thinking, my order of preference is `onBatchLoaded()` -> `onBatchUpdated()` -> `onBatchProcessed()` -> `onBatchRestored()`. I am less fond of "processed" because when I was first learning Streams I mistakenly thought that standby tasks actually processed the input topic rather than loaded from the changelog. I'll defer to you here.

3d. +1 to `onUpdateSuspended()`, or better yet `onStandbyUpdateSuspended()`. Will check about the implementation of keeping track of the number of records loaded.

4a. I think this might be best in a separate KIP, especially given that this is my and Eduwer's first time contributing to Kafka (so we want to minimize the blast radius).

4b. I might respectfully (and timidly) push back here: RECYCLED for an Active Task is a bit confusing to me. DEMOTED and MIGRATED make sense from the standpoint of an Active Task, but recycling to me sounds like throwing stuff away, such that the resources (i.e. disk space) can be used by a separate Task. As an alternative, rather than trying to reuse the same enum, maybe rename it to `StandbySuspendReason` to avoid naming conflicts with `ActiveSuspendReason`? However, I could be convinced to rename PROMOTED -> RECYCLED, especially if Eduwer agrees.

TLDR:

T1. Agreed, will remove the word "Task" as it's incorrect.
T2. Will update to `onStandbyUpdateStart()`.
T3. Awaiting further instructions on earliestOffset and startingOffset.
T4. I don't like `onBatchProcessed()` too much, perhaps `onBatchLoaded()`?
T5. Will update to `onStandbyUpdateSuspended()`.
T6. Thoughts on renaming SuspendReason to
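For readers following the naming discussion, here is a minimal sketch of what a listener using the names from the TLDR above might look like. Everything about it is an assumption made for illustration: the parameter lists, the plain `String`/`int` stand-ins for a topic partition, the meanings given to the enum constants, and the `RecordingStandbyListener` class are hypothetical and are not the KIP's actual API, which was still under discussion at this point in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical enum, following the `StandbySuspendReason` naming idea from item 4b.
enum StandbySuspendReason {
    MIGRATED, // assumed meaning: the standby was moved to another instance
    PROMOTED  // assumed meaning: the standby was turned into an active task
}

// Hypothetical interface: method names come from the TLDR; all parameters are assumed.
interface StandbyUpdateListener {
    void onStandbyUpdateStart(String topic, int partition, String storeName, long startingOffset);

    void onBatchLoaded(String topic, int partition, String storeName,
                       long batchEndOffset, long batchSize, long currentEndOffset);

    void onStandbyUpdateSuspended(String topic, int partition, String storeName,
                                  long storeOffset, StandbySuspendReason reason);
}

// Example implementation that records events and counts the records loaded so far.
final class RecordingStandbyListener implements StandbyUpdateListener {
    final List<String> events = new ArrayList<>();
    private long totalLoaded = 0L;
    private long lastLag = -1L; // stays -1 until the first batch arrives

    @Override
    public void onStandbyUpdateStart(String topic, int partition, String storeName, long startingOffset) {
        events.add("start " + storeName + " @" + startingOffset);
    }

    @Override
    public void onBatchLoaded(String topic, int partition, String storeName,
                              long batchEndOffset, long batchSize, long currentEndOffset) {
        totalLoaded += batchSize;
        // Remaining distance to the changelog end after this batch.
        lastLag = currentEndOffset - batchEndOffset;
        events.add("batch " + storeName + " lag=" + lastLag);
    }

    @Override
    public void onStandbyUpdateSuspended(String topic, int partition, String storeName,
                                         long storeOffset, StandbySuspendReason reason) {
        events.add("suspended " + storeName + " reason=" + reason + " loaded=" + totalLoaded);
    }

    long totalLoaded() { return totalLoaded; }

    long lastLag() { return lastLag; }
}
```

Tracking the running total inside the listener, as done here, is one way an application could obtain the "number of records loaded" figure from item 3d even if Streams itself does not pass it to the suspend callback.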
Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener
Hey Colt!

Thanks for the KIP -- this will be a great addition to Streams, I can't believe we've gone so long without this. Overall the proposal makes sense, but I had a handful of fairly minor questions and suggestions/requests:

1. Seems like the last sentence in the 2nd paragraph of the Motivation section is cut off and incomplete -- "want to be able to know " what exactly?

2. This isn't that important since the motivation as a whole is clear to me and convincing enough, but I'm not quite sure I understand the example at the end of the Motivation section. How are standby tasks (and the ability to hook into and monitor their status) related to the session.timeout.ms config?

3. To help both old and new users of Kafka Streams understand this new restore listener and its purpose/semantics, can we try to name the class and callbacks in a way that's more consistent with the active task restore listener?

3a. StandbyTaskUpdateListener: The existing restore listener is called StateRestoreListener, so the new one could be called something like StandbyStateRestoreListener. That said, we typically refer to standby tasks as "processing" rather than "restoring" records -- i.e. restoration is a term for active task state specifically. I actually like the original suggestion if we just drop the "Task" part of the name, i.e. StandbyUpdateListener. I think either that or StandbyRestoreListener would be fine and probably the two best options.

Also, this probably goes without saying, but any change to the name of this class should of course be reflected in the KafkaStreams#setXXX API as well.

3b. #onTaskCreated: I know the "start" callback feels a bit different for the standby task updater vs an active task beginning restoration, but I think we should try to keep the various callbacks aligned to their active restore listener counterpart. We can/should just replace the term "restore" with "update" for the callback method names the same way we do for the class name, which in this case would give us #onUpdateStart. Personally I like this better, but it's ultimately up to you. However, I would push back against anything that includes the word "Task" (e.g. #onTaskCreated), as the listener is actually not scoped to the task itself but instead to the individual state store(s). This is the main reason I would prefer calling it something like #onUpdateStart, which keeps the focus on the store being updated rather than the task that just happens to own this store.

One last thing on this callback -- do we really need both the `earliestOffset` and `startingOffset`? I feel like this might be more confusing than it is helpful (tbh even I'm not completely sure I know what the earliestOffset is supposed to represent). More importantly, is this all information that is already available and able to be passed in to the callback by Streams? I haven't checked on this, but it feels like the earliestOffset is likely to require a remote call, either by the embedded consumer or via the admin client. If so, the ROI on including this parameter seems quite low (if not outright negative).

3c. #onBatchRestored: If we opt to use the term "update" in place of "restore" elsewhere, then we should consider doing so here as well. What do you think about #onBatchUpdated, or even #onBatchProcessed? I'm actually not super concerned about this particular API, and honestly I think we can use restore or update interchangeably here, so if you don't like any of the suggested names (and no one can think of anything better), I would just stick with #onBatchRestored. In this case, it kind of makes the most sense.

3d. #onTaskSuspended: Along the same lines as 3b above, #onUpdateSuspended or just #onRestoreSuspended probably makes more sense for this callback. Also, I notice the StateRestoreListener passes in the total number of records restored to its #onRestoreSuspended. Assuming we already track that information in Streams and have it readily available to pass in at whatever point we would be invoking this callback, that might be a useful parameter for the standby listener to have as well.

4. I totally love the SuspendReason thing, just two notes/requests:

4a. Feel free to push back against adding onto the scope of this KIP, but it would be great to expand the active state restore listener with this SuspendReason enum as well. It would be really useful for both variants of restore listener.

4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for standby tasks it means basically the same thing, the point is that active tasks can also be recycled into standbys through the same mechanism. This way they can share the SuspendReason enum -- not that it's necessary for them to share, I just think it would be a good idea to keep the two restore listeners aligned to the highest degree possible. I was actually considering proposing a short KIP with a new RecyclingListener (or something) specifically for this exact kind of thing, since we currently have