Re: [Kafka Streams], Processor API for KTable and KGroupedStream
You cannot add a `Processor` to a `KGroupedStream`. You can only use `aggregate()`, `reduce()`, or `count()` (each of which, of course, adds a predefined processor). `groupByKey()` is really just a "meta operation": it checks whether the key was changed upstream and inserts a repartition (shuffle) step if necessary.

Thus, if you don't change the key upstream, you can just add a processor to `someStream` directly (`groupByKey()` would be a no-op anyway). If you did change the key upstream, you can do `someStream.repartition().transform()` to repartition explicitly.

HTH.

(In reply to Igor Maznitsa, 1/13/24 3:14 AM.)
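A rough sketch of the two cases described above, assuming Kafka Streams 3.3 or later, where `KStream#process()` returns a `KStream` and is used here in place of `transform()`. The topic names, the store name, the `selectKey` re-keying, and the `CountNonEmpty` processor (a per-key count that silently drops empty values, standing in for a custom `aggregate()`) are all hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class ProcessorInsteadOfGroupByKey {

    static final String STORE = "non-empty-counts"; // hypothetical store name

    // Hypothetical custom "aggregation": counts non-empty values per key and
    // drops empty ones (they are neither counted nor forwarded downstream).
    static class CountNonEmpty implements Processor<String, String, String, Long> {
        private ProcessorContext<String, Long> context;
        private KeyValueStore<String, Long> store;

        @Override
        public void init(ProcessorContext<String, Long> context) {
            this.context = context;
            this.store = context.getStateStore(STORE);
        }

        @Override
        public void process(Record<String, String> record) {
            // Skip null keys (as groupByKey() would) and empty values.
            if (record.key() == null || record.value() == null || record.value().isEmpty()) {
                return; // not calling forward() means nothing is sent downstream
            }
            Long previous = store.get(record.key());
            long updated = (previous == null ? 0L : previous) + 1;
            store.put(record.key(), updated);
            context.forward(record.withValue(updated));
        }
    }

    // Registers the state store and reads the (hypothetical) input topic.
    private static KStream<String, String> source(StreamsBuilder builder) {
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(STORE), Serdes.String(), Serdes.Long()));
        return builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
    }

    // Case 1: key not changed upstream, so the data is already partitioned by key;
    // attach the processor directly to someStream (no groupByKey() needed).
    static Topology keyUnchanged() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> someStream = source(builder);
        someStream.process(CountNonEmpty::new, STORE)
            .to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    // Case 2: key changed upstream (hypothetical re-keying by value),
    // so repartition explicitly before the processor.
    static Topology keyChanged() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> someStream = source(builder);
        someStream.selectKey((key, value) -> value)
            .repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
            .process(CountNonEmpty::new, STORE)
            .to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(keyUnchanged().describe());
        System.out.println(keyChanged().describe());
    }
}
```

Dropping a record is simply a matter of not calling `context.forward()`. Because the processor keeps its own state store, it takes the place of the `groupByKey().aggregate()` step rather than following it; printing both topology descriptions shows that only the re-keyed variant contains a repartition topic.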
Re: [Kafka Streams], Processor API for KTable and KGroupedStream
Thanks a lot for the explanation, but could you provide a bit more detail about `KGroupedStream`? It is just an interface and does not extend `KStream`, so how can I add a processor in the case below?

    KStream someStream = ...
    someStream
        .groupByKey()
        // how to add a processor for the resulting grouped stream here???

(In reply to Matthias J. Sax, 2024-Jan-13 01:22.)
Re: [Kafka Streams], Processor API for KTable and KGroupedStream
`KGroupedStream` is just an "intermediate representation" that gives the DSL a better flow. It's not a "top-level" abstraction like `KStream`/`KTable`.

For `KTable` there is `transformValues()`. There is no `transform()`, because the keying must be preserved; if you want to change the keying, you need to use `KTable#groupBy()` (the data needs to be repartitioned if you change the key).

HTH.
-Matthias

(In reply to Igor Maznitsa, 1/12/24 11:47 AM.)
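A minimal sketch of `KTable#transformValues()`, assuming String serdes and the hypothetical topics `input-table` and `output-topic`; the `UppercaseValues` transformer is likewise hypothetical. The transformer receives the key read-only and may only change the value, which is why the table's keying is preserved and no repartitioning is needed.

```java
import java.util.Locale;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

public class KTableTransformValuesSketch {

    // Hypothetical transformer: the key is read-only, only the value changes.
    static class UppercaseValues implements ValueTransformerWithKey<String, String, String> {
        @Override
        public void init(ProcessorContext context) {
            // No state store needed for this stateless example.
        }

        @Override
        public String transform(String readOnlyKey, String value) {
            // Guard against null values (deletes) in the table's changelog.
            return value == null ? null : value.toUpperCase(Locale.ROOT);
        }

        @Override
        public void close() {
            // Nothing to clean up.
        }
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table =
            builder.table("input-table", Consumed.with(Serdes.String(), Serdes.String()));
        // Keying is preserved, so no repartition step is added here.
        table.transformValues(UppercaseValues::new)
            .toStream()
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```

If the goal is only to drop selected rows, `KTable#filter()` may be the simpler fit; note that for a table, an update that fails the predicate is forwarded as a tombstone so downstream state stays consistent, rather than disappearing silently.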
[Kafka Streams], Processor API for KTable and KGroupedStream
Hello,

Is there any way in the Kafka Streams API to define processors for `KTable` and `KGroupedStream`, like `KStream#transform`? How can I provide a custom processor for a `KTable` or `KGroupedStream` that could, for instance, provide a way to not forward selected events downstream?

--
Igor Maznitsa
email: rrg4...@gmail.com