Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hey Bruno,

Thanks for the feedback. Sorry for the late reply; I was hoping others might weigh in as well, and it got away from me.

a) I like this, but I think we should separate it out. This KIP has already dragged on longer than it should, and I think that is a big enough change to get done by itself.

b) I'm a bit resistant to adding a new type of processor or store for this change. It feels excessively complicated for what should be a small change. I think it might be good, but I don't want to expand the scope more than absolutely necessary here.

best,
walker

On Wed, Apr 10, 2024 at 4:34 AM Bruno Cadonna wrote:

> Hi Walker,
>
> Thanks for the updates!
>
> (1) While I like naming the methods differently, I also have to say that I do not like addIsomorphicGlobalStore() because it does not really tell what the method does. I could also not come up with a better name than addGlobalStoreWithReprocessingOnRestore(). However, I had two ideas on which I would like to have your opinion.
>
> (a) Add a new GlobalStoreBuilder in which users can set whether the global state store should reprocess on restore. In addition to the option to enable or disable reprocessing on restore, you could also NOT offer a way to enable or disable logging in the GlobalStoreBuilder. Currently, if users enable logging for a store builder that they pass into addGlobalStore(), Kafka Streams needs to explicitly disable it again, which is not ideal.
>
> (b) Add a new GlobalProcessorSupplier in which users can set whether the global state store should reprocess on restore. Another ugliness that could be fixed with this is passing Void, Void to ProcessorSupplier. The GlobalProcessorSupplier would just have two type parameters. The nice aspect of this idea is that the option to enable/disable reprocessing on restore is only needed when a processor supplier is passed into the methods. That is not true for idea (a).
>
> (2) Yes, that was my intent.
>
> Best,
> Bruno
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hi Walker,

Thanks for the updates!

(1) While I like naming the methods differently, I also have to say that I do not like addIsomorphicGlobalStore() because it does not really tell what the method does. I could also not come up with a better name than addGlobalStoreWithReprocessingOnRestore(). However, I had two ideas on which I would like to have your opinion.

(a) Add a new GlobalStoreBuilder in which users can set whether the global state store should reprocess on restore. In addition to the option to enable or disable reprocessing on restore, you could also NOT offer a way to enable or disable logging in the GlobalStoreBuilder. Currently, if users enable logging for a store builder that they pass into addGlobalStore(), Kafka Streams needs to explicitly disable it again, which is not ideal.

(b) Add a new GlobalProcessorSupplier in which users can set whether the global state store should reprocess on restore. Another ugliness that could be fixed with this is passing Void, Void to ProcessorSupplier. The GlobalProcessorSupplier would just have two type parameters. The nice aspect of this idea is that the option to enable/disable reprocessing on restore is only needed when a processor supplier is passed into the methods. That is not true for idea (a).

(2) Yes, that was my intent.

Best,
Bruno

On 4/9/24 9:33 PM, Walker Carlson wrote:

> Hey all,
>
> (1) No, I hadn't considered just naming the methods differently. I actually really like this idea and am for it. Except we need 3 different methods now: one for no processor, one for a processor that should restore, and one that reprocesses. How about `addCustomGlobalStore` and `addIsomorphicGlobalStore`, and then just `addGlobalStateStore` for the no-processor case? If everyone likes that, I can add that to the KIP and rename the methods.
>
> (2) We can have the built-in case use StoreBuilder and manually check for the TimestampedKeyValueStore. That is fine with me.
>
> Bruno, I hope that was what you were intending.
>
> (3) For the Scala API, do we need to make it match the Java API, or are we just making the minimum changes? If we take point 1, I don't know how much we need to change.
>
> Thanks,
> Walker
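Idea (b) from Bruno's message could look roughly like the sketch below. It is an illustration only, under stated assumptions: `GlobalProcessor`, the `reprocessOnRestore()` method, its default value, and the `withoutReprocessing` helper are hypothetical names invented here, not part of the KIP or of Kafka Streams, and the two type parameters are assumed to be the input key and value types.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a processor that updates a global store.
// It only consumes records, so it needs input types but no output types.
interface GlobalProcessor<KIn, VIn> {
    void process(KIn key, VIn value);
}

// Hypothetical supplier for idea (b): two type parameters, with the restore
// behaviour attached to the supplier itself.
interface GlobalProcessorSupplier<KIn, VIn> extends Supplier<GlobalProcessor<KIn, VIn>> {
    // Assumed default for this sketch only; the thread does not settle on one.
    default boolean reprocessOnRestore() {
        return true;
    }
}

final class GlobalProcessorSupplierSketch {
    // Hypothetical helper: same processors, but restore should bypass them.
    static <K, V> GlobalProcessorSupplier<K, V> withoutReprocessing(
            Supplier<GlobalProcessor<K, V>> delegate) {
        return new GlobalProcessorSupplier<K, V>() {
            @Override
            public GlobalProcessor<K, V> get() {
                return delegate.get();
            }

            @Override
            public boolean reprocessOnRestore() {
                return false;
            }
        };
    }

    public static void main(String[] args) {
        GlobalProcessorSupplier<String, String> reprocessing =
            () -> (key, value) -> System.out.println("processed " + key);
        GlobalProcessorSupplier<String, String> bulkLoading = withoutReprocessing(reprocessing);

        reprocessing.get().process("k", "v");
        System.out.println(reprocessing.reprocessOnRestore());
        System.out.println(bulkLoading.reprocessOnRestore());
    }
}
```

Because the setting lives on the supplier, it only exists when a processor is supplied, which is the advantage Bruno names for idea (b) over idea (a).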
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hey all,

(1) No, I hadn't considered just naming the methods differently. I actually really like this idea and am for it. Except we need 3 different methods now: one for no processor, one for a processor that should restore, and one that reprocesses. How about `addCustomGlobalStore` and `addIsomorphicGlobalStore`, and then just `addGlobalStateStore` for the no-processor case? If everyone likes that, I can add that to the KIP and rename the methods.

(2) We can have the built-in case use StoreBuilder and manually check for the TimestampedKeyValueStore. That is fine with me.

Bruno, I hope that was what you were intending.

(3) For the Scala API, do we need to make it match the Java API, or are we just making the minimum changes? If we take point 1, I don't know how much we need to change.

Thanks,
Walker

On Tue, Apr 2, 2024 at 8:38 AM Matthias J. Sax wrote:

> One more thing:
>
> I was just looking into the WIP PR, and it seems we will also need to change `StreamsBuilder.scala`. The KIP needs to cover these changes as well.
>
> -Matthias
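The "manually check for the TimestampedKeyValueStore" step in point (2) can be pictured with the sketch below. It is a simplified illustration, not the actual implementation: `SimpleKeyValueStore`, `SimpleTimestampedStore`, `ValueWithTimestamp`, and the `write` method are hypothetical stand-ins written for this example and are not the real Kafka Streams interfaces.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for a key-value store (not the real Kafka Streams interface).
interface SimpleKeyValueStore {
    void put(String key, Object value);
    Object get(String key);
}

// Stand-in for a timestamped store: same operations, values carry a timestamp.
interface SimpleTimestampedStore extends SimpleKeyValueStore { }

// A value together with the timestamp of the record it came from.
final class ValueWithTimestamp {
    final Object value;
    final long timestamp;

    ValueWithTimestamp(Object value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

class PlainStore implements SimpleKeyValueStore {
    private final Map<String, Object> data = new HashMap<>();

    @Override
    public void put(String key, Object value) {
        data.put(key, value);
    }

    @Override
    public Object get(String key) {
        return data.get(key);
    }
}

class TimestampedStore extends PlainStore implements SimpleTimestampedStore { }

final class BuiltInProcessorSketch {
    // The parameter type is the plain store; the timestamped case is detected
    // at runtime and the value is wrapped accordingly.
    static void write(SimpleKeyValueStore store, String key, String value, long timestamp) {
        if (store instanceof SimpleTimestampedStore) {
            store.put(key, new ValueWithTimestamp(value, timestamp));
        } else {
            store.put(key, value);
        }
    }

    public static void main(String[] args) {
        SimpleKeyValueStore plain = new PlainStore();
        SimpleKeyValueStore timestamped = new TimestampedStore();
        write(plain, "k", "v", 42L);
        write(timestamped, "k", "v", 42L);
        System.out.println(plain.get("k"));
        System.out.println(((ValueWithTimestamp) timestamped.get("k")).timestamp);
    }
}
```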
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
One more thing:

I was just looking into the WIP PR, and it seems we will also need to change `StreamsBuilder.scala`. The KIP needs to cover these changes as well.

-Matthias

On 4/1/24 10:33 PM, Bruno Cadonna wrote:

> Hi Walker and Matthias,
>
> (2) That is exactly my point about having a compile time error versus a runtime error. The added flexibility as proposed by Matthias sounds good to me.
>
> Regarding the Named parameter, I was not aware that the processor that writes records to the global state store is named according to the name passed in by Consumed. I thought Consumed strictly specifies the names of source processors. So I am fine with not having an overload with a Named parameter.
>
> Best,
> Bruno
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hi Walker and Matthias,

(2) That is exactly my point about having a compile time error versus a runtime error. The added flexibility as proposed by Matthias sounds good to me.

Regarding the Named parameter, I was not aware that the processor that writes records to the global state store is named according to the name passed in by Consumed. I thought Consumed strictly specifies the names of source processors. So I am fine with not having an overload with a Named parameter.

Best,
Bruno

On 3/31/24 11:30 AM, Matthias J. Sax wrote:

> Two more follow up thoughts:
>
> (1) I am still not a big fan of the boolean parameter we introduce. Did you consider using different method names, like `addReadOnlyGlobalStore()` (for the optimized method, that would not reprocess data on restore), and maybe adding `addModifiableGlobalStore()` (not a good name, but we cannot re-use the existing `addGlobalStore()` -- maybe somebody else has a good idea about a better `addXxxGlobalStore` that would describe it well).
>
> (2) I was thinking about Bruno's comment to limit the scope of the store builder for the optimized case. I think we should actually do something about it, because in the end, the runtime (ie, the `Processor` we hard wire) would need to pick a store it supports and cast to the corresponding store? If the cast fails, we hit a runtime exception, but by putting the store we cast to into the signature we can actually convert it into a compile time error, which seems better. -- If we want, we could make it somewhat flexible and support both `KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature would be `KeyValueStore` but we explicitly check if the builder gives us a `TimestampedKeyValueStore` instance and use it properly.
>
> If putting the signature does not work for some reason, we should at least clearly call it out in the JavaDocs what store type is expected.
>
> -Matthias
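The compile-time versus runtime distinction discussed in point (2) can be illustrated with a small, self-contained sketch. All types here (`Store`, `KvStore`, `OtherStore`, `Builder`) and both `add...` methods are hypothetical stand-ins made up for this example; they only mirror the shape of the argument, not the real `StoreBuilder` API.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins; these are not the real Kafka Streams interfaces.
interface Store { }

interface KvStore extends Store {
    void put(String key, String value);
}

final class InMemoryKvStore implements KvStore {
    private final Map<String, String> data = new HashMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }
}

// A store type the built-in processor does not know how to write to.
final class OtherStore implements Store { }

interface Builder<S extends Store> {
    S build();
}

final class SignatureSketch {
    // Untyped variant: any builder compiles; a wrong store type only fails
    // when the cast runs.
    static KvStore addUntyped(Builder<?> builder) {
        return (KvStore) builder.build();
    }

    // Typed variant: the expected store type is part of the signature, so a
    // builder for another store type is rejected by the compiler.
    static KvStore addTyped(Builder<? extends KvStore> builder) {
        return builder.build();
    }

    public static void main(String[] args) {
        Builder<InMemoryKvStore> good = InMemoryKvStore::new;
        Builder<OtherStore> bad = OtherStore::new;

        addTyped(good);
        // addTyped(bad);  // would not compile: OtherStore is not a KvStore

        try {
            addUntyped(bad);
        } catch (ClassCastException e) {
            System.out.println("wrong store type detected only at runtime");
        }
    }
}
```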
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Two more follow up thoughts:

(1) I am still not a big fan of the boolean parameter we introduce. Did you consider using different method names, like `addReadOnlyGlobalStore()` (for the optimized method, that would not reprocess data on restore), and maybe adding `addModifiableGlobalStore()` (not a good name, but we cannot re-use the existing `addGlobalStore()` -- maybe somebody else has a good idea about a better `addXxxGlobalStore` that would describe it well).

(2) I was thinking about Bruno's comment to limit the scope of the store builder for the optimized case. I think we should actually do something about it, because in the end, the runtime (ie, the `Processor` we hard wire) would need to pick a store it supports and cast to the corresponding store? If the cast fails, we hit a runtime exception, but by putting the store we cast to into the signature we can actually convert it into a compile time error, which seems better. -- If we want, we could make it somewhat flexible and support both `KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature would be `KeyValueStore` but we explicitly check if the builder gives us a `TimestampedKeyValueStore` instance and use it properly.

If putting the signature does not work for some reason, we should at least clearly call it out in the JavaDocs what store type is expected.

-Matthias

On 3/28/24 5:05 PM, Walker Carlson wrote:

> Hey all,
>
> Thanks for the feedback Bruno, Almog and Matthias!
>
> Almog: I like the idea, but I agree with Matthias. I actually looked at that ticket a bit when doing this and found that while similar they are actually pretty unrelated codewise. I would love to see it get taken care of.
>
> Bruno and Matthias: The Named parameter doesn't really make sense to me to put here. The store in the store builder is already named through what Matthias described, and the processor doesn't actually have a name. That would be the processor node that gets named via the Named parameter (in the DSL), and the internal streams builder uses the Consumed object to make a source name. I think we should just keep the Consumed object and use that for the processor node name.
>
> As for the limitation of the store builder interface, I don't think it is necessary. It could be something we add later if we really want to.
>
> Anyways, I think we are getting close enough to consensus that I'm going to open a vote, and hopefully we can get it voted on soon!
>
> best,
> Walker
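To make point (1) concrete, the sketch below contrasts a boolean flag with distinctly named methods, and shows what "reprocess on restore" changes in a toy model. Everything here is an assumption for illustration: the signatures are invented, a store is modelled as a plain `Map`, the methods return the restored contents (which the real API would not do), and `addModifiableGlobalStore` is assumed to be the reprocessing counterpart of `addReadOnlyGlobalStore`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class GlobalStoreNamingSketch {
    // Rebuilds store contents from topic records (each record is {key, value}).
    // With reprocessing, every value passes through the custom processor again;
    // without it, records are copied into the store unchanged.
    static Map<String, String> restore(List<String[]> records,
                                       UnaryOperator<String> processor,
                                       boolean reprocessOnRestore) {
        Map<String, String> store = new HashMap<>();
        for (String[] record : records) {
            String value = reprocessOnRestore ? processor.apply(record[1]) : record[1];
            store.put(record[0], value);
        }
        return store;
    }

    // Named variant for the optimized path: no reprocessing on restore.
    static Map<String, String> addReadOnlyGlobalStore(List<String[]> records,
                                                      UnaryOperator<String> processor) {
        return restore(records, processor, false);
    }

    // Named variant assumed to reprocess on restore.
    static Map<String, String> addModifiableGlobalStore(List<String[]> records,
                                                        UnaryOperator<String> processor) {
        return restore(records, processor, true);
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"a", "x"});
        UnaryOperator<String> upperCase = String::toUpperCase;

        // Flag style: the reader has to know what `false` means at the call site.
        System.out.println(restore(records, upperCase, false));

        // Named style: the method name carries the restore behaviour.
        System.out.println(addReadOnlyGlobalStore(records, upperCase));
        System.out.println(addModifiableGlobalStore(records, upperCase));
    }
}
```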
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hey all,

Thanks for the feedback Bruno, Almog and Matthias!

Almog: I like the idea, but I agree with Matthias. I actually looked at that ticket a bit when doing this and found that, while similar, they are actually pretty unrelated code-wise. I would love to see it get taken care of.

Bruno and Matthias: Putting the Named parameter here doesn't really make sense to me. The store in the store builder is already named through what Matthias described, and the processor doesn't actually have a name. What gets named via the Named parameter (in the DSL) is the processor node, and the internal streams builder uses the Consumed object to make a source name. I think we should just keep the Consumed object and use that for the processor node name.

As for the limitation of the store builder interface, I don't think it is necessary. It could be something we add later if we really want to.

Anyway, I think we are getting close enough to consensus that I'm going to open a vote, and hopefully we can get it voted on soon!

best,
Walker
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hey,

looking into the API, I am wondering why we would need to add an overload taking a `Named` parameter?

StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes a `Consumed` parameter that allows setting a name.

> 2.
> I do not understand what you mean with "maximum flexibility". The built-in processor needs to assume a given state store interface. That means, users have to provide a state store that offers that interface. If they do not they will get a runtime exception. If we require a store builder for a given interface, we can catch the mistake at compile time. Let me know whether I misunderstood something.

Yes, we could catch it at runtime. But I guess what I was trying to say is different: we should not limit the API to always require a specific store, such that global stores can only be of a certain type. Global stores should be allowed to be of any type. Hence, if we add a built-in processor, it can only be one option, and we always need to support custom processors, and might also want to try to allow the restore optimization for custom processors (and thus other store types), not just for our built-in processor (and our built-in stores). Coupling the optimization to built-in stores would prevent us from applying the optimization to custom stores.

@Almog: interesting idea. I tend to think that both issues are orthogonal. If users choose to apply the optimization "added" by this KIP, the bug you mentioned would still apply to global stores, and thus this KIP is not addressing the issue you mentioned.

Personally, I also think that we don't need a KIP to fix the ticket you mentioned? In the end, we need to skip records during restore, and it seems it does not make sense to make this configurable?

-Matthias
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Thanks for the thoughts Bruno!

> Do you mean an API to configure restoration instead of boolean flag reprocessOnRestore?

Yes, this is exactly the type of thing I was musing about (but I don't have any concrete suggestions). It feels like that would give the flexibility to do things like the motivation section of the KIP (allow bulk loading of records without reprocessing) while also solving other limitations.

I'm supportive of the KIP as-is but was hoping somebody with more experience would have a sudden inspiration for how to solve both issues with one API! Anyway, I'll slide back into the lurking shadows for now and let the discussion continue :)

Cheers,
Almog
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hi Almog,

Do you mean an API to configure restoration instead of boolean flag reprocessOnRestore?

Do you already have an idea?

The proposal in the KIP is focused on the processor that updates the global state, whereas in the case of GlobalKTable and source KTable the issue lies in the deserialization of records from the input topics, but only if the deserialization error handler is configured to drop the problematic record. Additionally, for source KTable the source topic optimization must be turned on to run into the issue. I am wondering what a unified API for global stores, GlobalKTable, and source KTable might look like.

While it is an interesting question, I am in favor of deferring this to a separate KIP.

Best,
Bruno
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hi Walker,

I have follow-up comments.

1.
I think we should add an overload to the StreamsBuilder class that allows naming the processor with Named. That makes that processor consistent with all other processors in the DSL regarding naming.

2.
I do not understand what you mean by "maximum flexibility". The built-in processor needs to assume a given state store interface. That means users have to provide a state store that offers that interface. If they do not, they will get a runtime exception. If we require a store builder for a given interface, we can catch the mistake at compile time. Let me know whether I misunderstood something.

Best,
Bruno
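To make point 2 concrete, here is a small stand-alone Java sketch of the difference between catching a wrong store type at runtime and at compile time. It is only a toy model: `ToyStateStore`, `ToyKeyValueStore`, `ToyStoreBuilder`, `addGlobalStoreLoose` and `addGlobalStoreStrict` are hypothetical names made up for illustration and are not part of Kafka Streams or of the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: these types imitate the shape of the discussion, not Kafka Streams itself.
final class StoreBuilderTypingSketch {

    /** Hypothetical most-generic store interface. */
    interface ToyStateStore {
        String name();
    }

    /** Hypothetical key-value store interface that a built-in processor would rely on. */
    interface ToyKeyValueStore extends ToyStateStore {
        void put(String key, String value);
        String get(String key);
    }

    /** Hypothetical builder, parameterized by the kind of store it produces. */
    interface ToyStoreBuilder<S extends ToyStateStore> {
        S build();
    }

    static final class InMemoryKeyValueStore implements ToyKeyValueStore {
        private final String name;
        private final Map<String, String> data = new HashMap<>();
        InMemoryKeyValueStore(String name) { this.name = name; }
        @Override public String name() { return name; }
        @Override public void put(String key, String value) { data.put(key, value); }
        @Override public String get(String key) { return data.get(key); }
    }

    /** A custom store that is deliberately NOT a key-value store. */
    static final class ToyListStore implements ToyStateStore {
        private final String name;
        ToyListStore(String name) { this.name = name; }
        @Override public String name() { return name; }
    }

    /** Loose variant: accepts any builder, so the built-in processor has to cast. */
    static ToyKeyValueStore addGlobalStoreLoose(ToyStoreBuilder<? extends ToyStateStore> builder) {
        // Fails with ClassCastException at runtime if the store is not a key-value store.
        return (ToyKeyValueStore) builder.build();
    }

    /** Strict variant: a builder for the wrong store type is rejected by the compiler. */
    static ToyKeyValueStore addGlobalStoreStrict(ToyStoreBuilder<? extends ToyKeyValueStore> builder) {
        return builder.build();
    }

    public static void main(String[] args) {
        ToyStoreBuilder<InMemoryKeyValueStore> kv = () -> new InMemoryKeyValueStore("kv");
        ToyStoreBuilder<ToyListStore> custom = () -> new ToyListStore("list");

        ToyKeyValueStore store = addGlobalStoreStrict(kv);
        store.put("k", "v");
        System.out.println("strict variant works: " + store.get("k"));

        // addGlobalStoreStrict(custom); // would not compile: ToyListStore is not a ToyKeyValueStore

        try {
            addGlobalStoreLoose(custom);
        } catch (ClassCastException e) {
            System.out.println("loose variant fails only at runtime");
        }
    }
}
```

The strict signature only makes sense for an overload that ships its own processor; an overload that takes a user-supplied processor would presumably keep the generic builder so that custom stores stay possible.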
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hello Folk!

Glad to see improvements to the GlobalKTables in discussion! I think they deserve more love :)

Scope creep alert (I'm generally against scope creep, and I would certainly still support this KIP without it, but I want to see if there's an elegant way to address both problems). The KIP mentions that "Now the restore is done by reprocessing using an instance from the customer processor supplier", which I suppose fixed a long-standing bug (https://issues.apache.org/jira/browse/KAFKA-8037), but only for GlobalKTables and not for normal KTables that use the source-changelog optimization. Since this API could be used to signal "I want to reprocess on restore", I'm wondering whether it makes sense to design this API in a way that could be extended for KTables as well, so that a fix for KAFKA-8037 would be possible with the same mechanism.

Thoughts?

Cheers,
Almog
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hey Bruno,

1) I'm actually not sure why that is in there. It certainly doesn't match the convention. Best to remove it and match the other methods.

2) Yeah, I thought about it but I'm not convinced it is a necessary restriction. It might be useful for the already defined processors but then they might as well use the `globalTable` method. I think the add state store option should go for maximum flexibility.

Best,
Walker
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Hi Walker,

A couple of follow-up questions.

1.
Why do you propose to explicitly pass a parameter "storeName" in StreamsBuilder#addGlobalStore? The StoreBuilder should already provide a name for the store, if I understand the code correctly. I would avoid using the same name for the source node and the state store, because it limits the flexibility in naming. Why do you not use Named for the name of the source node?

2.
Did you consider Matthias' proposal to restrict the type of the store builder to `StoreBuilder` (or even `StoreBuilder`) for the case where the processor is built-in?

Best,
Bruno
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Thanks for the feedback Bruno, Matthias, and Lucas!

There is a decent amount but I'm going to try and just hit the major points as I would like to keep this change simple.

I've made corrections for the mistakes pointed out. Thanks for the suggestions everyone.

The main sticking point seems to be the method of signalling the restore behavior. It seems we can all agree on how the API should look with the default option we are adding. I think keeping the option to load directly from the topic into the store is a good idea. It is much more performant and could make a simple metric collector processor much simpler.

I think something that Matthias said about creating a special class of processors for the global stores helps me think about the issue. I tend to fall into the camp that we should keep global stores open to the possibility of having child nodes in the future. I don't really see the downside of having that as an option. It might not be best for a lot of cases, but something simple could be very useful to put in the PAPI.

I like the idea of having a `GlobalStoreParameters` but only if we decide to make the processor need to extend an interface like `GobalStoreProcessor`. If not, that seems excessive.

As of right now I don't see a better option than having a boolean flag for the reprocessOnRestore option. I expanded the description in the docs so I hope that helps.

I am more than willing to take other ideas on it.

thanks,
Walker
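For readers following along, the two restore behaviors being discussed can be sketched with a stand-alone toy model. This is only an illustration under stated assumptions: `Rec`, `ToyProcessor`, `restore` and `UPPERCASE_NON_EMPTY` are hypothetical names, the store is a plain in-memory map, and nothing here is the Kafka Streams implementation or the final KIP API. Only the `reprocessOnRestore` flag name is taken from the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: none of these types are Kafka Streams classes.
final class RestoreModesSketch {

    /** A record from the global topic (hypothetical stand-in). */
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    /** Hypothetical custom processor: decides what, if anything, is written to the store. */
    interface ToyProcessor {
        void process(Rec record, Map<String, String> store);
    }

    /**
     * Rebuilds the store from the topic. With reprocessOnRestore = false the records
     * are copied as-is (fast path); with true every record passes through the processor.
     */
    static Map<String, String> restore(List<Rec> topic, ToyProcessor processor, boolean reprocessOnRestore) {
        Map<String, String> store = new TreeMap<>();
        for (Rec r : topic) {
            if (reprocessOnRestore) {
                processor.process(r, store);
            } else {
                store.put(r.key, r.value);
            }
        }
        return store;
    }

    /** Example processor that upper-cases values and skips empty ones. */
    static final ToyProcessor UPPERCASE_NON_EMPTY = (r, store) -> {
        if (!r.value.isEmpty()) {
            store.put(r.key, r.value.toUpperCase(Locale.ROOT));
        }
    };

    static List<Rec> sampleTopic() {
        List<Rec> topic = new ArrayList<>();
        topic.add(new Rec("a", "one"));
        topic.add(new Rec("b", ""));
        topic.add(new Rec("a", "two"));
        return topic;
    }

    public static void main(String[] args) {
        // prints "copy:      {a=two, b=}"
        System.out.println("copy:      " + restore(sampleTopic(), UPPERCASE_NON_EMPTY, false));
        // prints "reprocess: {a=TWO}"
        System.out.println("reprocess: " + restore(sampleTopic(), UPPERCASE_NON_EMPTY, true));
    }
}
```

In this model the direct copy gives the same result as reprocessing only when the processor stores records unchanged, which is why the flag only makes sense as an explicit opt-in by the user.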
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
If the custom store is a key-value store, yes, we could do this. But the interface does not enforce a key-value store, it's just the most generic `StateStore` that we pass in, and thus it could be something totally unknown to us, and we cannot apply a cast... The underlying idea is really about 100% flexibility in the PAPI layer. That's also the reason why all stores need to provide a callback for the restore path. The Kafka Streams runtime can only read the record from the changelog, but it cannot put it into the store, as the runtime only sees the `StateStore` interface -- thus, we invoke a store-specific callback (`StateRestoreCallback` interface) that needs to actually put the data into the store for us. For our built-in stores, we of course provide these callbacks, but the point is that the runtime does not know anything about the nature of the store but is fully agnostic to it, to allow plugging in any custom store with any custom interface (which just needs to implement `StateStore`). Not sure if I understand what you mean by this transformation step? -Matthias On 3/12/24 3:04 AM, Lucas Brutschy wrote: @Matthias: Thanks, I didn't realize that we need processors for any custom store. Are we sure we cannot build a generic processor to load data into a custom key-value store? I'm not sure, but you know the code better than me. One other alternative is to allow the user to provide a state transformer `Function, ConsumerRecord>` to adapt the state before loading it, defaulting to identity. This would provide the ability to do efficient, non-deserializing transformations like => On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax wrote: @Bruno: (1), I think you are spot on for the ts-extractor: on the restore code path, we only support record-ts, but there is no need for a custom-ts because for regular changelog topics KS sets the ts, and thus, the optimization this KIP proposes requires that the global topic follow the changelog format, i.e., the ts must be in the record-ts.
However, for the regular processing path, I am not sure if we can omit deserializers. The way the PAPI is wired up, seems to require that we give proper types to _other_ Processor that read from the global state store. For this reason, the store (which takes `Serdes` with proper types) is wrapped with a `MeteredStore` (like all others) to do the Serde work, and this MeteredStore is also exposed to the global-Processor? Might be good for Walker to dig into this to find out the details? If would of course be nice if we could avoid the unnecessary deserialization on topic read, and re-serialization on global-store put for this case, but it seems not to be straightforward to do... (2). Is this about the PAPI/Topology? For this case, we don't have any config object across the board. We only do this in the DSL. Hence, I would propose to just follow the existing pattern in this KIP to keep the API consistent. For the DSL, it could make sense of course. -- Of course, if we think the PAPI could be improved with config objects, we could do this in a dedicate KIP. @Lucas: The PAPI is unfortunately (by design) much more open and less restrictive. If a users has a custom state store, we need some `Processor` code from them, because we cannot provide a built-in processor for an unknown store. The overload which won't take a processor would only work for the built-in key-value store, what I assume would cover most use-cases, however, we should keep the door open for other use cases. Otherwise, we disallow this optimization for custom stores. PAPI is really about flexibility, and yes, with great power comes great responsibility for the users :) But this actually highlights a different aspect: the overload not accepting a custom `Processor` but using a built-in processor, should not accept a generic `StoreBuilder` but should restrict the type to `StoreBuilder`? 
-Matthias On 3/6/24 1:14 PM, Lucas Brutschy wrote: Hey Walker Thanks for the KIP, and congrats on the KiBiKIP ;) My main point is that I'd vote against introducing `reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is just incorrect and should be removed or deprecated. If we think we need to keep the old behavior around, renaming the methods, e.g., to `addGlobalReadOnlyStore`, is a great opportunity to deprecate the old behavior. But at a first glance, the old behavior just looks like a bug to me and should just be removed. So for this KIP, I'd keep two variants as you propose and drop the boolean parameter, but the two variants will be 1) a copy-restore variant without custom processing, as you propose. 2) a process-restore variant with custom processing (parameters the same as before). This should be combined with a clear warning in the Javadoc of the performance downside of this approach. Presentation: 1) I wonder if you could make another pass on the motivation section. I was lacking some context on this problem, and I think the n
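To make the restore-callback argument in Matthias's reply more concrete, here is a minimal, self-contained sketch. The `OpaqueStore` and `RestoreCallback` types are simplified stand-ins invented for this illustration; they only mirror the roles of `StateStore` and `StateRestoreCallback` and are not the Kafka Streams interfaces. `restoreFromChangelog` is likewise a hypothetical loop, not the actual runtime code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Stand-in for the generic store interface: the runtime sees nothing but a name.
interface OpaqueStore {
    String name();
}

// Stand-in for the store-specific restore hook: it receives raw bytes only.
interface RestoreCallback {
    void restore(byte[] key, byte[] value);
}

// A custom store that is NOT a key-value store: it keeps an append-only list.
final class AppendOnlyStore implements OpaqueStore {
    final List<String> entries = new ArrayList<>();

    @Override
    public String name() {
        return "append-only";
    }

    // Only the store itself knows how to turn raw bytes into its own layout.
    RestoreCallback restoreCallback() {
        return (key, value) -> entries.add(
            new String(key, StandardCharsets.UTF_8) + "=" + new String(value, StandardCharsets.UTF_8));
    }
}

final class RestoreCallbackSketch {
    // Hypothetical runtime loop: it reads raw records and hands each one to the
    // callback, without ever casting the store to a concrete type.
    static void restoreFromChangelog(List<byte[][]> rawRecords, RestoreCallback callback) {
        for (byte[][] record : rawRecords) {
            callback.restore(record[0], record[1]);
        }
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Restores two raw records into the custom store and returns its contents.
    static List<String> demo() {
        AppendOnlyStore store = new AppendOnlyStore();
        List<byte[][]> changelog = new ArrayList<>();
        changelog.add(new byte[][] {bytes("a"), bytes("1")});
        changelog.add(new byte[][] {bytes("b"), bytes("2")});
        restoreFromChangelog(changelog, store.restoreCallback());
        return store.entries;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is only that the loop never needs to know what kind of store it is filling, which is why a built-in "put into key-value store" step cannot be assumed for arbitrary custom stores.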
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
On 3/12/24 3:04 AM, Lucas Brutschy wrote:

@Matthias: Thanks, I didn't realize that we need processors for any custom store. Are we sure we cannot build a generic processor to load data into a custom key-value store? I'm not sure, but you know the code better than me.

One other alternative is to allow the user to provide a state transformer `Function, ConsumerRecord>` to adapt the state before loading it, defaulting to identity. This would provide the ability to do efficient, non-deserializing transformations like =>
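The "state transformer defaulting to identity" idea from Lucas's 3/12 message could look roughly like the sketch below. Everything here is an assumption made for illustration: `RawRecord` is a stand-in for a still-serialized record (it is not Kafka's `ConsumerRecord`), `load` is a hypothetical loader, and the prefix-stripping transformation is just one example of a byte-level change; it is not the example from the original mail, which did not survive in the archive.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Stand-in for a raw, still-serialized record (hypothetical type).
final class RawRecord {
    final byte[] key;
    final byte[] value;

    RawRecord(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

final class StateTransformerSketch {
    // Loads raw records into a byte-oriented store, applying the transformer
    // to each record first. No deserialization happens anywhere.
    static Map<String, byte[]> load(List<RawRecord> records,
                                    Function<RawRecord, RawRecord> transformer) {
        Map<String, byte[]> store = new LinkedHashMap<>();
        for (RawRecord record : records) {
            RawRecord adapted = transformer.apply(record);
            // Arrays.toString gives a stable, printable map key for the raw key bytes.
            store.put(Arrays.toString(adapted.key), adapted.value);
        }
        return store;
    }

    // Overload without a transformer: defaults to identity, i.e. a plain copy.
    static Map<String, byte[]> load(List<RawRecord> records) {
        return load(records, Function.identity());
    }

    // Hypothetical byte-level transformation: drop a one-byte prefix from the value.
    static RawRecord stripFirstValueByte(RawRecord record) {
        return new RawRecord(record.key, Arrays.copyOfRange(record.value, 1, record.value.length));
    }

    public static void main(String[] args) {
        List<RawRecord> records = List.of(new RawRecord(new byte[] {1}, new byte[] {9, 42}));
        System.out.println(Arrays.toString(load(records).get("[1]")));
        System.out.println(Arrays.toString(
            load(records, StateTransformerSketch::stripFirstValueByte).get("[1]")));
    }
}
```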
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax wrote:

@Bruno:

(1) I think you are spot on for the ts-extractor: on the restore code path, we only support record-ts, but there is no need for a custom-ts because for regular changelog topics KS sets the ts, and thus, the optimization this KIP proposes requires that the global topic follow the changelog format, i.e., the ts must be in the record-ts.

However, for the regular processing path, I am not sure if we can omit deserializers. The way the PAPI is wired up seems to require that we give proper types to _other_ Processors that read from the global state store. For this reason, the store (which takes `Serdes` with proper types) is wrapped with a `MeteredStore` (like all others) to do the Serde work, and this MeteredStore is also exposed to the global-Processor? Might be good for Walker to dig into this to find out the details?

It would of course be nice if we could avoid the unnecessary deserialization on topic read, and re-serialization on global-store put for this case, but it seems not to be straightforward to do...

(2) Is this about the PAPI/Topology? For this case, we don't have any config object across the board. We only do this in the DSL. Hence, I would propose to just follow the existing pattern in this KIP to keep the API consistent. For the DSL, it could make sense of course. -- Of course, if we think the PAPI could be improved with config objects, we could do this in a dedicated KIP.

@Lucas:

The PAPI is unfortunately (by design) much more open and less restrictive. If a user has a custom state store, we need some `Processor` code from them, because we cannot provide a built-in processor for an unknown store. The overload which won't take a processor would only work for the built-in key-value store, which I assume would cover most use-cases; however, we should keep the door open for other use cases. Otherwise, we disallow this optimization for custom stores. PAPI is really about flexibility, and yes, with great power comes great responsibility for the users :)

But this actually highlights a different aspect: the overload not accepting a custom `Processor` but using a built-in processor, should not accept a generic `StoreBuilder` but should restrict the type to `StoreBuilder`?

-Matthias
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
On 3/6/24 1:14 PM, Lucas Brutschy wrote:

Hey Walker

Thanks for the KIP, and congrats on the KiBiKIP ;)

My main point is that I'd vote against introducing `reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is just incorrect and should be removed or deprecated. If we think we need to keep the old behavior around, renaming the methods, e.g., to `addGlobalReadOnlyStore`, is a great opportunity to deprecate the old behavior. But at first glance, the old behavior just looks like a bug to me and should just be removed.

So for this KIP, I'd keep two variants as you propose and drop the boolean parameter, but the two variants will be
1) a copy-restore variant without custom processing, as you propose.
2) a process-restore variant with custom processing (parameters the same as before). This should be combined with a clear warning in the Javadoc of the performance downside of this approach.

Presentation:
1) I wonder if you could make another pass on the motivation section. I was lacking some context on this problem, and I think the nature of the restore issue only became clear to me when I read through the comments in the JIRA ticket you linked.
2) If we decide to keep the parameter `reprocessOnRestore`, the Javadoc on it should be extended. This is a somewhat subtle issue, and I don't think `restore by reprocessing` is enough of an explanation.

Nits:

`{@link ValueTransformer ValueTransformer}` -> `{@link ValueTransformer ValueTransformers}`
`user defined` -> `user-defined`

Cheers,
Lucas
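The difference between the copy-restore and process-restore variants in Lucas's 3/6 message can be illustrated with a small toy model. This is only a sketch under the assumption that the restore issue is what the thread suggests: a restore that copies topic records straight into the store skips the custom processor, so the restored store can differ from the one built during regular processing. The class and method names are hypothetical, and plain `Map`s stand in for both the topic and the store.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

final class RestoreVariantsSketch {
    // Regular processing path: every topic record passes through the custom
    // processor before it is written to the store.
    static Map<String, String> process(Map<String, String> topic, UnaryOperator<String> processor) {
        Map<String, String> store = new LinkedHashMap<>();
        topic.forEach((key, value) -> store.put(key, processor.apply(value)));
        return store;
    }

    // Copy-restore: topic records are copied into the store as-is. This is
    // cheap, but it is only equivalent when the processor changes nothing.
    static Map<String, String> restoreByCopy(Map<String, String> topic) {
        return new LinkedHashMap<>(topic);
    }

    // Process-restore: the processor runs again for every record on restore,
    // so the result matches regular processing at a higher cost.
    static Map<String, String> restoreByReprocessing(Map<String, String> topic,
                                                     UnaryOperator<String> processor) {
        return process(topic, processor);
    }

    public static void main(String[] args) {
        Map<String, String> topic = new LinkedHashMap<>();
        topic.put("user-1", "alice");
        UnaryOperator<String> upperCase = String::toUpperCase;

        System.out.println("processed:   " + process(topic, upperCase));
        System.out.println("copy-restore: " + restoreByCopy(topic));
        System.out.println("reprocessed: " + restoreByReprocessing(topic, upperCase));
    }
}
```

With a processor that actually transforms values, the copy-restored store no longer matches the processed one, which is the mismatch the two-variant proposal is meant to rule out.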
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna wrote:

Hi Walker,

Thanks for the KIP!

Great that you are going to fix this long-standing issue!

1.
I was wondering if we need the timestamp extractor as well as the key and value deserializer in the Topology#addGlobalStore() overloads that do not take a ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()? Since those methods set up a global state store that does not process any records, do they still need to deserialize records and extract timestamps? Name might still be needed, right?

2.
From an API point of view, it might make sense to put all processor-related arguments into a parameter object. Something like:
GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
Just an idea, open for discussion.

3.
Could you please go over the KIP and correct typos and other mistakes in the KIP?

Best,
Bruno
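For readers who want to picture the parameter-object idea from Bruno's point 2, here is one possible shape as a minimal sketch. `GlobalStoreParameters` does not exist in Kafka Streams; the class below is hypothetical and only reuses the method names from Bruno's one-liner. The `String` key serde is a placeholder for a real serde instance, and the default of reprocessing on restore being enabled is an assumption inferred from the `disable...` method name.

```java
// Hypothetical immutable parameter object with a fluent API.
final class GlobalStoreParameters {
    private final String keySerde;          // placeholder for a real serde instance
    private final boolean reprocessOnRestore;

    private GlobalStoreParameters(String keySerde, boolean reprocessOnRestore) {
        this.keySerde = keySerde;
        this.reprocessOnRestore = reprocessOnRestore;
    }

    // Entry point; assumes reprocessing on restore is enabled by default.
    static GlobalStoreParameters globalStore() {
        return new GlobalStoreParameters(null, true);
    }

    // Each "with"/"disable" call returns a new object, so instances can be shared safely.
    GlobalStoreParameters withKeySerde(String keySerde) {
        return new GlobalStoreParameters(keySerde, reprocessOnRestore);
    }

    GlobalStoreParameters disableReprocessOnRestore() {
        return new GlobalStoreParameters(keySerde, false);
    }

    String keySerde() {
        return keySerde;
    }

    boolean reprocessOnRestore() {
        return reprocessOnRestore;
    }

    public static void main(String[] args) {
        GlobalStoreParameters params = GlobalStoreParameters.globalStore()
            .withKeySerde("string-serde")
            .disableReprocessOnRestore();
        System.out.println(params.keySerde() + ", reprocessOnRestore=" + params.reprocessOnRestore());
    }
}
```

One design consequence of this shape is that new options can be added later as further fluent methods without adding more `addGlobalStore` overloads, which is the trade-off against the "follow the existing PAPI pattern" position taken elsewhere in the thread.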
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
On 3/2/24 1:43 AM, Matthias J. Sax wrote:

Thanks for the KIP Walker.

Fixing this issue, and providing users some flexibility to opt-in/out on "restore reprocessing" is certainly a good improvement.

From an API design POV, I like the idea to not require passing in a ProcessorSupplier to begin with. Given the current implementation of the restore process, this might have been the better API from the beginning on... Well, better late than never :)

For this new method w/o a supplier, I am wondering if we want to keep `addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar thing via KIP-813. Just an idea.

However, I am not convinced that adding a new boolean parameter is the best way to design the API. Unfortunately, I don't have any elegant proposal myself. Just a somewhat crazy idea to do a larger API change:

Taking a step back, a global store is by definition a terminal node -- we don't support adding child nodes. Hence, while we expose a full `ProcessorContext` interface, we actually limit what functionality it supports. Thus, I am wondering if we should stop using the generic `Processor` interface to begin with, but design a new one which is tailored to the needs of global stores? -- This would of course be of much larger scope than originally intended by this KIP, but it might be a great opportunity to kill two birds with one stone?

The only other question to consider is: do we believe that global stores will never have child nodes, or could we actually allow for child nodes in the future? If yes, it might not be smart to move off using the `Processor` interface. In general, I could imagine, especially as we now want to support "process on restore", to allow simple stateless operators like `map()` or `filter()` on a `GlobalTable` (or allow adding custom global processors) at some point in the future?

Just wanted to put this out to see what people think...

-Matthias

On 2/29/24 1:26 PM, Walker Carlson wrote:
> Hello everybody,
>
> I wanted to propose a change to our addGlobalStore methods so that the restore behavior can be controlled on a preprocessor level. This should help Kafka Stream users to better tune Global stores added with the processor API to better fit their needs.
>
> Details are in the kip here: https://cwiki.apache.org/confluence/x/E4t3EQ
>
> Thanks,
> Walker