KGroupedStream and TimeWindowedKStream are only logical representations at the DSL level. They don't really "do" anything.
Thus, you can mimic them as follows:

    builder.addStore(...)
    in.selectKey().through(...).transform(..., "storeName")

selectKey() sets the new key for the grouping, and through() does the actual repartitioning. Note that you need to create the topic you specify in through() manually, with the desired number of partitions, before you start your application.

The transform() does the windowing and aggregation in a single operator. If you want to do windowing, you might want to use the WindowStore implementation from the DSL.

Hope this helps.

-Matthias

On 4/6/18 8:49 AM, Dmitriy Vsekhvalnov wrote:
> Hey, good day everyone,
>
> another kafka-streams friday question.
>
> We hit the wall with DSL implementation and would like to try low-level
> Processor API.
>
> What we looking for is to:
> - repartition incoming source stream via grouping records by some fields
> + windowed (hourly, daily, e.t.c).
> - and then apply custom Processor on grouped data.
>
> Processor gonna do some overcomplicated version of record counting and need
> persistent KV state store access.
>
> The issue - neither KGroupedStream nor TimeWindowedKStream provides api to
> hook processor into topology.
>
> Just to show some code:
>
> in.groupBy((key, value) -> .....)
> .windowedBy(Hourly)
> .transform(Processor) // Missing this one?
>
>
> What our options to combine both? We were thinking that we can re-implement
> grouping with low-level API after investigating source code, but looks like
> overkill.
>
> Thank you.
>
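
P.S. To make the transform() step a bit more concrete, here is a minimal plain-Java sketch of doing the windowing and the counting in one operator. It deliberately does not use the Kafka Streams API: the HashMap is only a stand-in for the persistent store, and the names WindowedCounter, windowStart and process are made up for illustration. In a real topology, roughly the same logic would presumably sit inside your Transformer and read/write the store you registered via builder.addStore(...).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: not part of Kafka Streams.
class WindowedCounter {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // Stand-in for the persistent key-value state store (assumption).
    private final Map<String, Long> store = new HashMap<>();
    private final long windowSizeMs;

    WindowedCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Align a record timestamp to the start of its tumbling window.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Windowing and aggregation in one step: the store key combines the
    // grouping key with the window start, and the value is the running count.
    long process(String key, long timestampMs) {
        long start = windowStart(timestampMs, windowSizeMs);
        String storeKey = key + "@" + start;
        return store.merge(storeKey, 1L, Long::sum);
    }

    public static void main(String[] args) {
        WindowedCounter counter = new WindowedCounter(HOUR_MS);
        System.out.println(counter.process("user-1", 10L));          // first record in window 0
        System.out.println(counter.process("user-1", 20L));          // same key, same window
        System.out.println(counter.process("user-1", HOUR_MS + 5L)); // same key, next window
    }
}
```

Since the window start is part of the store key, hourly vs. daily windows is just a different windowSizeMs. Expiring old windows is left out here; that is the part a windowed store would otherwise handle for you.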