Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Thanks. I buy the argument about the lag for active tasks.

Nit: The KIP briefly mentions the deprecation of the `metadataForKey()` methods -- those should be listed as `@deprecated` next to the newly added methods to point this change out more visibly.

Nit: In the code example, why do we loop over `inSyncStandbys`? Would we not just query only the most up-to-date one?

Nit: The section "Compatibility, Deprecation, and Migration Plan" should point out that two methods are deprecated and that users can migrate their code to use the two new methods instead.

Those nits only address the write-up of the KIP, not the actual design, which LGTM.

-Matthias

On 11/14/19 3:48 PM, Guozhang Wang wrote:
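For context on that second nit, the routing decision being discussed might look like the sketch below. This is not the KIP's actual example: `inSyncStandbys` echoes the name from the KIP, while the liveness set and per-host lag map are assumed to come from the application's own control plane, and all helper names are illustrative.

import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.state.HostInfo;

// Route to the active host when it is alive and fresh enough; otherwise pick
// the single most caught-up live standby -- no need to loop over all standbys.
static Optional<HostInfo> pickQueryTarget(HostInfo active,
                                          Set<HostInfo> inSyncStandbys,
                                          Set<HostInfo> liveHosts,        // from the app's failure detector
                                          Map<HostInfo, Long> offsetLags, // gossiped lag per host, this partition
                                          long tolerableLag) {
    if (liveHosts.contains(active) && offsetLags.getOrDefault(active, 0L) <= tolerableLag) {
        return Optional.of(active);
    }
    return inSyncStandbys.stream()
            .filter(liveHosts::contains)
            .min(Comparator.comparingLong(h -> offsetLags.getOrDefault(h, Long.MAX_VALUE)))
            .filter(h -> offsetLags.getOrDefault(h, Long.MAX_VALUE) <= tolerableLag);
}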
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
10/20: I think I'm aligned with John's replies as well.

Guozhang

On Fri, Nov 15, 2019 at 1:45 AM Vinoth Chandar wrote:
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
> during restoring state the active might have some lag

Great catch, yes -- we cannot assume lag = 0 for the active. Let's report active lag as well then. If the active is too laggy, the app can then deem the store partition unavailable (based on what the application is willing to tolerate).

@matthias do you agree? We can then begin the vote.

On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar wrote:
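Concretely, the serving-side check being described might look like this sketch. `allLocalStoreOffsetLags()` is the method under discussion and its return shape (store name -> partition -> offset lag) is only an assumption here; `maxTolerableLag` is an application-chosen bound.

import java.util.Collections;
import java.util.Map;

// Deem a store partition unavailable on this instance when even its local
// copy (e.g. a restoring active) is farther behind the changelog than the
// application tolerates.
static boolean canServe(Map<String, Map<Integer, Long>> localLags,
                        String store, int partition, long maxTolerableLag) {
    long lag = localLags.getOrDefault(store, Collections.emptyMap())
                        .getOrDefault(partition, Long.MAX_VALUE);
    return lag <= maxTolerableLag; // false => tell the router to try another replica
}

A caller would then do something like `canServe(streams.allLocalStoreOffsetLags(), "word-count", p, 10_000L)` and answer e.g. HTTP 503 when it returns false.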
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
I agree with Vinoth and John on having "allLocalStoreOffsetLags()". All actives don't have 0 lag: during state restoration the active might have some lag, and one of the features of this KIP is to provide an option to query from the active (which might be in a restoring state).

I will update the KIP with rejected alternatives, and after this will start a vote if everyone agrees.

On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler <j...@confluent.io> wrote:
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hi all,

Thanks for the "reset", Vinoth. It brings some needed clarity to the discussion.

10. My 2 cents: we might as well include the lags for the active copies as well. This is a more context-free API. If we only include standbys, this choice won't make sense to users unless they understand that the active task cannot lag in the steady state, since it's the source of updates. This isn't a bad thing to realize, but it's just more mental overhead for the person who wants to list the lags for "all local stores".

Another reason is that we could consider also reporting the lag for actives during recovery (when they would have non-zero lag). We don't have to now, but if we choose to call the method "standby lags", then we can't make this choice in the future.

That said, it's just my opinion. I'm fine either way.

20. Vinoth's reply makes sense to me, fwiw.

Beyond these two points, I'm happy with the current proposal.

Thanks again,
-John

On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar wrote:
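For illustration, the "list the lags for all local stores" usage, combined with the app-driven control plane described in this thread, might look like the sketch below. The method name and return shape follow the proposal under discussion; `streams` is a running KafkaStreams instance and `controlPlane.broadcast(...)` is a hypothetical application transport.

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Periodically publish this instance's local lags (actives included: ~0 in
// steady state, non-zero while restoring) over the app's own gossip channel.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
    Map<String, Map<Integer, Long>> lags = streams.allLocalStoreOffsetLags(); // proposed API; shape assumed
    controlPlane.broadcast(lags); // hypothetical app-defined transport
}, 0L, 5L, TimeUnit.SECONDS);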
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
10. I considered that. Had to pick one or the other. Can just return standby too and rename the method to maybe "allLocalStandbyOffsetLags()" to have it explicit. ("Standby" should implicitly convey that we are talking about stores.)

20. What I meant was, we are returning HostInfo instead of StreamsMetadata since that's sufficient to route the query; same for `int partition` vs topic partition before. Previously KeyQueryMetadata had a similar structure but used StreamsMetadata and TopicPartition objects to convey the same information.

@navinder The KIP is already up to date with the email I sent, except for the reasonings I was laying out. +1 on revisiting rejected alternatives. Please make the follow-up changes.

On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax wrote:

> Thanks for the summary Vinoth!
>
> I buy the overall argument. Couple of clarification questions:
>
> 10. Why do we need to include the active stores in
> `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> for standbys?
>
> 20: What does
>
>> Thin the KeyQueryMetadata object to just contain the minimum information
>> needed.
>
> exactly mean? What is the "minimum information needed"?
>
> @Navinder: if you agree, can you update the KIP accordingly? With all the
> proposals, it's hard to keep track and it would be great to have the
> current proposal summarized in the wiki page.
>
> Please also update the "Rejected alternatives" section to avoid that we
> cycle back to old proposals (including the reason _why_ they got rejected).
>
> Thanks a lot!
>
> -Matthias
>
> On 11/13/19 7:10 PM, Vinoth Chandar wrote:
>> Given we have had a healthy discussion on this topic for a month now and
>> still with many loose ends and open-ended conversations, I thought it
>> would be worthwhile to take a step back and re-evaluate everything in the
>> context of the very real use-case and its specific scenarios.
>>
>> First, let's remind ourselves of the query routing flow of the streams
>> application ("app" here on):
>>
>> 1. queries get routed to any random streams instance in the cluster
>> ("router" here on)
>> 2. router then uses Streams metadata to pick active/standby instances
>> for that key's store/partition
>> 3. router instance also maintains global lag information for all stores
>> and all their partitions, by a gossip/broadcast/heartbeat protocol (done
>> outside of the Streams framework), but using KafkaStreams#allMetadata()
>> for streams instance discovery
>> 4. router then uses information in 2 & 3 to determine which instance to
>> send the query to: always picks the active instance if alive, or the
>> most in-sync live standby otherwise
>>
>> Few things to note:
>>
>> A) We choose to decouple how the lag information is obtained (control
>> plane) from the query path (data plane), since that provides more
>> flexibility in designing the control plane: i.e. pick any or a
>> combination of gossip and N-way broadcast, control the rate of
>> propagation, piggyback on request responses.
>> B) Since the app needs to do its own control plane, talking to other
>> instances directly for failure detection & exchanging other metadata, we
>> can leave the lag APIs added to the KafkaStreams class itself local and
>> simply return lag for all store/partitions on that instance.
>> C) Streams preserves its existing behavior of instances only talking to
>> each other through the Kafka brokers.
>> D) Since the router treats active/standby differently, it would be good
>> for the KafkaStreams APIs to hand them back explicitly, with no
>> additional logic needed for computing them. Specifically, the router
>> only knows two things -- key and store -- and if we just return a
>> Collection<StreamsMetadata> back, it cannot easily tease apart active
>> and standby. Say a streams instance hosts the same store as both active
>> and standby for different partitions; matching by just store name, the
>> app will find it in both the active and standby lists.
>> E) From the above, we assume the global lag estimates (lag per store
>> topic partition) are continuously reported amongst application instances
>> and already available on the router during step 2 above. Hence, attaching
>> lag APIs to StreamsMetadata is unnecessary and does not solve the needs
>> anyway.
>> F) The currently returned StreamsMetadata object is really information
>> about a streams instance, and is not very specific to the key being
>> queried. Specifically, the router has no knowledge of the topic partition
>> a given key belongs to; this is needed for matching to the global lag
>> information in step 4 above (as the example code in the KIP also showed
>> before). The StreamsMetadata, since it's about the instance itself, would
>> contain all topic partitions and stores on that instance, not specific to
>> the given key.
>> G) A cleaner API would thin the amount of information returned to
>> specifically the given key and
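A sketch of the "thinned", key-specific metadata object this reasoning leads to follows. The field names are assumed from the discussion (active host, standby hosts, and the key's partition for correlating with gossiped lag), not a final signature.

import java.util.Set;
import org.apache.kafka.streams.state.HostInfo;

// Minimal per-key routing metadata: who is active, who the standbys are, and
// which partition the key maps to (for step-4 lag matching on the router).
public class KeyQueryMetadata {
    private final HostInfo activeHost;        // route here in the common case
    private final Set<HostInfo> standbyHosts; // fallbacks, ranked by gossiped lag
    private final int partition;              // the queried key's partition

    public KeyQueryMetadata(HostInfo activeHost, Set<HostInfo> standbyHosts, int partition) {
        this.activeHost = activeHost;
        this.standbyHosts = standbyHosts;
        this.partition = partition;
    }

    public HostInfo getActiveHost() { return activeHost; }
    public Set<HostInfo> getStandbyHosts() { return standbyHosts; }
    public int getPartition() { return partition; }
}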
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
In all, is everyone OK with:

- Dropping KeyQueryMetadata, and the allMetadataForKey() APIs
- Dropping allLagInfo() from the KafkaStreams class, and dropping the StoreLagInfo class
- Adding offsetLag(store, key, serializer) -> Optional<Long> & offsetLag(store, key, partitioner) -> Optional<Long> to StreamsMetadata (see the sketch after this message)
- Duplicating the current methods for standby metadata in KafkaStreams: allStandbyMetadata(), allStandbyMetadataForStore(), and two variants of standbyMetadataForKey()

Responses to Guozhang:

1.1 Like I mentioned before, allStandbyMetadata() and allStandbyMetadataForStore() complement the existing allMetadata() and allMetadataForStore(), since we don't want to change the behavior of the existing APIs. Based on the discussion so far, if we decide to drop KeyQueryMetadata, then we will need to introduce 4 equivalents for standby metadata, as Matthias mentioned.

1.2 I am okay with pushing lag information to a method on StreamsMetadata (personally, I wouldn't design it like that, but happy to live with it), like Matthias suggested. But assuming topic name <=> store name equivalency for the mapping seems like a broken API to me. If all of the Streams code were written like this, I could understand. But I don't think that's the case? I would not be comfortable making such assumptions outside of the public APIs.

>> look into each one's standby partition / stores to tell which one
>> StreamsMetadata is corresponding to the instance who holds a specific key
>> as standby, yes, but I feel this one extra iteration is worth to avoid
>> introducing a new class.

This sort of thing would lead to non-standardized/potentially buggy client implementations, for something I expect the system to hand me directly. I don't personally feel introducing a new class is so bad as to warrant the user doing all this matching. Given the current APIs are not explicitly named to denote active metadata, it gives us a chance to build something more direct and clear IMO. If we do the allMetadataForKey APIs, then we should clearly separate active and standby ourselves. The alternative is separate active and standby APIs as Matthias suggests, which I can make peace with.

1.3 Similar as above. In Streams code, we treat topic partitions and store names separately.

2.1 I think most databases build replication using logical offsets, not time. Time lag can be a nice-to-have feature, but offset lag is fully sufficient for a lot of use-cases.

2.2.1 We could support a lagInfoForStores() batch API. Makes sense.

Responses to Matthias:

(100) Streams can still keep the up-to-date version in memory, and the implementation could for now just read this already-refreshed value. Designing the API with the intent of pushing this to the user keeps the door open for supporting time-based lag in the future.

(101) I am not sure what the parameters for evaluating approaches here are. Generally, when I am handed a Metadata object, I don't expect to further query it for more information semantically. I would also not force the user to make separate calls for active and standby metadata. Well, that may be just me. So sure, we can push this into StreamsMetadata if everyone agrees! +1 on duplicating all 4 methods for standbys in this case.

On Tue, Nov 12, 2019 at 4:12 AM Navinder Brar wrote:
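Put together, the API surface those four bullets describe would look roughly like the sketch below. The signatures are assumed from this message and Matthias's earlier proposal, not final, and are wrapped in illustrative interfaces only to make the sketch well-formed.

import java.util.Collection;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.StreamsMetadata;

// Additions to StreamsMetadata: per-key offset lag, empty when unknown/remote.
interface StreamsMetadataLagSketch {
    <K> Optional<Long> offsetLag(String storeName, K key, Serializer<K> keySerializer);
    <K> Optional<Long> offsetLag(String storeName, K key, StreamPartitioner<? super K, ?> partitioner);
}

// Additions to KafkaStreams: standby counterparts of the existing
// (active-only) metadata methods.
interface KafkaStreamsStandbySketch {
    Collection<StreamsMetadata> allStandbyMetadata();
    Collection<StreamsMetadata> allStandbyMetadataForStore(String storeName);
    <K> Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key, Serializer<K> keySerializer);
    <K> Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner);
}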
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
- Looking back, I agree that 2 calls to StreamsMetadataState, to fetch the StreamsMetadata and then use something like `long StreamsMetadata#offsetLag(store, key)` as Matthias suggested, is better than introducing a new public API, i.e. `KeyQueryMetadata`. I will change the KIP accordingly.

- >> I am actually not even sure, why we added `StreamsMetadata#topicPartitions()` originally

I think it is helpful in showing which host holds which source topic partitions for the /instances endpoint; and when you query a key, you need to match the partition to which the key belongs with the hosts holding the source topic partitions for that partition. Is there any other way to get this info?

On Tuesday, 12 November, 2019, 03:55:16 am IST, Guozhang Wang <wangg...@gmail.com> wrote:
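For illustration, the partition-to-host matching described above can be done with today's API along these lines. The topic name, key serde, store name, and partition count are assumptions; the modulo arithmetic mirrors the producer's default partitioner.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

// Compute the key's partition the way the default partitioner does, then
// find the instance whose source topic-partitions contain it.
static StreamsMetadata hostFor(KafkaStreams streams, String key, int numPartitions) {
    byte[] keyBytes = Serdes.String().serializer().serialize("words-topic", key);
    int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    TopicPartition needed = new TopicPartition("words-topic", partition);
    for (StreamsMetadata md : streams.allMetadataForStore("word-count")) {
        if (md.topicPartitions().contains(needed)) {
            return md; // this host's assignment covers the key's partition
        }
    }
    return null; // not found, e.g. mid-rebalance
}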
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams instances, so allMetadata would still return the same number of StreamsMetadata in the collection; it's just that within the StreamsMetadata you now have new APIs to access standby partitions / stores. So I think it would not be a breaking change to the public API to not include `allStandbyMetadata` and `allStandbyMetadataForStore` but still rely on `allMetadata(ForStore)`?

Regarding 1.1: Good point about the partition number. But I'm still wondering if it is definitely necessary to introduce a new `KeyQueryMetadata` interface class. E.g. suppose our function signature is

Collection<StreamsMetadata> allMetadataForKey(...)

When you get the collection of StreamsMetadata, you need to iterate over the collection and look into each one's standby partitions / stores to tell which StreamsMetadata corresponds to the instance that holds a specific key as a standby, yes, but I feel this one extra iteration is worth it to avoid introducing a new class.

Guozhang

On Sat, Nov 9, 2019 at 10:04 PM Matthias J. Sax wrote:
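The "one extra iteration" Guozhang mentions would look something like the sketch below; `standbyStateStoreNames()` is the augmented accessor this option assumes, and `allMetadataForKey` uses his suggested signature.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.streams.state.StreamsMetadata;

// Split a combined Collection<StreamsMetadata> into active and standby hosts
// for one store by inspecting each instance's standby store names.
static void split(Collection<StreamsMetadata> all, String store,
                  List<StreamsMetadata> actives, List<StreamsMetadata> standbys) {
    for (StreamsMetadata md : all) {
        if (md.standbyStateStoreNames().contains(store)) { // assumed new accessor
            standbys.add(md);
        } else {
            actives.add(md);
        }
    }
    // Caveat (Vinoth's point D above): an instance hosting the store as active
    // for one partition and standby for another needs a partition-level check.
}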
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
I agree that we might want to drop the time-based lag for the initial implementation. There is no good way to get this information without a broker-side change.

(100) For the offset lag information, I don't see a reason why the app should drive when this information is updated though, because KS will update this information anyway (once per `commit.interval.ms` -- and updating it more frequently does not make sense, as it most likely won't change more frequently anyway).

If you all insist that the app should drive it, I can live with it, but I think it makes the API unnecessarily complex without a benefit.

(101) I still don't understand why we need to have `KeyQueryMetadata` though. Note that an instance can only report lag for its local stores, but not remote stores, as it does not know what offset a remote standby has caught up to.

> Because we needed to return the topicPartition the key belongs to, in
> order to correlate with the lag information from the other set of APIs.

My suggestion is to get the lag information from `StreamsMetadata` -- which partition the store belongs to can be completely encapsulated within KS, as all the information is local, and I don't think we need to expose it to the user at all.

We can just add `long StreamsMetadata#offsetLag(store, key)`. If the store is local we return its lag; if it's remote we return `-1` (i.e., UNKNOWN). As an alternative, we can change the return type to `Optional<Long>`. This works for active and standby tasks alike.

Note that a user must verify if `StreamsMetadata` is for itself (local) or remote anyway. We only need to provide a way that allows users to distinguish between active and standby. (More below.)

I am actually not even sure why we added `StreamsMetadata#topicPartitions()` originally -- seems pretty useless. Can we deprecate it as side cleanup in this KIP? Or do I miss something?

(102)

> There are basically 2 reasons. One is that instead of having two
> functions, one to get StreamsMetadata for active and one for replicas. We
> are fetching both in a single call and we have a way to get only active or
> only replicas from the KeyQueryMetadata object (just like the isStandby()
> and isActive() discussion we had earlier)

I understand that we need two methods. However, I think we can simplify the API and not introduce `KeyQueryMetadata`, but just "duplicate" all 4 existing methods for standby tasks:

// note that `standbyMetadataForKey` returns a Collection, in contrast to the existing `metadataForKey`

> Collection<StreamsMetadata> allStandbyMetadata()
> Collection<StreamsMetadata> allStandbyMetadataForStore(String storeName)
> Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key, Serializer<K> keySerializer)
> Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner)

Because the existing methods return all active metadata, there is no reason to return `KeyQueryMetadata`, as it's more complicated to get the standby metadata. With `KeyQueryMetadata` the user needs to make more calls to get the metadata:

KafkaStreams#allMetadataForKey()
            #getActive()

KafkaStreams#allMetadataForKey()
            #getStandby()

vs:

KafkaStreams#metadataForKey()

KafkaStreams#standbyMetadataForKey()

The wrapping of both within `KeyQueryMetadata` does not seem to provide any benefit but increases our public API surface.

@Guozhang:

(1.1 + 1.2) From my understanding, `allMetadata()` (and the other existing methods) will only return the metadata of _active_ tasks for backward compatibility reasons. If we would return standby metadata, existing code would potentially "break" because the code might pick a standby to query a key without noticing.

-Matthias

On 11/8/19 6:07 AM, Navinder Brar wrote:
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Thanks, Guozhang for going through it again.

- 1.1 & 1.2: The main point of adding topicPartition to KeyQueryMetadata is not the topic name, but the partition number. I agree changelog topic names and store names will have a 1-1 mapping, but we also need the partition number of the changelog for which we are calculating the lag. Now, we could add the partition number to StreamsMetadata, but it would be orthogonal to the definition of StreamsMetadata, i.e. "Represents the state of an instance (process) in a {@link KafkaStreams} application." If we add the partition number to this, it doesn't stay metadata for an instance, because now it is storing the partition information for a key being queried. So having "KeyQueryMetadata" simplifies this, as it contains all the metadata and also the changelog and partition information for which we need to calculate the lag.

Another way is having another function in parallel to metadataForKey which returns the partition number for the key being queried. But then we would need 2 calls to StreamsMetadataState, once to fetch the metadata and another to fetch the partition number. Let me know if either of these two ways seems more intuitive than KeyQueryMetadata, and we can try to converge on one.

- 1.3: Again, it is required for the partition number. We can drop the store name though.
- 2.1: I think this was done in accordance with the opinion from John, as time lag would be better implemented with a broker-level change while offset lag is readily implementable. @vinoth?
- 2.2.1: Good point. +1
- 2.2.2: I am not well aware of it; @vinoth, any comments?
- 3.1: I think we have already agreed on dropping this; we need to update the KIP. Also, is there any opinion on lagInfoForStore(String storeName) vs lagInfoForStore(String storeName, int partition)?
- 3.2: But in functions such as onAssignment() and onPartitionsAssigned(), for standbyTasks also the topicPartitions we use are input topic partitions and not changelog partitions. Would this be breaking from those semantics?

On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang Wang <wangg...@gmail.com> wrote:
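For context, the `StoreLagInfo` being debated in 1.3 and 3.1 was roughly a holder like the sketch below. The field names are assumed from the thread, and the class was ultimately dropped (see Vinoth's summary above); the point of keeping the partition number is that lag is tracked per changelog partition, not per store.

// Per store-partition lag snapshot as then proposed.
public class StoreLagInfo {
    private final String storeName;       // 1.3 above debates dropping this
    private final int changelogPartition; // needed: lag is per partition
    private final long offsetLag;         // messages behind the changelog end

    public StoreLagInfo(String storeName, int changelogPartition, long offsetLag) {
        this.storeName = storeName;
        this.changelogPartition = changelogPartition;
        this.offsetLag = offsetLag;
    }

    public String storeName() { return storeName; }
    public int changelogPartition() { return changelogPartition; }
    public long offsetLag() { return offsetLag; }
}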
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hi Navinder, Vinoth, thanks for the updated KIP! Read through the discussions so far and made another pass on the wiki page, and here are some more comments: 1. About the public APIs: 1.1. It is not clear to me how allStandbyMetadataForStore and allStandbyMetadata would be differentiated from the original APIs given that we will augment StreamsMetadata to include both active and standby topic-partitions and store names, so I think we can still use allMetadata and allMetadataForStore to get the collection of instance metadata that host the store both as active and standbys. Are there any specific use cases where we ONLY want to get the standby's metadata? And even if there are, we can easily filter it out from the allMetadata / allMetadataForStore right? 1.2. Similarly I'm wondering for allMetadataForKey, can we return the same type: "Collection" which includes 1 for active, and N-1 for standbys, and callers can easily identify them by looking inside the StreamsMetadata objects? In addition I feel the "topicPartition" field inside "KeyQueryMetadata" is not very important since the changelog topic-name is always 1-1 mapping to the store name, so as long as the store name matches, the changelog topic name should always match (i.e. in the pseudo code, just checking store names should be sufficient). If all of the above assumption is true, I think we can save us from introducing one more public class here. 1.3. Similarly in StoreLagInfo, seems not necessary to include the topic partition name in addition to the store name. 2. About querying store lags: we've discussed about separating the querying of the lag information and the querying of the host information so I still support having separate APIs here. More thoughts: 2.1. Compared with offsets, I'm wondering would time-difference be more intuitive for users to define the acceptable "staleness"? More strictly, are there any scenarios where we would actually prefer offsets over timestamps except that the timestamps are not available? 2.2. I'm also a bit leaning towards not putting the burden of periodically refreshing our lag and caching it (and introducing another config) on the streams side but document clearly its cost and let users to consider its call frequency; of course in terms of implementation there are some optimizations we can consider: 1) for restoring active tasks, the log-end-offset is read once since it is not expected to change, and that offset / timestamp can be remembered for lag calculation and we do not need to refresh again; 2) for standby tasks, there's a "Map endOffsets(Collection partitions)" in KafkaConsumer to batch a list of topic-partitions in one round-trip, and we can use that to let our APIs be sth. like "lagInfoForStores(Collection storeNames)" to enable the batching effects. 3. Misc.: 3.1 There's a typo on the pseudo code "globalLagInforation". Also it seems not describing how that information is collected (personally I also feel one "lagInfoForStores" is sufficient). 3.2 Note there's a slight semantical difference between active and standby's "partitions" inside StreamsMetadata, for active tasks the partitions are actually input topic partitions for the task: some of them may also act as changelog topics but these are exceptional cases; for standby tasks the "standbyTopicPartitions" are actually the changelog topics of the task. So maybe renaming it to "standbyChangelogPartitions" to differentiate it? Overall I think this would be a really good KIP to add to Streams, thank you so much! 
Guozhang

On Wed, Nov 6, 2019 at 8:47 PM Navinder Brar wrote:
> [...]
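To make the batching idea in 2.2 above concrete, a minimal sketch, assuming the instance already tracks the offset it has consumed up to per changelog partition (the helper class and its names are illustrative, not proposed API):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetLagSketch {
    // One batched endOffsets() round-trip for all changelog partitions of
    // interest, then lag = log-end offset minus locally consumed offset.
    public static Map<TopicPartition, Long> offsetLags(
            Consumer<byte[], byte[]> consumer,
            Map<TopicPartition, Long> consumedOffsets) {
        Map<TopicPartition, Long> endOffsets =
                consumer.endOffsets(consumedOffsets.keySet());
        Map<TopicPartition, Long> lags = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            long consumed = consumedOffsets.getOrDefault(entry.getKey(), 0L);
            lags.put(entry.getKey(), Math.max(0L, entry.getValue() - consumed));
        }
        return lags;
    }
}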
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
+1 on implementing offset-based lag for now and pushing time-based lag to a later point in time, when the broker changes are done. Although time-based lag enhances readability, it would not be a make-or-break change for implementing this KIP.

Vinoth has explained the role of KeyQueryMetadata; let me add in my 2 cents as well (see the sketch below).

- There are basically 2 reasons. One is that instead of having two functions, one to get StreamsMetadata for the active and one for the replicas, we fetch both in a single call and have a way to get only the active or only the replicas from the KeyQueryMetadata object (just like the isStandby() and isActive() discussion we had earlier).
- Even after fetching the metadata, we still need the topicPartition for which the query came, in order to fetch the lag for that specific topicPartition. Instead of making another call to fetch the partition from StreamsMetadataState, we thought a single call that fetches both the partition and all the metadata would be better.
- Another option was to change the StreamsMetadata object and add the topicPartition for which the query came, but that doesn't make sense semantically, since StreamsMetadata describes a host/instance rather than a key. Also, KeyQueryMetadata represents all the metadata for the key being queried, i.e. the partition it belongs to and the list of StreamsMetadata (hosts), active or replica, where the key could be found.

On Thursday, 7 November, 2019, 01:53:36 am IST, Vinoth Chandar wrote:
[...]
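For illustration, the KeyQueryMetadata shape being discussed at this point in the thread could look roughly like the following; the field and method names are assumptions for the sketch, not the final API:

import java.util.Collection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.StreamsMetadata;

public class KeyQueryMetadata {
    private final StreamsMetadata active;                // host running the active task
    private final Collection<StreamsMetadata> standbys;  // hosts running standby replicas
    private final TopicPartition topicPartition;         // partition the key maps to,
                                                         // used to correlate lag info

    public KeyQueryMetadata(StreamsMetadata active,
                            Collection<StreamsMetadata> standbys,
                            TopicPartition topicPartition) {
        this.active = active;
        this.standbys = standbys;
        this.topicPartition = topicPartition;
    }

    public StreamsMetadata activeMetadata() { return active; }
    public Collection<StreamsMetadata> standbyMetadata() { return standbys; }
    public TopicPartition topicPartition() { return topicPartition; }
}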
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
+1 to John's suggestion on Duration/Instant and dropping the API to fetch all stores' lags. However, I do think we need to return lags per topic partition, so I'm not sure a single return value would work. We would need some new class that holds a TopicPartition and the Duration/Instant variables together.

10) Because we needed to return the topicPartition the key belongs to, in order to correlate with the lag information from the other set of APIs. Otherwise, we don't know which topic partition's lag estimate to use. We tried to illustrate this in the example code. StreamsMetadata simply captures the state of a Streams host/instance, whereas the TopicPartition depends on the key passed in. This is a side effect of our decision to decouple lag-based filtering from the metadata APIs.

20) Goes back to the previous point. We needed to return information that is key specific, at which point it seemed natural for KeyQueryMetadata to contain the active, the standbys, and the topic partition for that key. If we merely returned a standbyMetadataForKey() -> Collection<StreamsMetadata>, an active metadataForKey() -> StreamsMetadata, and a new getTopicPartition(key) -> TopicPartition back to the caller, then arguably you could do the same kind of correlation. IMO having the KeyQueryMetadata class to encapsulate all this is a friendlier API. allStandbyMetadata() and allStandbyMetadataForStore() are just counterparts for metadataForStore() and allMetadata() that we introduce mostly for consistent API semantics (their presence could implicitly help denote that metadataForStore() is for active instances; happy to drop them if their utility is not clear).

30) This would assume we refresh all the standby lag information every time we query the StreamsMetadata for a specific store? For time-based lag, this would involve fetching the tail Kafka record at once from multiple Kafka topic partitions. I would prefer not to couple them like this, and to have the ability to make granular, per-store (or even per-topic-partition) fetches of lag information.

32) I actually prefer John's suggestion to let the application drive the lag fetches/updates and not have flags as the KIP currently proposes. Are you reexamining that position? On fetching lag information, +1, we could do this much more efficiently with a broker change. Given I don't yet have a burning need for the time-based lag, I think we can sequence the APIs such that the offset-based ones are implemented first, while we pursue the broker-side change. Given we decoupled the offset- and time-based lag APIs, I am willing to drop the time-based lag functionality (since it's not needed right away for my use case). @navinder, thoughts?

On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax wrote:
> [...]
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Navinder,

thanks for updating the KIP. A couple of follow-up questions:

(10) Why do we need to introduce the class `KeyQueryMetadata`?

(20) Why do we introduce the two methods `allMetadataForKey()`? Would it not be simpler to add `Collection<StreamsMetadata> standbyMetadataForKey(...)`? This would align with the new methods `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`.

(30) Why do we need the class `StoreLagInfo`? It seems simpler to just extend `StreamsMetadata` with the corresponding attributes and methods (for active tasks, the lag would always be reported as zero).

(32) Via (30) we can avoid the two new methods `#allLagInfo()` and `#lagInfoForStore()`, too, reducing public API and making the feature simpler to use.

Btw: if we make `StreamsMetadata` thread safe, the lag information can be updated in the background without the application needing to refresh its metadata. Hence, the user can get active and/or standby metadata once, and only needs to refresh it if a rebalance happened.

About point (4) of the previous thread: I was also thinking about when/how to update the time-lag information, and I agree that we should not update it for each query.

"How": That we need to fetch the last record is a little bit unfortunate, but I don't see any other way without a broker change (sketched below). One issue I still see is with "exactly-once": if transaction markers are in the topic, the last message is not at offset "endOffset - 1", and as multiple transaction markers might follow each other, it's unclear how to identify the offset of the last record... Thoughts?

Hence, it might be worth looking into a broker change as a potential future improvement. It might be possible for the broker to cache the latest timestamp per partition to serve this data efficiently, similar to `#endOffset()`.

"When": We refresh the end-offset information based on `commit.interval.ms` -- doing it more often is not really useful, as state store caches will most likely buffer up all writes to changelogs anyway, and these are only flushed on commit (including a flush of the producer). Hence, I would suggest updating the time-lag information based on the same strategy in the background. This way there is no additional config or methods, and the user does not need to worry about it at all. To avoid refresh overhead if we don't need it (a user might not use IQ to begin with), it might be worth maintaining an internal flag `updateTimeLagEnabled` that is set to `false` initially and only set to `true` on the first call of a user to get standby metadata.

-Matthias

On 11/4/19 5:13 PM, Vinoth Chandar wrote:
> [...]
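A minimal sketch of the "fetch the last record" approach under discussion, assuming offset endOffset - 1 holds a real record, which, per the exactly-once caveat above, is not guaranteed once transaction markers are involved (the class and helper names are illustrative):

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class TailTimestampSketch {
    // Returns the timestamp of the record at endOffset - 1, or -1 if none
    // could be read (e.g. empty changelog, or only transaction markers at
    // the tail).
    public static long tailRecordTimestamp(Consumer<byte[], byte[]> consumer,
                                           TopicPartition changelog) {
        long endOffset = consumer.endOffsets(Collections.singleton(changelog))
                                 .get(changelog);
        if (endOffset == 0) {
            return -1L;
        }
        consumer.assign(Collections.singleton(changelog));
        consumer.seek(changelog, endOffset - 1);
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<byte[], byte[]> record : records.records(changelog)) {
            return record.timestamp();
        }
        return -1L;
    }
}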
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Thanks John and Vinoth for converging thoughts on AssignmentInfo.

- "Report the time difference between the last consumed changelog record's timestamp and the changelog tail record's timestamp. This is an indicator of how fresh the local copy of a store is with respect to the active copy. Always Duration.ZERO for stores in active tasks."
>> I think for restoring active tasks this could still be non-zero.

- I agree that if there is no use case for allLagInfo(), maybe it's not needed at all.

Regards,
Navinder

On Wednesday, 6 November, 2019, 09:00:39 am IST, John Roesler wrote:
[...]
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hey Vinoth,

Really sorry, I just remembered that I started a reply earlier today, but got side-tracked.

Regarding the AssignmentInfo extension: Your explanation for this point makes sense. I was incorrectly thinking that the cluster metadata was shared with all members, but now I see it's only given to the assignor. I agree now that the assignor basically has to encode this information in the userdata field if it wants the members to have it. Thanks for your patience in explaining and linking the relevant history. Given this constraint, the encoding part of the discussion is moot. Regardless, the detail you provided does make sense to me. I'm now in favor of the proposal for extending AssignmentInfo.

Regarding "breaking up the API": Ah, my mistake. Yes, it sounds like this would be a good idea. I just took another look at KafkaStreams. Since there's no method for getting all the local stores, perhaps we can skip the "get the lags for all stores" method, and just add two new methods to the KafkaStreams interface like this:

KafkaStreams {
    // existing
    <T> T store(String storeName, QueryableStoreType<T> queryableStoreType)

    // new

    /* Report the current amount by which the local store lags behind the
       changelog tail. This is an indicator of how fresh the local copy of
       a store is with respect to the active copy. Always 0 for stores in
       active tasks. */
    long storeChangelogOffsetLag(String storeName)

    /* Report the time difference between the last consumed changelog
       record's timestamp and the changelog tail record's timestamp. This
       is an indicator of how fresh the local copy of a store is with
       respect to the active copy. Always Duration.ZERO for stores in
       active tasks. */
    Duration storeChangelogTimeLag(String storeName)
}

Note, I'm not insisting on this interface, just proposing it to potentially minimize back-and-forth. Here's the reasoning:

* Since this API is no longer reporting lags for all stores, just local ones, it makes sense to try and stick close to the `store(name, type)` method. This also brings the new methods down to two. If others think there's a use case for getting all the stores' lags, then we can also propose to add corresponding `all*Lags` methods that return `Map`.

* I also just realized that we were proposing to add `timeLagEstimateMs()` as a `long`, but as a project, we have a larger evolution to migrate to `Duration` and `Instant` where applicable. I think it makes sense to do that in this case.

How does this sound?

Thanks,
-John

On Tue, Nov 5, 2019 at 7:50 PM Vinoth Chandar wrote:
> [...]
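If something like John's sketch above were adopted, a caller deciding whether a local (possibly standby) copy is fresh enough to serve might look like this; the two method names follow the sketch, and the staleness thresholds are application-chosen, so nothing here is settled API:

import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

public class FreshnessCheckSketch {
    // Serve from this copy only if it is within app-defined staleness bounds.
    public static boolean freshEnough(KafkaStreams streams, String storeName) {
        long offsetLag = streams.storeChangelogOffsetLag(storeName);  // proposed method
        Duration timeLag = streams.storeChangelogTimeLag(storeName);  // proposed method
        return offsetLag < 1_000L
            && timeLag.compareTo(Duration.ofSeconds(30)) < 0;
    }
}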
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Ping :) Any thoughts?

On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar wrote:
> [...]
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
>> I'm having some trouble wrapping my head around what race conditions might occur, other than the fundamentally broken state in which different instances are running totally different topologies.

3. @both: Without the topic partitions that the tasks can map back to, we have to rely on topology/cluster metadata in each Streams instance to map the task back. If the source topics are wildcarded, for example, then each instance could have different source topics in its topology until the next rebalance happens. You can also read my comments here: https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106

>> seems hard to imagine how encoding arbitrarily long topic names plus an integer for the partition number could be as efficient as task ids, which are just two integers.

3. If you still have concerns about the efficacy of dictionary encoding, happy to engage; the link above also has some benchmark code I used (and there is a toy sketch below). Theoretically, we would send each topic name at least once, so yes, if you compare a 10-20 character topic name + an integer to two integers, it will be more bytes. But it's a constant overhead proportional to the size of the topic name, and with 4, 8, 12 partitions the size difference between the baseline (version 4, where we just repeated topic names for each topic partition) and the two approaches becomes narrow.

>> Plus, Navinder is going to implement a bunch of protocol code that we might just want to change when the discussion actually does take place, if ever.
>> it'll just be a mental burden for everyone to remember that we want to have this follow-up discussion.

3. Isn't people changing the same parts of code and tracking follow-ups a common thing we need to deal with anyway? For this KIP, isn't it enough to reason about whether the additional map on top of the topic dictionary would incur more overhead than sending task_ids? I don't think that's the case; both of them send two integers. As I see it, we can do a separate follow-up to (re)pursue the task_id conversion and get it working for both maps within the next release.

>> Can you elaborate on "breaking up the API"? It looks like there are already separate API calls in the proposal, one for time-lag, and another for offset-lag, so are they not already broken up?

The current APIs for lags (e.g. lagInfoForStore) return StoreLagInfo objects which hold both time and offset lags. If we had separate APIs, say offsetLagForStore() and timeLagForStore(), we could implement the offset version using the offset lag that the Streams instance already tracks, i.e. no need for external calls. The time-based lag API would incur the Kafka read for the timestamp. Makes sense?

Based on the discussions so far, I only see these two pending issues to be aligned on. Is there any other open item people want to bring up?

On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman wrote:
> [...]
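To illustrate the dictionary-encoding idea, a toy sketch only; the real encoding lives in the assignment protocol, whose wire format is not reproduced here:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class TopicDictionarySketch {
    // Writes each topic name once into "topicDict"; every partition is then
    // two ints (topic index, partition) instead of a repeated topic string.
    public static List<int[]> encode(List<TopicPartition> partitions,
                                     List<String> topicDict) {
        Map<String, Integer> index = new HashMap<>();
        List<int[]> encoded = new ArrayList<>();
        for (TopicPartition tp : partitions) {
            Integer i = index.get(tp.topic());
            if (i == null) {
                i = topicDict.size();
                index.put(tp.topic(), i);
                topicDict.add(tp.topic());   // topic name sent once
            }
            encoded.add(new int[] {i, tp.partition()});
        }
        return encoded;
    }
}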
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Regarding 3) I'm wondering, does your concern still apply even now that the pluggable PartitionGrouper interface has been deprecated? Now that we can be sure that the DefaultPartitionGrouper is used to generate the taskId -> partitions mapping, we should be able to convert any taskId back into its partitions (see the sketch below).

On Mon, Nov 4, 2019 at 11:17 AM John Roesler wrote:
> [...]
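A sketch of the inversion Sophie describes, assuming default grouping semantics where a task id is (topicGroupId, partition) and a task reads the same partition number from every source topic in its group; the simplified signature stands in for the internal TaskId class:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class TaskIdExpansionSketch {
    // Expand a task id back into its topic partitions using the local
    // topology's topic groups (topicGroupId -> source topic names).
    public static List<TopicPartition> partitionsForTask(
            int topicGroupId,
            int partition,
            Map<Integer, Set<String>> topicsByGroup) {
        List<TopicPartition> result = new ArrayList<>();
        for (String topic : topicsByGroup.get(topicGroupId)) {
            result.add(new TopicPartition(topic, partition));
        }
        return result;
    }
}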
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hey Vinoth, thanks for the reply!

3. I get that it's not the main focus of this KIP, but if it's ok, it would be nice to hash out this point right now. It only came up because this KIP-535 is substantially extending the pattern in question. If we push it off until later, then the reviewers are going to have to suspend their concerns not just while voting for the KIP, but also while reviewing the code. Plus, Navinder is going to implement a bunch of protocol code that we might just want to change when the discussion actually does take place, if ever. Finally, it'll just be a mental burden for everyone to remember that we want to have this follow-up discussion.

It makes sense what you say... the specific assignment is already encoded in the "main" portion of the assignment, not in the "userdata" part. It also makes sense that it's simpler to reason about races if you simply get all the information about the topics and partitions directly from the assignor, rather than get the partition number from the assignor and the topic name from your own a priori knowledge of the topology. On the other hand, I'm having some trouble wrapping my head around what race conditions might occur, other than the fundamentally broken state in which different instances are running totally different topologies. Sorry, but can you remind us of the specific condition?

To the efficiency counterargument, it seems hard to imagine how encoding arbitrarily long topic names plus an integer for the partition number could be as efficient as task ids, which are just two integers. It seems like this would only be true if topic names were 4 characters or less.

4. Yeah, clearly, it would not be a good idea to query the metadata before every single IQ query. I think there are plenty of established patterns for distributed database clients to follow. Can you elaborate on "breaking up the API"? It looks like there are already separate API calls in the proposal, one for time-lag, and another for offset-lag, so are they not already broken up? FWIW, yes, I agree, the offset lag is already locally known, so we don't need to build in an extra synchronous broker API call, just one for the time-lag.

Thanks again for the discussion,
-John

On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar wrote:
> [...]
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
3. Right now, we still get the topic partitions assigned as a part of the top-level Assignment object (the one that wraps AssignmentInfo) and use that to convert taskIds back. This list only contains assignments for that particular instance. Attempting to also reverse-map "all" the taskIds in the Streams cluster, i.e. all the topic partitions in these global assignment maps, was what was problematic. By explicitly sending the global assignment maps as actual topic partitions, the group coordinator (i.e. the leader that computes the assignments) is able to consistently enforce its view of the topic metadata. I still don't think a change that forces us to reconsider semantics is warranted just to save bits on the wire. Maybe we can discuss this separately from this KIP?

4. There needs to be some caching/interval somewhere though, since we potentially don't want to make one Kafka read per IQ. But I think it's a valid suggestion to make this call just synchronous and leave the caching, and how often to call it, to the application (sketched below). Would it be good to then break up the APIs for time- and offset-based lag? We can obtain the offset-based lag for free, and only incur the overhead of reading Kafka if we want time-based lags.

On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman wrote:
> [...]
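A sketch of the application-driven option: Streams exposes a synchronous lag call, and the application picks the refresh cadence and caches results itself. The Supplier stands in for whatever synchronous lag API lands; all names here are illustrative:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.common.TopicPartition;

public class AppSideLagCache {
    private final Map<TopicPartition, Long> cachedLags = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // "fetchLags" wraps the (synchronous) lag call; "refreshSeconds" is the
    // app-chosen cadence, replacing a Streams-side config.
    public void start(Supplier<Map<TopicPartition, Long>> fetchLags,
                      long refreshSeconds) {
        scheduler.scheduleAtFixedRate(
                () -> cachedLags.putAll(fetchLags.get()),
                0, refreshSeconds, TimeUnit.SECONDS);
    }

    // Possibly stale by up to refreshSeconds; null if not yet fetched.
    public Long lagFor(TopicPartition partition) {
        return cachedLags.get(partition);
    }
}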
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Adding on to John's response to 3), can you clarify when and why exactly we cannot convert between taskIds and partitions? If that's really the case, I don't feel confident that the StreamsPartitionAssignor is not full of bugs...

It seems like it currently just encodes a list of all partitions (the assignment) and also a list of the corresponding task ids, duplicated to ensure each partition has the corresponding taskId at the same offset into the list. Why is that problematic?

On Fri, Nov 1, 2019 at 12:39 PM John Roesler wrote:
> [...]
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hey Navinder,

Thanks for updating the KIP, it's a lot easier to see the current state of the proposal now.

A few remarks:
1. I'm sure it was just an artifact of revisions, but you have two separate sections where you list additions to the KafkaStreams interface. Can you consolidate those so we can see all the additions at once?

2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead, to be clearer that we're specifically measuring a number of offsets? If you don't immediately agree, then I'd at least point out that we usually refer to elements of Kafka topics as "records", not "messages", so "recordLagEstimate" might be more appropriate.

3. The proposal mentions adding a map of the standby _partitions_ for each host to AssignmentInfo. I assume this is designed to mirror the existing "partitionsByHost" map. To keep the size of these metadata messages down, maybe we can consider making two changes:
(a) for both actives and standbys, encode the _task ids_ instead of _partitions_. Every member of the cluster has a copy of the topology, so they can convert task ids into specific partitions on their own, and task ids are only (usually) three characters.
(b) instead of encoding two maps (hostinfo -> actives AND hostinfo -> standbys), which requires serializing all the hostinfos twice, maybe we can pack them together in one map with a structured value (hostinfo -> [actives,standbys]).
Both of these ideas still require bumping the protocol version to 6, and they basically mean we drop the existing `PartitionsByHost` field and add a new `TasksByHost` field with the structured value I mentioned.

4. Can we avoid adding the new "lag refresh" config? The lags would necessarily be approximate anyway, so adding the config seems to increase the operational complexity of the system for little actual benefit.

Thanks for the pseudocode, by the way, it really helps visualize how these new interfaces would play together. And thanks again for the update!
-John

On Thu, Oct 31, 2019 at 2:41 PM John Roesler wrote:
>
> Hey Vinoth,
>
> I started going over the KIP again yesterday. There are a lot of updates, and I didn't finish my feedback in one day. I'm working on it now.
>
> Thanks,
> John
>
> On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar wrote:
> >
> > Wondering if anyone has thoughts on these changes? I liked that the new metadata fetch APIs provide all the information at once with consistent naming.
> >
> > Any guidance on what you would like to be discussed or fleshed out more before we call a VOTE?
> >
> > On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar wrote:
> > >
> > > Hi,
> > > We have made some edits in the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance) after due deliberation on the agreed design to support the new query design. This includes the new public API to query offset/time lag information and other details related to querying standby tasks which have come up after thinking through the details.
> > >
> > > - Addition of a new config, “lag.fetch.interval.ms”, to configure the interval of time/offset lag
> > > - Addition of a new class StoreLagInfo to store the periodically obtained time/offset lag
> > > - Addition of two new functions in KafkaStreams, List<StoreLagInfo> allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to return the lag information for an instance and a store respectively
> > > - Addition of a new class KeyQueryMetadata. We need the topicPartition for each key to be matched with the lag API for the topic partition. One way is to add new functions and fetch the topicPartition from StreamsMetadataState, but we thought having one call that fetches both StreamsMetadata and topicPartition is cleaner.
> > > - Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState and partitionsByHostState to activePartitionsByHostState in StreamsPartitionAssignor
> > > - We have also added the pseudocode of how all the changes will exist together and support the new querying APIs
> > >
> > > Please let me know if anything is pending now, before a vote can be started on this. On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder Brar wrote:
> > >
> > > >> Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.
> > > Agreed, now that we are not returning lag, I am also sold on having two separate functions. We already have one which returns streamsMetadata for active tasks, and now we can add another one for standbys.
> > >
> > > On Saturday, 26 October, 2019, 03:55:16
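For reference, a rough sketch of the structured per-host value that John's point 3(b) describes, so each HostInfo key is serialized only once and maps to both task sets. The class name and accessors are illustrative only; the actual wire format would be pinned down in the KIP.

    import java.util.Collections;
    import java.util.Set;
    import org.apache.kafka.streams.processor.TaskId;

    // Illustrative only: one possible shape for the structured value.
    public class HostTasks {

        private final Set<TaskId> activeTasks;
        private final Set<TaskId> standbyTasks;

        public HostTasks(final Set<TaskId> activeTasks, final Set<TaskId> standbyTasks) {
            this.activeTasks = Collections.unmodifiableSet(activeTasks);
            this.standbyTasks = Collections.unmodifiableSet(standbyTasks);
        }

        public Set<TaskId> activeTasks() {
            return activeTasks;
        }

        public Set<TaskId> standbyTasks() {
            return standbyTasks;
        }
    }

The assignment metadata would then carry a single Map<HostInfo, HostTasks> in place of the two Map<HostInfo, Set<TopicPartition>> maps, and each receiver would expand the task ids back into partitions using its local copy of the topology.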
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hi,
We have made some edits in the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance) after due deliberation on the agreed design to support the new query design. This includes the new public API to query offset/time lag information and other details related to querying standby tasks which have come up after thinking through the details.

- Addition of a new config, “lag.fetch.interval.ms”, to configure the interval of time/offset lag
- Addition of a new class StoreLagInfo to store the periodically obtained time/offset lag
- Addition of two new functions in KafkaStreams, List<StoreLagInfo> allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to return the lag information for an instance and a store respectively
- Addition of a new class KeyQueryMetadata. We need the topicPartition for each key to be matched with the lag API for the topic partition. One way is to add new functions and fetch the topicPartition from StreamsMetadataState, but we thought having one call that fetches both StreamsMetadata and topicPartition is cleaner.
- Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState and partitionsByHostState to activePartitionsByHostState in StreamsPartitionAssignor
- We have also added the pseudocode of how all the changes will exist together and support the new querying APIs

Please let me know if anything is pending now, before a vote can be started on this. On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder Brar wrote:

>> Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.
Agreed, now that we are not returning lag, I am also sold on having two separate functions. We already have one which returns streamsMetadata for active tasks, and now we can add another one for standbys.

On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <vchan...@confluent.io> wrote:

+1 to Sophie's suggestion. Having both lag in terms of time and offsets is good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods.
Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.

>> lag in offsets vs time: Having both, as suggested by Sophie would of course be best. What is a little unclear to me is, how in detail are we going to compute both?
@navinder maybe the next step is to flesh out these details and surface any larger changes we need to make, if need be.

Any other details we need to cover, before a VOTE can be called on this?

On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck wrote:
> I am jumping in a little late here.
>
> Overall I agree with the proposal to push decision making on what/how to query in the query layer.
>
> For point 5 from above, I'm slightly in favor of having a new method, "standbyMetadataForKey()" or something similar. Because even if we return all tasks in one list, the user will still have to perform some filtering to separate the different tasks, so I don't feel making two calls is a burden, and IMHO it makes things more transparent for the user. If the final vote is for using an "isActive" field, I'm good with that as well.
>
> Just my 2 cents.
>
> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar wrote:
> >
> > I think now we are aligned on almost all the design parts. Summarising below what has been discussed above and what we have a general consensus on.
> >
> > - Rather than broadcasting lag across all nodes at rebalancing/with the heartbeat, we will just return a list of all available standbys in the system, and the user can make an IQ query to any of those nodes, which will return the response along with the offset and time lag. Based on that, the user can decide whether to return the response or call another standby.
> > - The current metadata query frequency will not change. It will be the same as it does now, i.e. before each query.
> > - For fetching the list in StreamsMetadataState.java and the List in StreamThreadStateStoreProvider.java (which will return all active stores which are running/restoring and replica stores which are running), we will add new functions and not disturb the existing functions.
> > - There is no need to add a new StreamsConfig for implementing this KIP.
> > - We will add standbyPartitionsByHost in AssignmentInfo and StreamsMetadataState, which would change the existing rebuildMetadata() and setPartitionsByHostState().
> >
> > If anyone has any more concerns please feel free to add. Post this I will be initiating a vote.
> > ~Navinder
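As an illustration of the serving-layer flow this summary implies (query any replica, let each queried node report or enforce its own lag, and keep the routing decision outside of Streams), here is a hypothetical sketch. Every type and method name below is a stand-in for APIs still under discussion in this thread, not the proposed public interface.

    import java.util.List;
    import java.util.Optional;

    public class StaleReadRouter {

        // Host/port of one Streams instance, as the metadata would report it.
        public static final class HostPort {
            final String host;
            final int port;

            HostPort(final String host, final int port) {
                this.host = host;
                this.port = port;
            }
        }

        // Stand-in for the per-key metadata (active host plus standby hosts).
        public interface KeyMetadata {
            HostPort activeHost();
            List<HostPort> standbyHosts();
        }

        // Transport to a remote instance; returns empty if that node does not
        // have the key or judges its own lag to be above maxLagMs.
        public interface RemoteStoreClient {
            Optional<byte[]> query(HostPort target, String store, String key, long maxLagMs);
        }

        public Optional<byte[]> fetchWithFallback(final KeyMetadata metadata,
                                                  final RemoteStoreClient client,
                                                  final String store,
                                                  final String key,
                                                  final long maxLagMs) {
            // The active replica is authoritative, so try it first.
            Optional<byte[]> value = client.query(metadata.activeHost(), store, key, maxLagMs);
            if (value.isPresent()) {
                return value;
            }
            // Fall back to standbys; each queried node decides locally whether
            // its lag is acceptable, keeping the decision in the serving layer.
            for (final HostPort standby : metadata.standbyHosts()) {
                value = client.query(standby, store, key, maxLagMs);
                if (value.isPresent()) {
                    return value;
                }
            }
            return Optional.empty();
        }
    }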
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
I think now we are aligned on almost all the design parts. Summarising below what has been discussed above and what we have a general consensus on.

- Rather than broadcasting lag across all nodes at rebalancing/with the heartbeat, we will just return a list of all available standbys in the system, and the user can make an IQ query to any of those nodes, which will return the response along with the offset and time lag. Based on that, the user can decide whether to return the response or call another standby.
- The current metadata query frequency will not change. It will be the same as it does now, i.e. before each query.
- For fetching the list in StreamsMetadataState.java and the List in StreamThreadStateStoreProvider.java (which will return all active stores which are running/restoring and replica stores which are running), we will add new functions and not disturb the existing functions.
- There is no need to add a new StreamsConfig for implementing this KIP.
- We will add standbyPartitionsByHost in AssignmentInfo and StreamsMetadataState, which would change the existing rebuildMetadata() and setPartitionsByHostState().

If anyone has any more concerns please feel free to add. Post this I will be initiating a vote.
~Navinder

On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax wrote:

Just to close the loop @Vinoth:

> 1. IIUC John intends to add (or we can do this in this KIP) lag information to AssignmentInfo, which gets sent to every participant.

As explained by John, currently KIP-441 plans to only report the information to the leader. But I guess, with the new proposal to not broadcast this information anyway, this concern is invalidated anyway.

> 2. At-least I was under the assumption that it can be called per query, since the API docs don't seem to suggest otherwise. Do you see any potential issues if we call this every query? (we should benchmark this nonetheless)

I did not see a real issue if people refresh the metadata frequently, because it would be a local call. My main point was that this would change the current usage pattern of the API, and we would clearly need to communicate this change. Similar to (1), this concern is invalidated anyway.

@John: I think it's a great idea to get rid of reporting lag, and pushing the decision making process about "what to query" into the query serving layer itself. This simplifies the overall design of this KIP significantly, and actually aligns very well with the idea that Kafka Streams (as it is a library) should only provide the basic building block. Many of my raised questions are invalidated by this.

Some questions are still open though:

> 10) Do we need to distinguish between active(restoring) and standby tasks? Or could we treat both as the same?

@Vinoth: about (5). I see your point about multiple calls vs a single call. I still slightly prefer multiple calls, but it's highly subjective and I would also be fine to add an #isActive() method. Would be good to get feedback from others.

For (14), ie, lag in offsets vs time: Having both, as suggested by Sophie, would of course be best. What is a little unclear to me is, how in detail are we going to compute both?

-Matthias

On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> Just to chime in on the "report lag vs timestamp difference" issue, I would actually advocate for both. As mentioned already, time difference is probably a lot easier and/or more useful to reason about in terms of "freshness" of the state. But in the case when all queried stores are far behind, lag could be used to estimate the recovery velocity. You can then get a (pretty rough) idea of when a store might be ready, and wait until around then to query again.
>
> On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang wrote:
>
>> I think I agree with John's recent reasoning as well: instead of letting the storeMetadata API return the staleness information, letting the client query either the active or a standby, and letting the standby query response include both the values + timestamp (or lag, as in diffs of timestamps), would actually be more intuitive -- not only is the streams client simpler, from the user's perspective they also do not need to periodically refresh their staleness information from the client, but only need to make decisions on the fly whenever they need to query.
>>
>> Again, the standby replica then needs to know the current active task's timestamp, which can be found from the log end record's encoded timestamp; today standby tasks do not read that specific record, but only refresh their knowledge of the log end offset, but I think refreshing the latest record timestamp is not a very bad request to add on the standby replicas.
>>
>> Guozhang
>>
>> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar wrote:
>>
>>> +1 As someone implementing a query routing layer, there is already a need to have mechanisms in place to do
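To make the two estimates Sophie asks for concrete, here is a minimal sketch of the dual lag computation, under Guozhang's assumption that an instance tracks the changelog end offset and the end record's timestamp alongside its own last-applied offset and record timestamp. The class name is illustrative (hence the "Sketch" suffix) and is not the proposed StoreLagInfo API.

    // Illustrative only: one snapshot carrying both lag estimates.
    public class StoreLagInfoSketch {

        private final long offsetLag; // end offset minus last applied offset
        private final long timeLagMs; // end record timestamp minus last applied record timestamp

        public StoreLagInfoSketch(final long endOffset,
                                  final long lastAppliedOffset,
                                  final long endRecordTimestampMs,
                                  final long lastAppliedRecordTimestampMs) {
            // Clamp to zero: the reads are not atomic, so raw differences
            // can transiently go negative.
            this.offsetLag = Math.max(0L, endOffset - lastAppliedOffset);
            this.timeLagMs = Math.max(0L, endRecordTimestampMs - lastAppliedRecordTimestampMs);
        }

        public long offsetLag() {
            return offsetLag;
        }

        public long timeLagMs() {
            return timeLagMs;
        }
    }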
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
I think now we are aligned on almost all the design parts. Summarising below what has been discussed above and we have a general consensus on. 1. Rather than broadcasting lag across all nodes at rebalancing/with the heartbeat, we will just return a list of all available standby’s in the system and the user can make IQ query any of those nodes which will return the response, and the lag and offset time. Based on which user can decide if he wants to return the response back or call another standby.2. The current metadata query frequency will not change. It will be the same as it does now, i.e. before each query.3. For fetching list in StreamsMetadataState.java and List in StreamThreadStateStoreProvider.java (which will return all active stores which are running/restoring and replica stores which are running), we will add new functions and not disturb the existing functions4. There is no need to add new StreamsConfig for implementing this KIP5. We will add standbyPartitionsByHost in AssignmentInfo and StreamsMetadataState which would change the existing rebuildMetadata() and setPartitionsByHostState() If anyone has any more concerns please feel free to add. Post this I will be initiating a vote. ~Navinder On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax wrote: Just to close the loop @Vinoth: > 1. IIUC John intends to add (or we can do this in this KIP) lag information > to AssignmentInfo, which gets sent to every participant. As explained by John, currently KIP-441 plans to only report the information to the leader. But I guess, with the new proposal to not broadcast this information anyway, this concern is invalidated anyway > 2. At-least I was under the assumption that it can be called per query, > since the API docs don't seem to suggest otherwise. Do you see any > potential issues if we call this every query? (we should benchmark this > nonetheless) I did not see a real issue if people refresh the metadata frequently, because it would be a local call. My main point was, that this would change the current usage pattern of the API, and we would clearly need to communicate this change. Similar to (1), this concern in invalidated anyway. @John: I think it's a great idea to get rid of reporting lag, and pushing the decision making process about "what to query" into the query serving layer itself. This simplifies the overall design of this KIP significantly, and actually aligns very well with the idea that Kafka Streams (as it is a library) should only provide the basic building block. Many of my raised questions are invalided by this. Some questions are still open though: > 10) Do we need to distinguish between active(restoring) and standby > tasks? Or could be treat both as the same? @Vinoth: about (5). I see your point about multiple calls vs a single call. I still slightly prefer multiple calls, but it's highly subjective and I would also be fine to add an #isActive() method. Would be good the get feedback from others. For (14), ie, lag in offsets vs time: Having both, as suggested by Sophie would of course be best. What is a little unclear to me is, how in details are we going to compute both? -Matthias On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote: > Just to chime in on the "report lag vs timestamp difference" issue, I would > actually advocate for both. As mentioned already, time difference is > probably a lot easier and/or more useful to reason about in terms of > "freshness" > of the state. 
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Just to close the loop @Vinoth: > 1. IIUC John intends to add (or we can do this in this KIP) lag information > to AssignmentInfo, which gets sent to every participant. As explained by John, KIP-441 currently plans to report the information only to the leader. But I guess, with the new proposal to not broadcast this information, this concern is invalidated anyway. > 2. At-least I was under the assumption that it can be called per query, > since the API docs don't seem to suggest otherwise. Do you see any > potential issues if we call this every query? (we should benchmark this > nonetheless) I did not see a real issue if people refresh the metadata frequently, because it would be a local call. My main point was that this would change the current usage pattern of the API, and we would clearly need to communicate this change. Similar to (1), this concern is invalidated anyway. @John: I think it's a great idea to get rid of reporting lag and to push the decision-making about "what to query" into the query-serving layer itself. This simplifies the overall design of this KIP significantly, and actually aligns very well with the idea that Kafka Streams (as it is a library) should only provide the basic building blocks. Many of my raised questions are invalidated by this. Some questions are still open though: > 10) Do we need to distinguish between active(restoring) and standby > tasks? Or could we treat both as the same? @Vinoth: about (5). I see your point about multiple calls vs a single call. I still slightly prefer multiple calls, but it's highly subjective and I would also be fine with adding an #isActive() method. Would be good to get feedback from others. For (14), i.e., lag in offsets vs time: having both, as suggested by Sophie, would of course be best. What is a little unclear to me is how, in detail, we are going to compute both. -Matthias On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote: > Just to chime in on the "report lag vs timestamp difference" issue, I would > actually advocate for both. As mentioned already, time difference is > probably a lot easier and/or more useful to reason about in terms of > "freshness" > of the state.
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Just to chime in on the "report lag vs timestamp difference" issue, I would actually advocate for both. As mentioned already, time difference is probably a lot easier and/or more useful to reason about in terms of "freshness" of the state. But in the case when all queried stores are far behind, lag could be used to estimate the recovery velocity. You can then get a (pretty rough) idea of when a store might be ready, and wait until around then to query again. On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang wrote: > I think I agree with John's recent reasoning as well: instead of letting > the storeMetadata API return the staleness information, letting the > client query either the active or a standby, and letting the standby query response > include both the value + timestamp (or lag, as in diffs of timestamps), > would actually be more intuitive -- not only is the Streams client simpler; > from the user's perspective they also do not need to periodically refresh their > staleness information from the client, but only need to make decisions on > the fly whenever they need to query. > > Again, the standby replica then needs to know the current active task's > timestamp, which can be found from the log end record's encoded timestamp; > today our standby tasks do not read that specific record, but only refresh > their knowledge of the log end offset. But I think refreshing the latest > record timestamp is not a very bad requirement to add to the standby replicas. > > > Guozhang > > > On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar > wrote: > > > +1 As someone implementing a query routing layer, there is already a need > > to have mechanisms in place to do healthchecks/failure detection to detect > > failures for queries, while Streams rebalancing eventually kicks in the > > background. > > So, pushing this complexity to the IQ client app keeps Streams simpler as > > well. IQs will potentially be issued at an order of magnitude higher > > frequency, and this can achieve good freshness for the lag information. > > > > I would like to add, however, that we would also need to introduce APIs in > > the KafkaStreams class for obtaining lag information for all stores local to > > that host. This is for the IQ servers to relay back with the response/their own > > heartbeat mechanism. > > > > On Thu, Oct 24, 2019 at 3:12 PM John Roesler wrote: > > > Hi all, > > > I've been mulling about this KIP, and I think I was on the wrong track > > > earlier with regard to task lags. Tl;dr: I don't think we should add > > > lags at all to the metadata API (and also not to the AssignmentInfo > > > protocol message). > > > Like I mentioned early on, reporting lag via > > > SubscriptionInfo/AssignmentInfo would only work while rebalances are > > > happening. Once the group stabilizes, no members would be notified of > > > each others' lags anymore. I had been thinking that the solution would > > > be the heartbeat proposal I mentioned earlier, but that proposal would > > > have reported the heartbeats of the members only to the leader member > > > (the one who makes assignments). To be useful in the context of _this_ > > > KIP, we would also have to report the lags in the heartbeat responses > > > to _all_ members. This is a concern to me because now _all_ the > > > lags get reported to _all_ the members on _every_ heartbeat... a lot > > > of chatter. > > > Plus, the proposal for KIP-441 is only to report the lags of each > > > _task_. This is the sum of the lags of all the stores in the tasks. > > > But this would be insufficient for KIP-535. For this KIP, we would > > > want the lag specifically of the store we want to query. So this > > > means we would have to report the lags of all the stores of all the members > > > to every member... even more chatter! > > > The final nail in the coffin to me is that IQ clients would have to > > > start refreshing their metadata quite frequently to stay up to date on > > > the lags, which adds even more overhead to the system.
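The recovery-velocity estimate Sophie describes is simple arithmetic: sample the lag twice, derive a catch-up rate, and extrapolate to zero. A rough, purely illustrative sketch:

// Back-of-the-envelope version of the estimate described above. Assumes two
// lag samples taken at distinct times; not proposed API, just arithmetic.
public class RecoveryEstimate {
    /**
     * @return estimated millis until the store catches up, or -1 if the lag
     *         is not shrinking, in which case waiting is futile.
     */
    static long estimateCatchUpMillis(long lag1, long time1Ms, long lag2, long time2Ms) {
        if (time2Ms <= time1Ms) {
            return -1; // need two samples at distinct times
        }
        double consumedPerMs = (double) (lag1 - lag2) / (time2Ms - time1Ms);
        if (consumedPerMs <= 0) {
            return -1; // falling further behind, or no progress between samples
        }
        return (long) (lag2 / consumedPerMs);
    }

    public static void main(String[] args) {
        // e.g. lag shrank from 50,000 to 20,000 records over 10 seconds
        // => 3 records/ms => roughly 6.7 more seconds to catch up.
        System.out.println(estimateCatchUpMillis(50_000, 0, 20_000, 10_000));
    }
}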
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Sending the older replies again, as they were unreadable due to bad formatting.

>> There was a follow-on idea I POCed to continuously share lag information in
>> the heartbeat protocol
+1, that would be great; I will update the KIP assuming this work will finish soon.

>> I think that adding a new method to StreamsMetadataState and deprecating the
>> existing method is the best way to go; we just can't change the return types of any existing methods.
+1 on this. We will add new methods for users who would be interested in querying back a list of possible options to query from, and leave the current function getStreamsMetadataForKey() untouched for users who want absolute consistency.

>> why not just _always_ return all available metadata (including
>> active/standby or lag) and let the caller decide to which node they want to
>> route the query
+1. I think this makes sense, as from a user standpoint there is no difference between an active and a standby if both have the same lag. In fact, users would be able to use this API to reduce query load on actives, so returning all available options along with the current lag of each makes sense, leaving it to the user how they want to use this data. This has another added advantage: if a user queries any random machine for a key and that machine has a replica for the partition (where the key belongs), the user might choose to serve the data from there itself (if it doesn’t lag much) rather than finding the active and making an IQ to it. This would save some critical serving time for some applications.

>> Adding the lag in terms of timestamp diff comparing the committed offset.
+1 on this, I think it’s more readable. But as John said, the function allMetadataForKey() is just returning the possible options from where users can query a key, so we can even drop the parameter enableReplicaServing/tolerableDataStaleness and just return all the streamsMetadata containing that key along with the offset limit.

- @John has already commented on this one.
- Yeah, the usage pattern would include querying this prior to every request.
- Will add the changes to StreamsMetadata in the KIP; would include changes in rebuildMetadata() etc.
- Makes sense, already addressed above.
- Is it important from a user perspective whether they are querying an active(processing), active(restoring), or standby task, if we have a way of denoting lag in a readable manner that signals to the user which is the best node to query for fresh data?
- Yes, I intend to return the actives and replicas in the same return list in allMetadataForKey().
- I think we can’t depend on propagating lag just during rebalances; sending it with each heartbeat every 3 secs seems like a better idea, and having less delay also saves us from this race condition.
- Yes, we need new functions to return activeRestoring and standbyRunning tasks.
- StreamsConfig doesn’t look of much use to me, since we are giving all possible options via this function; or they can use the existing function getStreamsMetadataForKey() and get just the active.
- I think we treat them both the same and let the lag do the talking.
- We are just sending them the option to query from in allMetadataForKey(), which doesn’t include any handle. We then query that machine for the key, where it calls allStores() and tries to find the task in activeRunning/activeRestoring/standbyRunning and adds the store handle there.
- Need to verify, but at the exact point when the store is closed to transition it from restoring to running, queries will fail.
The caller in such a case can have their own configurable retries to check again, or try the replica if a call to the active fails.
- I think KIP-216 is working on those lines; we might not need a few of those exceptions, since the basic idea of this KIP is now to support IQ during rebalancing.
- Addressed above; agreed, it looks more readable.
On Friday, 25 October, 2019, 10:23:57 am IST, Guozhang Wang wrote: I think I agree with John's recent reasoning as well: instead of letting the storeMetadata API return the staleness information, letting the client query either the active or a standby, and letting the standby query response include both the value + timestamp (or lag, as in diffs of timestamps), would actually be more intuitive -- not only is the Streams client simpler; from the user's perspective they also do not need to periodically refresh their staleness information from the client, but only need to make decisions on the fly whenever they need to query. Again, the standby replica then needs to know the current active task's timestamp, which can be found from the log end record's encoded timestamp; today our standby tasks do not read that specific record, but only refresh their knowledge of the log end offset. But I think refreshing the latest record timestamp is not a very bad requirement to add to the standby replicas.
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
I think I agree with John's recent reasoning as well: instead of letting the storeMetadata API return the staleness information, letting the client query either the active or a standby, and letting the standby query response include both the value + timestamp (or lag, as in diffs of timestamps), would actually be more intuitive -- not only is the Streams client simpler; from the user's perspective they also do not need to periodically refresh their staleness information from the client, but only need to make decisions on the fly whenever they need to query. Again, the standby replica then needs to know the current active task's timestamp, which can be found from the log end record's encoded timestamp; today our standby tasks do not read that specific record, but only refresh their knowledge of the log end offset. But I think refreshing the latest record timestamp is not a very bad requirement to add to the standby replicas. Guozhang On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar wrote: > +1 As someone implementing a query routing layer, there is already a need > to have mechanisms in place to do healthchecks/failure detection to detect > failures for queries, while Streams rebalancing eventually kicks in the > background. > So, pushing this complexity to the IQ client app keeps Streams simpler as > well. IQs will potentially be issued at an order of magnitude higher > frequency, and this can achieve good freshness for the lag information. > > I would like to add, however, that we would also need to introduce APIs in > the KafkaStreams class for obtaining lag information for all stores local to > that host. This is for the IQ servers to relay back with the response/their own > heartbeat mechanism. > > On Thu, Oct 24, 2019 at 3:12 PM John Roesler wrote: > > Hi all, > > I've been mulling about this KIP, and I think I was on the wrong track > > earlier with regard to task lags. Tl;dr: I don't think we should add > > lags at all to the metadata API (and also not to the AssignmentInfo > > protocol message). > > Like I mentioned early on, reporting lag via > > SubscriptionInfo/AssignmentInfo would only work while rebalances are > > happening. Once the group stabilizes, no members would be notified of > > each others' lags anymore. I had been thinking that the solution would > > be the heartbeat proposal I mentioned earlier, but that proposal would > > have reported the heartbeats of the members only to the leader member > > (the one who makes assignments). To be useful in the context of _this_ > > KIP, we would also have to report the lags in the heartbeat responses > > to _all_ members. This is a concern to me because now _all_ the > > lags get reported to _all_ the members on _every_ heartbeat... a lot > > of chatter. > > Plus, the proposal for KIP-441 is only to report the lags of each > > _task_. This is the sum of the lags of all the stores in the tasks. > > But this would be insufficient for KIP-535. For this KIP, we would > > want the lag specifically of the store we want to query. So this > > means we would have to report the lags of all the stores of all the members > > to every member... even more chatter! > > The final nail in the coffin to me is that IQ clients would have to > > start refreshing their metadata quite frequently to stay up to date on > > the lags, which adds even more overhead to the system.
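If a standby additionally tracked the timestamp encoded in the changelog's log-end record, as Guozhang suggests, it could report lag both as offsets and as a timestamp diff. A hedged sketch of that bookkeeping; the field names and update hooks are invented for illustration and are not actual Streams internals:

// Sketch of the extra bookkeeping a standby would need: alongside the log-end
// offset it already refreshes, also remember the end record's timestamp.
public class StandbyLagTracker {
    private volatile long endOffset;          // changelog log-end offset
    private volatile long endRecordTimestamp; // timestamp encoded in the log-end record
    private volatile long appliedOffset;      // last offset applied to the local store
    private volatile long appliedTimestamp;   // timestamp of that applied record

    // Would be called from wherever the standby refreshes its end offset today.
    void onEndOfLogRefresh(long offset, long recordTimestamp) {
        this.endOffset = offset;
        this.endRecordTimestamp = recordTimestamp;
    }

    // Would be called as each changelog record is applied to the store.
    void onRecordApplied(long offset, long recordTimestamp) {
        this.appliedOffset = offset;
        this.appliedTimestamp = recordTimestamp;
    }

    long offsetLag() { return Math.max(0, endOffset - appliedOffset); }

    // "Lag, as in diffs of timestamps": how far behind the active's stream time.
    long timestampLagMs() { return Math.max(0, endRecordTimestamp - appliedTimestamp); }
}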
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
+1 As someone implementing a query routing layer, there is already a need to have mechanisms in place to do healthchecks/failure detection to detect failures for queries, while Streams rebalancing eventually kicks in the background. So, pushing this complexity to the IQ client app keeps Streams simpler as well. IQs will potentially be issued at an order of magnitude higher frequency, and this can achieve good freshness for the lag information. I would like to add, however, that we would also need to introduce APIs in the KafkaStreams class for obtaining lag information for all stores local to that host. This is for the IQ servers to relay back with the response/their own heartbeat mechanism. On Thu, Oct 24, 2019 at 3:12 PM John Roesler wrote: > Hi all, > > I've been mulling about this KIP, and I think I was on the wrong track > earlier with regard to task lags. Tl;dr: I don't think we should add > lags at all to the metadata API (and also not to the AssignmentInfo > protocol message). > > Like I mentioned early on, reporting lag via > SubscriptionInfo/AssignmentInfo would only work while rebalances are > happening. Once the group stabilizes, no members would be notified of > each others' lags anymore. I had been thinking that the solution would > be the heartbeat proposal I mentioned earlier, but that proposal would > have reported the heartbeats of the members only to the leader member > (the one who makes assignments). To be useful in the context of _this_ > KIP, we would also have to report the lags in the heartbeat responses > to _all_ members. This is a concern to me because now _all_ the > lags get reported to _all_ the members on _every_ heartbeat... a lot > of chatter. > > Plus, the proposal for KIP-441 is only to report the lags of each > _task_. This is the sum of the lags of all the stores in the tasks. > But this would be insufficient for KIP-535. For this KIP, we would > want the lag specifically of the store we want to query. So this > means we would have to report the lags of all the stores of all the members > to every member... even more chatter! > > The final nail in the coffin to me is that IQ clients would have to > start refreshing their metadata quite frequently to stay up to date on > the lags, which adds even more overhead to the system. > > Consider a strawman alternative: we bring KIP-535 back to extending > the metadata API to tell the client the active and standby replicas > for the key in question (not including any "staleness/lag" > restriction, just returning all the replicas). Then, the client picks > a replica and sends the query. The server returns the current lag > along with the response (maybe in an HTTP header or something). Then, > the client keeps a map of its last observed lags for each replica, and > uses this information to prefer fresher replicas. > > OR, if it wants only to query the active replica, it would throw an > error on any lag response greater than zero, refresh its metadata by > re-querying the metadata API, and try again with the current active > replica. > > This way, the lag information will be super fresh for the client, and > we keep the Metadata API / Assignment,Subscription / and Heartbeat as > slim as possible. > > Side note: I do think that some time soon, we'll have to add a library > for IQ servers/clients. I think that this logic will start to get > pretty complex. > > I hope this thinking is reasonably clear!
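A sketch of the host-local lag accessor Vinoth asks for, and of an IQ server relaying the lag with each response. LocalLags and its storeLags() method are invented stand-ins for whatever shape the eventual KafkaStreams-level API takes, not a real Streams API:

import java.util.Map;

// Sketch of relaying per-store lag with each IQ response so the routing
// layer needs no extra metadata round-trips. All names are hypothetical.
public class IqServerSketch {

    interface LocalLags {
        // Hypothetical stand-in for "lags of all stores local to this host":
        // storeName -> (partition -> offset lag).
        Map<String, Map<Integer, Long>> storeLags();
    }

    record IqResponse(String value, long offsetLag) {}

    static IqResponse serve(LocalLags lags, String store, int partition, String value) {
        long lag = lags.storeLags()
                       .getOrDefault(store, Map.of())
                       .getOrDefault(partition, Long.MAX_VALUE);
        // The lag rides along with the payload; the caller can feed it into
        // its own freshness map or its heartbeat/healthcheck mechanism.
        return new IqResponse(value, lag);
    }
}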
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hi all, I've been mulling about this KIP, and I think I was on the wrong track earlier with regard to task lags. Tl;dr: I don't think we should add lags at all to the metadata API (and also not to the AssignmentInfo protocol message). Like I mentioned early on, reporting lag via SubscriptionInfo/AssignmentInfo would only work while rebalances are happening. Once the group stabilizes, no members would be notified of each others' lags anymore. I had been thinking that the solution would be the heartbeat proposal I mentioned earlier, but that proposal would have reported the heartbeats of the members only to the leader member (the one who makes assignments). To be useful in the context of _this_ KIP, we would also have to report the lags in the heartbeat responses to _all_ members. This is a concern to me because now _all_ the lags get reported to _all_ the members on _every_ heartbeat... a lot of chatter. Plus, the proposal for KIP-441 is only to report the lags of each _task_. This is the sum of the lags of all the stores in the tasks. But this would be insufficient for KIP-535. For this KIP, we would want the lag specifically of the store we want to query. So this means we would have to report the lags of all the stores of all the members to every member... even more chatter! The final nail in the coffin to me is that IQ clients would have to start refreshing their metadata quite frequently to stay up to date on the lags, which adds even more overhead to the system. Consider a strawman alternative: we bring KIP-535 back to extending the metadata API to tell the client the active and standby replicas for the key in question (not including any "staleness/lag" restriction, just returning all the replicas). Then, the client picks a replica and sends the query. The server returns the current lag along with the response (maybe in an HTTP header or something). Then, the client keeps a map of its last observed lags for each replica, and uses this information to prefer fresher replicas. OR, if it wants only to query the active replica, it would throw an error on any lag response greater than zero, refresh its metadata by re-querying the metadata API, and try again with the current active replica. This way, the lag information will be super fresh for the client, and we keep the Metadata API / Assignment,Subscription / and Heartbeat as slim as possible. Side note: I do think that some time soon, we'll have to add a library for IQ servers/clients. I think that this logic will start to get pretty complex. I hope this thinking is reasonably clear! Thanks again, -John On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar wrote: > > Responding to the points raised by Matthias > > 1. IIUC John intends to add (or we can do this in this KIP) lag information > to AssignmentInfo, which gets sent to every participant. > > 2. At-least I was under the assumption that it can be called per query, > since the API docs don't seem to suggest otherwise. Do you see any > potential issues if we call this every query? (we should benchmark this > nonetheless) > > 4. Agree. metadataForKey() implicitly would return the active host metadata > (as it was before). We should also document this in that API's javadoc, > given we now have other method(s) that return more host metadata. > > 5. While I see the point, the app/caller has to make two different API > calls to obtain active/standby and potentially do the same set of operations > to query the state.
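The client-side half of John's strawman might look like the following sketch: remember the lag each replica last reported and try the presumed-freshest one first. The types here are placeholders, not proposed API:

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the "map of last observed lags" from the strawman above.
public class ReplicaChooser {
    record Host(String host, int port) {}

    private final Map<Host, Long> lastObservedLag = new ConcurrentHashMap<>();

    Host pickFreshest(List<Host> candidates) {
        // Optimistically treat never-queried replicas as fresh (lag 0) so
        // they get tried at least once and their real lag gets learned.
        return candidates.stream()
                .min(Comparator.comparingLong(
                        (Host h) -> lastObservedLag.getOrDefault(h, 0L)))
                .orElseThrow(() -> new IllegalStateException("no replicas for key"));
    }

    // Called with the lag piggybacked on each query response.
    void recordLag(Host host, long lag) {
        lastObservedLag.put(host, lag);
    }
}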
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Responding to the points raised by Matthias 1. IIUC John intends to add (or we can do this in this KIP) lag information to AssignmentInfo, which gets sent to every participant. 2. At-least I was under the assumption that it can be called per query, since the API docs don't seem to suggest otherwise. Do you see any potential issues if we call this every query? (we should benchmark this nonetheless) 4. Agree. metadataForKey() implicitly would return the active host metadata (as it was before). We should also document this in that API's javadoc, given we now have other method(s) that return more host metadata. 5. While I see the point, the app/caller has to make two different API calls to obtain active/standby and potentially do the same set of operations to query the state. I personally still like a method like isActive() better, but don't have strong opinions. 9. If we do expose the lag information, could we just leave it up to the caller to decide whether it errors out or not, and not make the decision within Streams? i.e. we don't need a new config. 14. +1, if it's easier to do right away. We started with number of records, following the lead from KIP-441. On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar wrote: > > Thanks, everyone, for taking a look. Some very cool ideas have flown in. > > >> There was a follow-on idea I POCed to continuously share lag > information in the heartbeat protocol > +1, that would be great; I will update > the KIP assuming this work will finish soon. > >> I think that adding a new method to StreamsMetadataState and > deprecating the existing method is the best way to go; we just can't change > the return types of any existing methods. > +1 on this. We will add new > methods for users who would be interested in querying back a list of > possible options to query from, and leave the current function > getStreamsMetadataForKey() untouched for users who want absolute > consistency. > >> why not just always return all available metadata (including > active/standby or lag) and let the caller decide to which node they want to > route the query > +1. I think this makes sense, as from a user standpoint there > is no difference between an active and a standby if both have the same lag. In fact, > users would be able to use this API to reduce query load on actives, so > returning all available options along with the current lag of each makes > sense, leaving it to the user how they want to use this data. This has > another added advantage: if a user queries any random machine for a key and > that machine has a replica for the partition (where the key belongs), the user might > choose to serve the data from there itself (if it doesn’t lag much) rather > than finding the active and making an IQ to it. This would save some > critical serving time for some applications. > >> Adding the lag in terms of timestamp diff comparing the committed > offset. > +1 on this, I think it’s more readable. But as John said, the > function allMetadataForKey() is just returning the possible options from > where users can query a key, so we can even drop the parameter > enableReplicaServing/tolerableDataStaleness and just return all the > streamsMetadata containing that key along with the offset limit. > Answering the questions posted by Matthias in sequence. > 1. @John, can you please comment on this one. 2. Yeah, the usage pattern > would include querying this prior to every request. 3. Will add the changes > to StreamsMetadata in the KIP; would include changes in rebuildMetadata() > etc. 4. Makes sense, already addressed above.
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Thanks, everyone, for taking a look. Some very cool ideas have flown in.

>> There was a follow-on idea I POCed to continuously share lag information in
>> the heartbeat protocol
+1, that would be great; I will update the KIP assuming this work will finish soon.

>> I think that adding a new method to StreamsMetadataState and deprecating the
>> existing method is the best way to go; we just can't change the return types
>> of any existing methods.
+1 on this. We will add new methods for users who would be interested in querying back a list of possible options to query from, and leave the current function getStreamsMetadataForKey() untouched for users who want absolute consistency.

>> why not just always return all available metadata (including active/standby
>> or lag) and let the caller decide to which node they want to route the query
+1. I think this makes sense, as from a user standpoint there is no difference between an active and a standby if both have the same lag. In fact, users would be able to use this API to reduce query load on actives, so returning all available options along with the current lag of each makes sense, leaving it to the user how they want to use this data. This has another added advantage: if a user queries any random machine for a key and that machine has a replica for the partition (where the key belongs), the user might choose to serve the data from there itself (if it doesn’t lag much) rather than finding the active and making an IQ to it. This would save some critical serving time for some applications.

>> Adding the lag in terms of timestamp diff comparing the committed offset.
+1 on this, I think it’s more readable. But as John said, the function allMetadataForKey() is just returning the possible options from where users can query a key, so we can even drop the parameter enableReplicaServing/tolerableDataStaleness and just return all the streamsMetadata containing that key along with the offset limit.

Answering the questions posted by Matthias in sequence.
1. @John, can you please comment on this one.
2. Yeah, the usage pattern would include querying this prior to every request.
3. Will add the changes to StreamsMetadata in the KIP; would include changes in rebuildMetadata() etc.
4. Makes sense, already addressed above.
5. Is it important from a user perspective whether they are querying an active(processing), active(restoring), or standby task, if we have a way of denoting lag in a readable manner that signals to the user which is the best node to query for fresh data?
6. Yes, I intend to return the actives and replicas in the same return list in allMetadataForKey().
7. Tricky.
8. Yes, we need new functions to return activeRestoring and standbyRunning tasks.
9. StreamsConfig doesn’t look of much use to me, since we are giving all possible options via this function; or they can use the existing function getStreamsMetadataForKey() and get just the active.
10. I think we treat them both the same and let the lag do the talking.
11. We are just sending them the option to query from in allMetadataForKey(), which doesn’t include any handle. We then query that machine for the key, where it calls allStores() and tries to find the task in activeRunning/activeRestoring/standbyRunning and adds the store handle there.
12. Need to verify, but at the exact point when the store is closed to transition it from restoring to running, queries will fail.
The caller in such a case can have their own configurable retries to check again, or try the replica if a call to the active fails.
13. I think KIP-216 is working on those lines; we might not need a few of those exceptions, since the basic idea of this KIP is now to support IQ during rebalancing.
14. Addressed above; agreed, it looks more readable.
On Tuesday, 22 October, 2019, 08:39:07 pm IST, Matthias J. Sax wrote: One more thought: 14) Is specifying the allowed lag in number of records a useful way for users to declare how stale an instance is allowed to be? Would it be more intuitive for users to specify the allowed lag in time units (would event time or processing time be better)? It seems hard for users to reason about how "fresh" a store really is when number of records is used. -Matthias On 10/21/19 9:02 PM, Matthias J. Sax wrote: > Some more follow-up thoughts: > > 11) If we get a store handle of an active(restoring) task, and the task > transits to running, does the store handle become invalid and a new one > must be retrieved? Or can we "switch it out" underneath -- in this > case, how does the user know when they start to query the up-to-date state? > > 12) Standby tasks will have the store open in regular mode, while > active(restoring) tasks open stores in "upgrade mode" for more efficient > bulk loading. When we switch the store into active mode, we close it and > reopen it. What is the impact if we query the store during restore? What > is the impact if we
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
One more thought: 14) Is specifying the allowed lag in number of records a useful way for users to declare how stale an instance is allowed to be? Would it be more intuitive for users to specify the allowed lag in time units (would event time or processing time be better)? It seems hard for users to reason about how "fresh" a store really is when number of records is used. -Matthias On 10/21/19 9:02 PM, Matthias J. Sax wrote: > Some more follow-up thoughts: > > 11) If we get a store handle of an active(restoring) task, and the task > transits to running, does the store handle become invalid and a new one > must be retrieved? Or can we "switch it out" underneath -- in this > case, how does the user know when they start to query the up-to-date state? > > 12) Standby tasks will have the store open in regular mode, while > active(restoring) tasks open stores in "upgrade mode" for more efficient > bulk loading. When we switch the store into active mode, we close it and > reopen it. What is the impact if we query the store during restore? What > is the impact if we close the store to transit to running (eg, there > might be open iterators)? > > 13) Do we need to introduce new exception types? Compare KIP-216 > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors) > that aims to improve the user experience with regard to IQ exceptions. > > > -Matthias > > On 10/21/19 6:39 PM, Matthias J. Sax wrote: >> Thanks for the KIP. >> >> Couple of comments: >> >> 1) With regard to KIP-441, my current understanding is that the lag >> information is only reported to the leader (please correct me if I am >> wrong). This seems to be quite a limitation to actually use the lag >> information. >> >> 2) The idea of the metadata API is actually to get metadata once and >> only refresh the metadata if a store was migrated. The current proposal >> would require getting the metadata before each query. The KIP should >> describe the usage pattern and impact in more detail. >> >> 3) Currently, the KIP does not list the public API changes in detail. >> Please list all methods you intend to deprecate and list all methods you >> intend to add (best, using a code-block markup -- compare >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements >> as an example) >> >> 4) Also note (as already pointed out by John), that we cannot have any >> breaking API changes. Thus, the API should be designed in a fully >> backward compatible manner. >> >> 5) Returning a list of metadata objects makes it hard for users to know if >> the first object refers to the active(processing), active(restoring), or >> a standby task. IMHO, we should be more explicit. For example, a >> metadata object could have a flag that one can test via `#isActive()`. >> Or maybe even better, we could keep the current API as-is and add >> something like `standbyMetadataForKey()` (and similar methods for >> others). Having just a flag `isActive()` is a little subtle, and having >> new overloads would make the API much clearer (passing in a boolean flag >> does not seem to be a nice API). >> >> 6) Do you intend to return all standby metadata information at once, >> similar to `allMetadata()` -- seems to be useful. >> >> 7) Even if the lag information is propagated to all instances, it will >> happen in an async manner. Hence, I am wondering if we should address >> this race condition (I think we should).
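Point 14 boils down to which bound a caller states. A trivial sketch contrasting the two, assuming (hypothetically) that the server reports the matching lag measure:

import java.time.Duration;

// Two ways a caller could express "how stale is acceptable": a record-count
// bound versus a time bound. Purely illustrative, not proposed API.
public class StalenessBound {
    static boolean freshEnoughByRecords(long offsetLag, long maxRecordsBehind) {
        return offsetLag <= maxRecordsBehind;
    }

    static boolean freshEnoughByTime(long timestampLagMs, Duration maxBehind) {
        return timestampLagMs <= maxBehind.toMillis();
    }

    public static void main(String[] args) {
        // "At most 10 seconds behind" is easier to reason about than
        // "at most 100,000 records behind" when throughput varies.
        System.out.println(freshEnoughByTime(4_200, Duration.ofSeconds(10))); // true
        System.out.println(freshEnoughByRecords(100_000, 50_000));            // false
    }
}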
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Some more follow-up thoughts: 11) If we get a store handle of an active(restoring) task, and the task transits to running, does the store handle become invalid and a new one must be retrieved? Or can we "switch it out" underneath -- in this case, how does the user know when they start to query the up-to-date state? 12) Standby tasks will have the store open in regular mode, while active(restoring) tasks open stores in "upgrade mode" for more efficient bulk loading. When we switch the store into active mode, we close it and reopen it. What is the impact if we query the store during restore? What is the impact if we close the store to transit to running (eg, there might be open iterators)? 13) Do we need to introduce new exception types? Compare KIP-216 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors) that aims to improve the user experience with regard to IQ exceptions. -Matthias On 10/21/19 6:39 PM, Matthias J. Sax wrote: > Thanks for the KIP. > > Couple of comments: > > 1) With regard to KIP-441, my current understanding is that the lag > information is only reported to the leader (please correct me if I am > wrong). This seems to be quite a limitation to actually use the lag > information. > > 2) The idea of the metadata API is actually to get metadata once and > only refresh the metadata if a store was migrated. The current proposal > would require getting the metadata before each query. The KIP should > describe the usage pattern and impact in more detail. > > 3) Currently, the KIP does not list the public API changes in detail. > Please list all methods you intend to deprecate and list all methods you > intend to add (best, using a code-block markup -- compare > https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements > as an example) > > 4) Also note (as already pointed out by John), that we cannot have any > breaking API changes. Thus, the API should be designed in a fully > backward compatible manner. > > 5) Returning a list of metadata objects makes it hard for users to know if > the first object refers to the active(processing), active(restoring), or > a standby task. IMHO, we should be more explicit. For example, a > metadata object could have a flag that one can test via `#isActive()`. > Or maybe even better, we could keep the current API as-is and add > something like `standbyMetadataForKey()` (and similar methods for > others). Having just a flag `isActive()` is a little subtle, and having > new overloads would make the API much clearer (passing in a boolean flag > does not seem to be a nice API). > > 6) Do you intend to return all standby metadata information at once, > similar to `allMetadata()` -- seems to be useful. > > 7) Even if the lag information is propagated to all instances, it will > happen in an async manner. Hence, I am wondering if we should address > this race condition (I think we should). The idea would be to check if a > standby/active(restoring) task is actually still within the lag bounds > when a query is executed, and we would throw an exception if not. > > 8) The current `KafkaStreams#state()` method only returns a handle to > stores of active(processing) tasks. How can a user actually get a handle > to a store of an active(restoring) or standby task for querying? Seems > we should add a new method to get standby handles?
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Thanks for the KIP. Couple of comments: 1) With regard to KIP-441, my current understanding is that the lag information is only reported to the leader (please correct me if I am wrong). This seems to be quite a limitation to actually use the lag information. 2) The idea of the metadata API is actually to get metadata once and only refresh the metadata if a store was migrated. The current proposal would require getting the metadata before each query. The KIP should describe the usage pattern and impact in more detail. 3) Currently, the KIP does not list the public API changes in detail. Please list all methods you intend to deprecate and list all methods you intend to add (best, using a code-block markup -- compare https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements as an example) 4) Also note (as already pointed out by John), that we cannot have any breaking API changes. Thus, the API should be designed in a fully backward compatible manner. 5) Returning a list of metadata objects makes it hard for users to know if the first object refers to the active(processing), active(restoring), or a standby task. IMHO, we should be more explicit. For example, a metadata object could have a flag that one can test via `#isActive()`. Or maybe even better, we could keep the current API as-is and add something like `standbyMetadataForKey()` (and similar methods for others). Having just a flag `isActive()` is a little subtle, and having new overloads would make the API much clearer (passing in a boolean flag does not seem to be a nice API). 6) Do you intend to return all standby metadata information at once, similar to `allMetadata()` -- seems to be useful. 7) Even if the lag information is propagated to all instances, it will happen in an async manner. Hence, I am wondering if we should address this race condition (I think we should). The idea would be to check if a standby/active(restoring) task is actually still within the lag bounds when a query is executed, and we would throw an exception if not. 8) The current `KafkaStreams#state()` method only returns a handle to stores of active(processing) tasks. How can a user actually get a handle to a store of an active(restoring) or standby task for querying? Seems we should add a new method to get standby handles? Changing the semantics of the existing `state()` would be possible, but I think adding a new method is preferable? 9) How does the user actually specify the acceptable lag? A global config via StreamsConfig (this would be a public API change that needs to be covered in the KIP)? Or on a per-store or even per-query basis for more flexibility? We could also have a global setting that is used as default and allow overwriting it on a per-query basis. 10) Do we need to distinguish between active(restoring) and standby tasks? Or could we treat both as the same? -Matthias On 10/21/19 5:40 PM, Vinoth Chandar wrote: >>> I'm wondering, rather than putting "acceptable lag" into the >>> configuration at all, or even making it a parameter on `allMetadataForKey`, >>> why not just _always_ return all available metadata (including >>> active/standby or lag) and let the caller decide to which node they want to >>> route the query? > +1 on exposing lag information via the APIs. IMO without having > continuously updated/fresh lag information, its true value as a signal for > query routing decisions is much more limited. But we can design the API around > this model and iterate?
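Points 5 and 6 contrast two API shapes: a single list whose entries carry an #isActive() flag, versus keeping metadataForKey() as-is and adding standby-specific lookups. Sketched side by side below; all signatures are illustrative, not the KIP's final API:

import java.util.List;

// The two shapes under debate. Option (a): one list, a flag per entry.
// Option (b): explicit methods, so no flag is needed.
public class ApiShapes {
    interface StreamsMetadataLike {
        String hostInfo();
        boolean isActive(); // option (a): caller tests each entry
    }

    interface MetadataLookup {
        // option (b): the existing active-only lookup stays untouched...
        StreamsMetadataLike metadataForKey(String store, String key);
        // ...and a new, clearly named method returns the standbys.
        List<StreamsMetadataLike> standbyMetadataForKey(String store, String key);
    }
}

Option (a) keeps the call count at one per query; option (b) keeps each method's contract unambiguous, at the cost of a second call when the caller wants both.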
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
>> I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query?

+1 on exposing lag information via the APIs. IMO without continuously updated/fresh lag information, its true value as a signal for query routing decisions is rather limited. But we can design the API around this model and iterate? Longer term, we should have continuously shared lag information.

>> more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to 0 it means "active task only".

+1 IMO if we plan on having `enableReplicaServing`, it makes sense to generalize based on dataStaleness. This seems complementary to exposing the lag information itself.

>> This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441

+1 again. Do we have a concrete timeline for when this change will land on master? I would like to get the implementation wrapped up (as much as possible) by the end of the month. :) But I agree this sequencing makes sense.

On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang wrote:
> Hi Navinder,
>
> Thanks for the KIP, I have a high-level question about the proposed API regarding:
>
> "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"
>
> I'm wondering if it's more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", where setting it to 0 means "active task only". Behind the scenes, we can have the committed offsets encode the stream time as well, so that when processing standby tasks the stream process knows not only the lag in terms of offsets compared to the committed offset (internally we call it the offset limit), but also the lag in terms of timestamp diff compared to the committed offset.
>
> Also, encoding the timestamp as part of the offset has other benefits for improving Kafka Streams time semantics, but for KIP-535 itself I think it can help give users a more intuitive interface to reason about.
>
> Guozhang
>
> On Mon, Oct 21, 2019 at 12:30 PM John Roesler wrote:
>> Hey Navinder,
>>
>> Thanks for the KIP! I've been reading over the discussion thus far, and I have a couple of thoughts to pile on as well:
>>
>> It seems confusing to propose the API in terms of the current system state, but also propose how the API would look if/when KIP-441 is implemented. It occurs to me that the only part of KIP-441 that would affect you is the availability of the lag information in the SubscriptionInfo message. This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441, so maybe you can just build on top of KIP-441 and assume the lag information will be available. Then you could have a more straightforward proposal (e.g., mention that you'd return the lag information in AssignmentInfo as well as in the StreamsMetadata in some form, or make use of it in the API somehow).
>>
>> I'm partially motivated in that former point because it seems like understanding how callers would bound the staleness for their use case is _the_ key point for this KIP. FWIW, I think that adding a new method to StreamsMetadataState and deprecating the existing method is the best way to go; we just can't change the return types of any existing methods.
>>
>> I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query? This method isn't making any queries itself; it's merely telling you where the local Streams instance _thinks_ the key in question is located. Just returning all available information lets the caller implement any semantics they desire around querying only active stores, or standbys, or recovering stores, or whatever.
>>
>> One fly in the ointment, which you may wish to consider if proposing to use lag information, is that the cluster would only become aware of new lag information during rebalances. Even in the full expression of KIP-441, this information would stop being propagated when the cluster achieves a balanced task distribution. There was a follow-on idea I POCed to continuously share lag information in the heartbeat protocol, which you might be interested in if you want to make sure that nodes are basically _always_ aware of each others' lag on different partitions: https://github.com/apache/kafka/pull/7096
>>
>> Thanks again!
>> -John
>>
>> On Sat, Oct 19, 2019 at 6:06 AM
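John's "return everything, let the caller route" idea can be sketched as caller-side selection logic. The Candidate record and pick() helper below are hypothetical stand-ins for whatever metadata type the KIP settles on:

    // Hypothetical: the lookup returns all copies with their lags; the caller
    // applies its own staleness bound, preferring the active copy and falling
    // back to the least-lagging standby that is still acceptable.
    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;

    public class CallerSideRouting {

        record Candidate(String host, int port, boolean active, long offsetLag) {}

        static Optional<Candidate> pick(final List<Candidate> all, final long acceptableLag) {
            final Optional<Candidate> active = all.stream()
                .filter(c -> c.active() && c.offsetLag() <= acceptableLag)
                .findFirst();
            if (active.isPresent()) {
                return active;
            }
            return all.stream()
                .filter(c -> c.offsetLag() <= acceptableLag)
                .min(Comparator.comparingLong(Candidate::offsetLag));
        }
    }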
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hi Navinder,

Thanks for the KIP, I have a high-level question about the proposed API regarding:

"StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"

I'm wondering if it's more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", where setting it to 0 means "active task only". Behind the scenes, we can have the committed offsets encode the stream time as well, so that when processing standby tasks the stream process knows not only the lag in terms of offsets compared to the committed offset (internally we call it the offset limit), but also the lag in terms of timestamp diff compared to the committed offset.

Also, encoding the timestamp as part of the offset has other benefits for improving Kafka Streams time semantics, but for KIP-535 itself I think it can help give users a more intuitive interface to reason about.

Guozhang

On Mon, Oct 21, 2019 at 12:30 PM John Roesler wrote:
> Hey Navinder,
>
> Thanks for the KIP! I've been reading over the discussion thus far, and I have a couple of thoughts to pile on as well:
>
> It seems confusing to propose the API in terms of the current system state, but also propose how the API would look if/when KIP-441 is implemented. It occurs to me that the only part of KIP-441 that would affect you is the availability of the lag information in the SubscriptionInfo message. This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441, so maybe you can just build on top of KIP-441 and assume the lag information will be available. Then you could have a more straightforward proposal (e.g., mention that you'd return the lag information in AssignmentInfo as well as in the StreamsMetadata in some form, or make use of it in the API somehow).
>
> I'm partially motivated in that former point because it seems like understanding how callers would bound the staleness for their use case is _the_ key point for this KIP. FWIW, I think that adding a new method to StreamsMetadataState and deprecating the existing method is the best way to go; we just can't change the return types of any existing methods.
>
> I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query? This method isn't making any queries itself; it's merely telling you where the local Streams instance _thinks_ the key in question is located. Just returning all available information lets the caller implement any semantics they desire around querying only active stores, or standbys, or recovering stores, or whatever.
>
> One fly in the ointment, which you may wish to consider if proposing to use lag information, is that the cluster would only become aware of new lag information during rebalances. Even in the full expression of KIP-441, this information would stop being propagated when the cluster achieves a balanced task distribution. There was a follow-on idea I POCed to continuously share lag information in the heartbeat protocol, which you might be interested in if you want to make sure that nodes are basically _always_ aware of each others' lag on different partitions: https://github.com/apache/kafka/pull/7096
>
> Thanks again!
> -John
>
> On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar wrote:
>> Thanks, Vinoth. Looks like we are on the same page. I will add some of these explanations to the KIP as well. I have assigned KAFKA-6144 to myself, and KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with "standby".
>>
>> In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" -- do we really need a per-key configuration, or is a new StreamsConfig good enough?
>> >> Coming from experience: when teams are building a platform with Kafka Streams and these APIs serve data to multiple teams, we can't have a generalized config that says, as a platform, we will support stale reads or not. It should be the choice of whoever is calling the APIs to decide whether they are OK with stale reads or not. Makes sense?
>>
>> On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:
>>
>> Looks like we are covering ground :)
>>
>> >> Only if it is within a permissible range (say 1) we will serve from the Restoring state of the active.
>> +1 on having a knob like this. My reasoning is as follows.
>>
>> Looking at the Streams state as a read-only distributed kv store: with num_standby = f, we should be able to tolerate f failures, and if there is an f+1'th failure, the system should be unavailable.
>>
>> A) So with num_standby=0, the system
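Guozhang's generalization above can be sketched as follows; the interface and the HostLag record are hypothetical stand-ins, and the key serializer parameter is elided for brevity:

    // Hypothetical shape of a staleness-bounded metadata lookup.
    import java.util.List;

    public interface StalenessBoundedLookup<K> {

        // tolerableDataStaleness == 0 : "active task only", as today.
        // tolerableDataStaleness  > 0 : also return standby/restoring copies
        //   whose lag is within the bound. If committed offsets also encode
        //   stream time, the bound could be interpreted as a timestamp diff
        //   in milliseconds instead of an offset count.
        List<HostLag> allMetadataForKey(long tolerableDataStaleness,
                                        String storeName,
                                        K key);

        record HostLag(String host, int port, long lag) {}
    }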
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hey Navinder,

Thanks for the KIP! I've been reading over the discussion thus far, and I have a couple of thoughts to pile on as well:

It seems confusing to propose the API in terms of the current system state, but also propose how the API would look if/when KIP-441 is implemented. It occurs to me that the only part of KIP-441 that would affect you is the availability of the lag information in the SubscriptionInfo message. This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441, so maybe you can just build on top of KIP-441 and assume the lag information will be available. Then you could have a more straightforward proposal (e.g., mention that you'd return the lag information in AssignmentInfo as well as in the StreamsMetadata in some form, or make use of it in the API somehow).

I'm partially motivated in that former point because it seems like understanding how callers would bound the staleness for their use case is _the_ key point for this KIP. FWIW, I think that adding a new method to StreamsMetadataState and deprecating the existing method is the best way to go; we just can't change the return types of any existing methods.

I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query? This method isn't making any queries itself; it's merely telling you where the local Streams instance _thinks_ the key in question is located. Just returning all available information lets the caller implement any semantics they desire around querying only active stores, or standbys, or recovering stores, or whatever.

One fly in the ointment, which you may wish to consider if proposing to use lag information, is that the cluster would only become aware of new lag information during rebalances. Even in the full expression of KIP-441, this information would stop being propagated when the cluster achieves a balanced task distribution. There was a follow-on idea I POCed to continuously share lag information in the heartbeat protocol, which you might be interested in if you want to make sure that nodes are basically _always_ aware of each others' lag on different partitions: https://github.com/apache/kafka/pull/7096

Thanks again!
-John

On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar wrote:
> Thanks, Vinoth. Looks like we are on the same page. I will add some of these explanations to the KIP as well. I have assigned KAFKA-6144 to myself, and KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with "standby".
>
> In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" -- do we really need a per-key configuration, or is a new StreamsConfig good enough?
> >> Coming from experience: when teams are building a platform with Kafka Streams and these APIs serve data to multiple teams, we can't have a generalized config that says, as a platform, we will support stale reads or not. It should be the choice of whoever is calling the APIs to decide whether they are OK with stale reads or not. Makes sense?
>
> On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:
>
> Looks like we are covering ground :)
>
> >> Only if it is within a permissible range (say 1) we will serve from the Restoring state of the active.
> +1 on having a knob like this. My reasoning is as follows.
>
> Looking at the Streams state as a read-only distributed kv store: with num_standby = f, we should be able to tolerate f failures, and if there is an f+1'th failure, the system should be unavailable.
>
> A) So with num_standby=0, the system should be unavailable even if there is 1 failure, and that's my argument for not allowing querying in restoration state -- especially since in this case it will be a total rebuild of the state (which IMO cannot be considered a normal, fault-free operational state).
>
> B) Even if there are standbys, say num_standby=2, if the user decides to shut down all 3 instances, the only outcome should be unavailability until all of them come back or the state is rebuilt on other nodes in the cluster. In normal operations, f <= 2, and when a failure does happen we can then either choose C over A and fail IQs until replication is fully caught up, or choose A over C by serving in restoring state as long as the lag is minimal. If, even with f=1 say, all the standbys are lagging a lot due to some issue, then that should be considered a failure, since it is different from the normal/expected operational mode. Serving reads with unbounded replication lag and calling it "available" may not be very usable or even desirable :) IMHO, since it gives the user no way to reason about the app that is going to
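Navinder's platform argument above (per-caller choice rather than one platform-wide switch) could be sketched as a global default with a per-query override. The config key and helper below are hypothetical -- this is not an existing StreamsConfig property:

    // Hypothetical: a platform-wide default staleness bound that each calling
    // team may override per query.
    import java.util.Properties;

    public class StalenessBoundResolution {

        // Hypothetical config key, for illustration only.
        static final String DEFAULT_ACCEPTABLE_LAG = "default.acceptable.lag.sketch";

        // A per-query override wins; otherwise fall back to the platform default.
        static long resolve(final Properties platformConfig, final Long perQueryOverride) {
            if (perQueryOverride != null) {
                return perQueryOverride;
            }
            return Long.parseLong(platformConfig.getProperty(DEFAULT_ACCEPTABLE_LAG, "0"));
        }

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.setProperty(DEFAULT_ACCEPTABLE_LAG, "10000"); // platform default

            System.out.println(resolve(props, null)); // 10000: caller accepts the default
            System.out.println(resolve(props, 0L));   // 0: this caller insists on the active copy
        }
    }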
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Thanks, Vinoth. Looks like we are on the same page. I will add some of these explanations to the KIP as well. I have assigned KAFKA-6144 to myself, and KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with "standby".

In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" -- do we really need a per-key configuration, or is a new StreamsConfig good enough?
>> Coming from experience: when teams are building a platform with Kafka Streams and these APIs serve data to multiple teams, we can't have a generalized config that says, as a platform, we will support stale reads or not. It should be the choice of whoever is calling the APIs to decide whether they are OK with stale reads or not. Makes sense?

On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:

Looks like we are covering ground :)

>> Only if it is within a permissible range (say 1) we will serve from the Restoring state of the active.
+1 on having a knob like this. My reasoning is as follows.

Looking at the Streams state as a read-only distributed kv store: with num_standby = f, we should be able to tolerate f failures, and if there is an f+1'th failure, the system should be unavailable.

A) So with num_standby=0, the system should be unavailable even if there is 1 failure, and that's my argument for not allowing querying in restoration state -- especially since in this case it will be a total rebuild of the state (which IMO cannot be considered a normal, fault-free operational state).

B) Even if there are standbys, say num_standby=2, if the user decides to shut down all 3 instances, the only outcome should be unavailability until all of them come back or the state is rebuilt on other nodes in the cluster. In normal operations, f <= 2, and when a failure does happen we can then either choose C over A and fail IQs until replication is fully caught up, or choose A over C by serving in restoring state as long as the lag is minimal. If, even with f=1 say, all the standbys are lagging a lot due to some issue, then that should be considered a failure, since it is different from the normal/expected operational mode. Serving reads with unbounded replication lag and calling it "available" may not be very usable or even desirable :) IMHO, since it gives the user no way to reason about the app that is going to query this store. So there is definitely a need to distinguish between replication catch-up while in a fault-free state vs restoration of state when we lose more than f standbys. This knob is a great starting point towards this.

If you agree with some of the explanation above, please feel free to include it in the KIP as well, since this is sort of our design principle here.

Small nits:
- Let's standardize on "standby" instead of "replica", KIP or code, to be consistent with the rest of the Streams code/docs?
- Can we merge KAFKA-8994 into KAFKA-6144 now and close the former? Eventually we need to consolidate KAFKA-6555 as well.
- In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" -- do we really need a per-key configuration, or is a new StreamsConfig good enough?

On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar wrote:
> @Vinoth, I have incorporated a few of the discussions we have had in the KIP.
>
> In the current code, t0 and t1 serve queries from the Active(Running) partition. For case t2, we are planning to return a List<StreamsMetadata> so that if IQ fails on A, the replica on B can serve the data by enabling serving from replicas. This still does not solve cases t3 and t4, since B has been promoted to active but is in Restoring state to catch up to A's last committed position (as we don't serve from the Restoring state of an active), and the new replica on R is building itself from scratch. Both these cases can be solved if we start serving from the Restoring state of the active as well, since it is almost equivalent to the previous active.
>
> There could be a case where the active and all replicas of a partition become unavailable and are building themselves from scratch; in this case, the state in the active is far behind even though it is in Restoring state. To make sure we don't serve from this state in such cases, we can either add another state before Restoring, or check the difference between the last committed offset and the current position. Only if it is within a permissible range (say 1) will we serve from the Restoring state of the active.
>
> On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:
>
> Thanks for the updates on the KIP, Navinder!
>
> Few comments:
> - AssignmentInfo is not public API, right? But we will change it and thus need to increment the version and test for version_probing etc. Good to separate that from the StreamsMetadata changes (which are public API).
> -
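A minimal sketch of the "permissible range" check described above, assuming the serving instance can read its last committed changelog offset and its current restore position (names hypothetical):

    // Hypothetical helper: serve from a RESTORING active only when its restore
    // position is within a permissible lag of the last committed offset.
    public final class RestoringServeCheck {

        static boolean canServeWhileRestoring(final long lastCommittedOffset,
                                              final long currentRestorePosition,
                                              final long permissibleLag) {
            final long lag = lastCommittedOffset - currentRestorePosition;
            // A store rebuilding from scratch has a huge lag and is rejected here.
            return lag >= 0 && lag <= permissibleLag;
        }

        private RestoringServeCheck() {}
    }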
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Looks like we are covering ground :)

>> Only if it is within a permissible range (say 1) we will serve from the Restoring state of the active.
+1 on having a knob like this. My reasoning is as follows.

Looking at the Streams state as a read-only distributed kv store: with num_standby = f, we should be able to tolerate f failures, and if there is an f+1'th failure, the system should be unavailable.

A) So with num_standby=0, the system should be unavailable even if there is 1 failure, and that's my argument for not allowing querying in restoration state -- especially since in this case it will be a total rebuild of the state (which IMO cannot be considered a normal, fault-free operational state).

B) Even if there are standbys, say num_standby=2, if the user decides to shut down all 3 instances, the only outcome should be unavailability until all of them come back or the state is rebuilt on other nodes in the cluster. In normal operations, f <= 2, and when a failure does happen we can then either choose C over A and fail IQs until replication is fully caught up, or choose A over C by serving in restoring state as long as the lag is minimal. If, even with f=1 say, all the standbys are lagging a lot due to some issue, then that should be considered a failure, since it is different from the normal/expected operational mode. Serving reads with unbounded replication lag and calling it "available" may not be very usable or even desirable :) IMHO, since it gives the user no way to reason about the app that is going to query this store. So there is definitely a need to distinguish between replication catch-up while in a fault-free state vs restoration of state when we lose more than f standbys. This knob is a great starting point towards this.

If you agree with some of the explanation above, please feel free to include it in the KIP as well, since this is sort of our design principle here.

Small nits:
- Let's standardize on "standby" instead of "replica", KIP or code, to be consistent with the rest of the Streams code/docs?
- Can we merge KAFKA-8994 into KAFKA-6144 now and close the former? Eventually we need to consolidate KAFKA-6555 as well.
- In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" -- do we really need a per-key configuration, or is a new StreamsConfig good enough?

On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar wrote:
> @Vinoth, I have incorporated a few of the discussions we have had in the KIP.
>
> In the current code, t0 and t1 serve queries from the Active(Running) partition. For case t2, we are planning to return a List<StreamsMetadata> so that if IQ fails on A, the replica on B can serve the data by enabling serving from replicas. This still does not solve cases t3 and t4, since B has been promoted to active but is in Restoring state to catch up to A's last committed position (as we don't serve from the Restoring state of an active), and the new replica on R is building itself from scratch. Both these cases can be solved if we start serving from the Restoring state of the active as well, since it is almost equivalent to the previous active.
>
> There could be a case where the active and all replicas of a partition become unavailable and are building themselves from scratch; in this case, the state in the active is far behind even though it is in Restoring state. To make sure we don't serve from this state in such cases, we can either add another state before Restoring, or check the difference between the last committed offset and the current position. Only if it is within a permissible range (say 1) will we serve from the Restoring state of the active.
>
> On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:
>
> Thanks for the updates on the KIP, Navinder!
>
> Few comments:
>
> - AssignmentInfo is not public API, right? But we will change it and thus need to increment the version and test for version_probing etc. Good to separate that from the StreamsMetadata changes (which are public API).
> - From what I see, there is going to be a choice between the following:
>
> A) Introducing a new *KafkaStreams::allMetadataForKey()* API that potentially returns a List<StreamsMetadata> ordered from the most up-to-date to the least up-to-date replica. Today we cannot fully implement this ordering, since all we know is which hosts are active and which are standbys. However, this aligns well with the future: KIP-441 adds the lag information to the rebalancing protocol, and we could eventually sort replicas based on the reported lags. This is fully backwards compatible with existing clients. The only drawback I see is the naming of the existing method KafkaStreams::metadataForKey, which does not convey the distinction that it simply returns the active replica, i.e. allMetadataForKey().get(0).
> B) Changing KafkaStreams::metadataForKey() to return a List<StreamsMetadata>. It's a breaking change.
>
> I prefer A, since none of the
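To illustrate option A's contract, a hedged sketch: the returned list is ordered from most to least caught-up, so element 0 corresponds to today's metadataForKey() result and existing callers keep their semantics. The types and helpers here are hypothetical:

    // Hypothetical illustration of option A: an ordered list, most caught-up first.
    import java.util.List;

    public class OptionASketch {

        record ReplicaInfo(String host, int port) {}

        // Old semantics, preserved: element 0 is the active (most caught-up) copy.
        static ReplicaInfo activeOnly(final List<ReplicaInfo> allMetadataForKey) {
            return allMetadataForKey.get(0);
        }

        // New semantics, opt-in: walk the ordered list so a failed query against
        // the active copy can fall back to progressively staler standbys.
        static ReplicaInfo freshestReachable(final List<ReplicaInfo> allMetadataForKey) {
            for (final ReplicaInfo replica : allMetadataForKey) {
                if (isReachable(replica)) {
                    return replica;
                }
            }
            throw new IllegalStateException("No reachable copy for this key");
        }

        private static boolean isReachable(final ReplicaInfo replica) {
            return true; // network health check elided in this sketch
        }
    }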
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
@Vinoth, I have incorporated a few of the discussions we have had in the KIP.

In the current code, t0 and t1 serve queries from the Active(Running) partition. For case t2, we are planning to return a List<StreamsMetadata> so that if IQ fails on A, the replica on B can serve the data by enabling serving from replicas. This still does not solve cases t3 and t4, since B has been promoted to active but is in Restoring state to catch up to A's last committed position (as we don't serve from the Restoring state of an active), and the new replica on R is building itself from scratch. Both these cases can be solved if we start serving from the Restoring state of the active as well, since it is almost equivalent to the previous active.

There could be a case where the active and all replicas of a partition become unavailable and are building themselves from scratch; in this case, the state in the active is far behind even though it is in Restoring state. To make sure we don't serve from this state in such cases, we can either add another state before Restoring, or check the difference between the last committed offset and the current position. Only if it is within a permissible range (say 1) will we serve from the Restoring state of the active.

On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:

Thanks for the updates on the KIP, Navinder!

Few comments:

- AssignmentInfo is not public API, right? But we will change it and thus need to increment the version and test for version_probing etc. Good to separate that from the StreamsMetadata changes (which are public API).
- From what I see, there is going to be a choice between the following:

A) Introducing a new *KafkaStreams::allMetadataForKey()* API that potentially returns a List<StreamsMetadata> ordered from the most up-to-date to the least up-to-date replica. Today we cannot fully implement this ordering, since all we know is which hosts are active and which are standbys. However, this aligns well with the future: KIP-441 adds the lag information to the rebalancing protocol, and we could eventually sort replicas based on the reported lags. This is fully backwards compatible with existing clients. The only drawback I see is the naming of the existing method KafkaStreams::metadataForKey, which does not convey the distinction that it simply returns the active replica, i.e. allMetadataForKey().get(0).
B) Changing KafkaStreams::metadataForKey() to return a List<StreamsMetadata>. It's a breaking change.

I prefer A, since none of the semantics/behavior changes for existing users. I'd love to hear more thoughts. Can we also work this into the KIP? I already implemented A to unblock myself for now. Seems feasible to do.

On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar wrote:
> >> I get your point. But suppose there is a replica which has just become active; in that case the replica will still be building itself from scratch, and this active will go to restoring state till it catches up with the previous active. Wouldn't serving from a restoring active make more sense than a replica in such a case?
>
> KIP-441 will change this behavior such that promotion to active happens based on how caught up a replica is. So, once we have that (work underway already for 2.5, IIUC) and the user sets num.standby.replicas > 0, the staleness window should not be as long as you describe. IMO if a user wants availability for state, they should configure num.standby.replicas > 0. If not, then on a node loss a few partitions would be unavailable for a while (there are other ways to bring this window down, which I won't bring in here). We could argue for querying a restoring active (say a new node added to replace a faulty old node) based on AP vs CP principles. But I'm not sure reading really old values for the sake of availability is useful; no AP data system would be inconsistent for such a long time in practice.
>
> So, I still feel just limiting this to standby reads provides the best semantics.
>
> Just my 2c. Would love to see what others think as well.
>
> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar wrote:
>> Hi Vinoth,
>> Thanks for the feedback.
>>
>> Can we link the JIRA, discussion thread also to the KIP.
>> >> Added.
>>
>> Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
>> >> Sure, I can add changes for 8994 in this KIP and link KAFKA-6144 to KAFKA-8994 as well.
>>
>> KIP seems to be focusing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies, if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up and the staleness
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Thanks for the updates on the KIP, Navinder!

Few comments:

- AssignmentInfo is not public API, right? But we will change it and thus need to increment the version and test for version_probing etc. Good to separate that from the StreamsMetadata changes (which are public API).
- From what I see, there is going to be a choice between the following:

A) Introducing a new *KafkaStreams::allMetadataForKey()* API that potentially returns a List<StreamsMetadata> ordered from the most up-to-date to the least up-to-date replica. Today we cannot fully implement this ordering, since all we know is which hosts are active and which are standbys. However, this aligns well with the future: KIP-441 adds the lag information to the rebalancing protocol, and we could eventually sort replicas based on the reported lags. This is fully backwards compatible with existing clients. The only drawback I see is the naming of the existing method KafkaStreams::metadataForKey, which does not convey the distinction that it simply returns the active replica, i.e. allMetadataForKey().get(0).
B) Changing KafkaStreams::metadataForKey() to return a List<StreamsMetadata>. It's a breaking change.

I prefer A, since none of the semantics/behavior changes for existing users. I'd love to hear more thoughts. Can we also work this into the KIP? I already implemented A to unblock myself for now. Seems feasible to do.

On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar wrote:
> >> I get your point. But suppose there is a replica which has just become active; in that case the replica will still be building itself from scratch, and this active will go to restoring state till it catches up with the previous active. Wouldn't serving from a restoring active make more sense than a replica in such a case?
>
> KIP-441 will change this behavior such that promotion to active happens based on how caught up a replica is. So, once we have that (work underway already for 2.5, IIUC) and the user sets num.standby.replicas > 0, the staleness window should not be as long as you describe. IMO if a user wants availability for state, they should configure num.standby.replicas > 0. If not, then on a node loss a few partitions would be unavailable for a while (there are other ways to bring this window down, which I won't bring in here). We could argue for querying a restoring active (say a new node added to replace a faulty old node) based on AP vs CP principles. But I'm not sure reading really old values for the sake of availability is useful; no AP data system would be inconsistent for such a long time in practice.
>
> So, I still feel just limiting this to standby reads provides the best semantics.
>
> Just my 2c. Would love to see what others think as well.
>
> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar wrote:
>> Hi Vinoth,
>> Thanks for the feedback.
>>
>> Can we link the JIRA, discussion thread also to the KIP.
>> >> Added.
>>
>> Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
>> >> Sure, I can add changes for 8994 in this KIP and link KAFKA-6144 to KAFKA-8994 as well.
>>
>> KIP seems to be focusing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies, if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up, so the staleness window could be much smaller/tolerable (once again the focus of KAFKA-8994).
>> >> I get your point. But suppose there is a replica which has just become active; in that case the replica will still be building itself from scratch, and this active will go to restoring state till it catches up with the previous active. Wouldn't serving from a restoring active make more sense than a replica in such a case?
>>
>> Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data. Can we also add it to the KIP?
>> >> Will add this.
>>
>> Regards,
>> Navinder
>>
>> On 2019/10/14 16:56:49, Vinoth Chandar wrote:
>>> Hi Navinder,
>>>
>>> Thanks for sharing the KIP! A few thoughts:
>>>
>>> - Can we link the JIRA and the discussion thread to the KIP?
>>> - Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
>>> - The KIP seems to be focusing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies, if any. Without KIP-441, I am not very sure if
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
>> I get your point. But suppose there is a replica which has just become active; in that case the replica will still be building itself from scratch, and this active will go to restoring state till it catches up with the previous active. Wouldn't serving from a restoring active make more sense than a replica in such a case?

KIP-441 will change this behavior such that promotion to active happens based on how caught up a replica is. So, once we have that (work underway already for 2.5, IIUC) and the user sets num.standby.replicas > 0, the staleness window should not be as long as you describe. IMO if a user wants availability for state, they should configure num.standby.replicas > 0. If not, then on a node loss a few partitions would be unavailable for a while (there are other ways to bring this window down, which I won't bring in here). We could argue for querying a restoring active (say a new node added to replace a faulty old node) based on AP vs CP principles. But I'm not sure reading really old values for the sake of availability is useful; no AP data system would be inconsistent for such a long time in practice.

So, I still feel just limiting this to standby reads provides the best semantics.

Just my 2c. Would love to see what others think as well.

On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar wrote:
> Hi Vinoth,
> Thanks for the feedback.
>
> Can we link the JIRA, discussion thread also to the KIP.
> >> Added.
>
> Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
> >> Sure, I can add changes for 8994 in this KIP and link KAFKA-6144 to KAFKA-8994 as well.
>
> KIP seems to be focusing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies, if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up, so the staleness window could be much smaller/tolerable (once again the focus of KAFKA-8994).
> >> I get your point. But suppose there is a replica which has just become active; in that case the replica will still be building itself from scratch, and this active will go to restoring state till it catches up with the previous active. Wouldn't serving from a restoring active make more sense than a replica in such a case?
>
> Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data. Can we also add it to the KIP?
> >> Will add this.
>
> Regards,
> Navinder
>
> On 2019/10/14 16:56:49, Vinoth Chandar wrote:
>> Hi Navinder,
>>
>> Thanks for sharing the KIP! A few thoughts:
>>
>> - Can we link the JIRA and the discussion thread to the KIP?
>> - Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
>> - The KIP seems to be focusing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies, if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up, so the staleness window could be much smaller/tolerable (once again the focus of KAFKA-8994).
>> - Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data. Can we also add it to the KIP?
>>
>> Thanks
>> Vinoth
>>
>> On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar wrote:
>>> Hi,
>>> Starting a discussion on the KIP to Allow state stores to serve stale reads during rebalance (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
>>> Thanks & Regards, Navinder
>>> LinkedIn
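Since the availability argument above hinges on num.standby.replicas, here is a minimal configuration sketch. num.standby.replicas is an existing Streams config; the application id and bootstrap servers are placeholders:

    // Minimal sketch: enable standbys so IQ has caught-up copies to fall back to.
    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class StandbyConfigExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "iq-demo");           // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // f = 2 in the discussion above: tolerate up to two instance failures
            // before the store's state has to be rebuilt from the changelog.
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
        }
    }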
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hi Vinoth,
Thanks for the feedback.

Can we link the JIRA, discussion thread also to the KIP.
>> Added.

Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
>> Sure, I can add changes for 8994 in this KIP and link KAFKA-6144 to KAFKA-8994 as well.

KIP seems to be focusing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies, if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up, so the staleness window could be much smaller/tolerable (once again the focus of KAFKA-8994).
>> I get your point. But suppose there is a replica which has just become active; in that case the replica will still be building itself from scratch, and this active will go to restoring state till it catches up with the previous active. Wouldn't serving from a restoring active make more sense than a replica in such a case?

Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data. Can we also add it to the KIP?
>> Will add this.

Regards,
Navinder

On 2019/10/14 16:56:49, Vinoth Chandar wrote:
> Hi Navinder,
>
> Thanks for sharing the KIP! A few thoughts:
>
> - Can we link the JIRA and the discussion thread to the KIP?
> - Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
> - The KIP seems to be focusing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies, if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up, so the staleness window could be much smaller/tolerable (once again the focus of KAFKA-8994).
> - Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data. Can we also add it to the KIP?
>
> Thanks
> Vinoth
>
> On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar wrote:
>> Hi,
>> Starting a discussion on the KIP to Allow state stores to serve stale reads during rebalance (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
>> Thanks & Regards, Navinder
>> LinkedIn
Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hi Navinder,

Thanks for sharing the KIP! A few thoughts:

- Can we link the JIRA and the discussion thread to the KIP?
- Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
- The KIP seems to be focusing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies, if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up, so the staleness window could be much smaller/tolerable (once again the focus of KAFKA-8994).
- Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data. Can we also add it to the KIP?

Thanks
Vinoth

On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar wrote:
> Hi,
> Starting a discussion on the KIP to Allow state stores to serve stale reads during rebalance (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
> Thanks & Regards, Navinder
> LinkedIn
[DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance
Hi,
Starting a discussion on the KIP to Allow state stores to serve stale reads during rebalance (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
Thanks & Regards, Navinder
LinkedIn