>> ok, question then - is it possible to use state store with .aggregate()?
Not sure what exactly you mean by this. An aggregation always uses a store; it's a stateful operation and cannot be computed without one.

For TopN, if you get the hit count as input, you can use an `.aggregate()` operator that uses an array or list as its output -- this list contains the topN, and each time `aggregate()` is called, you check whether the new count replaces an existing count in the array/list.

-Matthias

On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> Thanks guys,
>
> ok, question then - is it possible to use state store with .aggregate()?
>
> Here are some details on counting, we basically looking for TopN +
> Remaining calculation.
>
> Example:
>
> - incoming data: api url -> hit count
>
> - we want output: Top 20 urls per each domain per hour + remaining count
> per domain (e.g. sum of all other urls hits that do not belong to top 10
> per each domain per hour).
>
> With some grouping variations.
>
> Make some sense? Always open for better ideas :)
>
> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Dmitriy,
>>
>> You can "simulate" an lower-level processor API by 1) adding the stores you
>> need via the builder#addStore(); 2) do a manual "through" call after
>> "selectKey" (the selected key will be the same as your original groupBy
>> call), and then from the repartitioned stream add the `transform()`
>> operator to do manual windowed counting.
>>
>> But before you really go into this route, I'd first like to validate if the
>> provided `Aggregate`, `Initialize` functions really cannot meet your
>> "overcomplicated version of record counting", could you elaborate a bit
>> more on this logic so maybe we can still around it around with the pure
>> high-level DSL?
>>
>>
>> Guozhang
>>
>>
>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
>>
>>> Hey, good day everyone,
>>>
>>> another kafka-streams friday question.
>>>
>>> We hit the wall with DSL implementation and would like to try low-level
>>> Processor API.
>>>
>>> What we looking for is to:
>>> - repartition incoming source stream via grouping records by some fields
>>> + windowed (hourly, daily, e.t.c).
>>> - and then apply custom Processor on grouped data.
>>>
>>> Processor gonna do some overcomplicated version of record counting and need
>>> persistent KV state store access.
>>>
>>> The issue - neither KGroupedStream nor TimeWindowedKStream provides api to
>>> hook processor into topology.
>>>
>>> Just to show some code:
>>>
>>> in.groupBy((key, value) -> .....)
>>> .windowedBy(Hourly)
>>> .transform(Processor) // Missing this one?
>>>
>>>
>>> What our options to combine both? We were thinking that we can re-implement
>>> grouping with low-level API after investigating source code, but looks like
>>> overkill.
>>>
>>> Thank you.
>>>
>>
>>
>>
>> --
>> -- Guozhang
>
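For illustration, the TopN idea from the reply at the top of this message (keep a bounded list as the aggregate and check each incoming count against it) could look roughly like the plain-Java sketch below. It deliberately uses no Kafka classes. `TopNSketch`, `UrlCount`, `TopN` and `add` are hypothetical names, not part of Kafka Streams. It assumes every record carries a url together with its latest total hit count, and that counts for a url only grow. The "remaining" sum is not covered here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TopNSketch {

    /** One (url, count) entry. Hypothetical type, not part of Kafka Streams. */
    static final class UrlCount {
        final String url;
        final long count;

        UrlCount(String url, long count) {
            this.url = url;
            this.count = count;
        }
    }

    /** The aggregate value: at most n entries, highest count first. */
    static final class TopN {
        final int n;
        final List<UrlCount> entries = new ArrayList<>();

        TopN(int n) {
            this.n = n;
        }

        /** Folds one record into the aggregate and returns the aggregate. */
        TopN add(String url, long count) {
            // Drop any older entry for this url so the new count replaces it.
            entries.removeIf(e -> e.url.equals(url));
            entries.add(new UrlCount(url, count));
            // Keep the list ordered by count, largest first.
            entries.sort(Comparator.comparingLong((UrlCount e) -> e.count).reversed());
            // Evict the smallest entry if the list grew past n.
            if (entries.size() > n) {
                entries.remove(entries.size() - 1);
            }
            return this;
        }
    }

    public static void main(String[] args) {
        TopN top = new TopN(2);
        top.add("/a", 5).add("/b", 9).add("/c", 7).add("/a", 12);
        for (UrlCount e : top.entries) {
            System.out.println(e.url + " " + e.count);
        }
    }
}
```

In a Streams topology, the body of `add` would presumably sit inside the `Aggregator` passed to `aggregate()` on a stream grouped by domain and windowed by hour, with `new TopN(20)` coming from the `Initializer`. The aggregate type would also need its own Serde so it can be written to the state store; that part is left out of this sketch.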