For (1), no. If you want to do manual put/get, you should use a Transformer
and implement a custom operator.
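A minimal sketch of that pattern (store name, topic name, and types are
illustrative, not from this thread): register the store via
StreamsBuilder#addStateStore() and access it by name from inside the
Transformer:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class ManualStoreAccess {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // register a persistent KV store with the topology
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("my-store"),
                Serdes.String(), Serdes.Long()));

        KStream<String, Long> in = builder.stream("input-topic",
                Consumed.with(Serdes.String(), Serdes.Long()));

        // transform() attaches the operator to the store by name and
        // gives you full put/get access inside the callbacks
        in.transform(() -> new Transformer<String, Long, KeyValue<String, Long>>() {
            private KeyValueStore<String, Long> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(ProcessorContext context) {
                store = (KeyValueStore<String, Long>) context.getStateStore("my-store");
            }

            @Override
            public KeyValue<String, Long> transform(String key, Long value) {
                Long old = store.get(key);                // manual get
                long updated = (old == null ? 0L : old) + value;
                store.put(key, updated);                  // manual put
                return KeyValue.pair(key, updated);
            }

            @Override
            public void close() {}
        }, "my-store");  // store names must be passed to transform()
    }
}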
Btw: here is an example of TopN:
https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java

-Matthias

On 4/9/18 4:46 AM, Dmitriy Vsekhvalnov wrote:
> Hi Matthias, thanks
>
> clarifications below:
>
> 1. .aggregate( () -> .. ,
>        (k, v, agg) -> {
>            // Can I access a KV store here for manual put/get?
>        });
>
> 2. TopN is not hard, we're using pretty much the same approach you
> describe, just with a bounded priority queue. The problematic part is the
> 'remaining count' - everything else not in the TopN records. It appeared
> to be quite complex in the streaming world (or we missed something). I'll
> try to illustrate, assuming a simplified event flow:
>
> - acme.com: 100 hits -> too small, not in TopN, we add it to the
>   remaining count
> - ... some time later ...
> - acme.com: 150 hits -> still too small, add to the remaining count
>
> Problem: we added 250 hits to remaining, but actually we had to add only
> 150 hits. We have to subtract the previous count, and that means we need
> to keep all previous counts somewhere. That's where we hope access to a
> KV store can help.
>
> On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>>>> ok, question then - is it possible to use state store with .aggregate()?
>>
>> Not sure what you exactly mean by this. An aggregation always uses a
>> store; it's a stateful operation and cannot be computed without a store.
>>
>> For TopN, if you get the hit-count as input, you can use an
>> `.aggregate()` operator that uses an array or list as output -- this
>> list contains the TopN, and each time aggregate() is called, you check
>> if the new count replaces an existing count in the array/list.
>>
>> -Matthias
>>
>> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
>>> Thanks guys,
>>>
>>> ok, question then - is it possible to use a state store with
>>> .aggregate()?
>>>
>>> Here are some details on the counting; we are basically looking for a
>>> TopN + Remaining calculation.
>>>
>>> Example:
>>>
>>> - incoming data: api url -> hit count
>>>
>>> - we want output: top 20 urls per domain per hour + remaining count
>>> per domain (e.g. the sum of all other url hits that do not belong to
>>> the top 20 per domain per hour).
>>>
>>> With some grouping variations.
>>>
>>> Make some sense? Always open for better ideas :)
>>>
>>> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> Hello Dmitriy,
>>>>
>>>> You can "simulate" a lower-level processor API by 1) adding the stores
>>>> you need via builder#addStore(); 2) doing a manual "through" call after
>>>> "selectKey" (the selected key will be the same as in your original
>>>> groupBy call), and then from the repartitioned stream adding the
>>>> `transform()` operator to do manual windowed counting.
>>>>
>>>> But before you really go down this route, I'd first like to validate
>>>> whether the provided `Aggregate` and `Initialize` functions really
>>>> cannot meet your "overcomplicated version of record counting" - could
>>>> you elaborate a bit more on this logic, so maybe we can still work
>>>> around it with the pure high-level DSL?
>>>>
>>>> Guozhang
>>>>
>>>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <
>>>> dvsekhval...@gmail.com> wrote:
>>>>
>>>>> Hey, good day everyone,
>>>>>
>>>>> another kafka-streams friday question.
>>>>>
>>>>> We hit the wall with the DSL implementation and would like to try the
>>>>> low-level Processor API.
>>>>>
>>>>> What we're looking for is to:
>>>>> - repartition the incoming source stream by grouping records on some
>>>>>   fields + windowed (hourly, daily, etc.), and
>>>>> - then apply a custom Processor on the grouped data.
>>>>>
>>>>> The Processor is going to do some overcomplicated version of record
>>>>> counting and needs persistent KV state store access.
>>>>>
>>>>> The issue: neither KGroupedStream nor TimeWindowedKStream provides an
>>>>> api to hook a processor into the topology.
>>>>>
>>>>> Just to show some code:
>>>>>
>>>>> in.groupBy((key, value) -> .....)
>>>>>   .windowedBy(Hourly)
>>>>>   .transform(Processor) // Missing this one?
>>>>>
>>>>> What are our options to combine both? We were thinking that we could
>>>>> re-implement the grouping with the low-level API after investigating
>>>>> the source code, but that looks like overkill.
>>>>>
>>>>> Thank you.
>>>>
>>>> --
>>>> -- Guozhang
>>>
>>
>
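Putting the pieces from the quoted thread together - Guozhang's
selectKey/through/transform workaround plus the "subtract the previous
count" bookkeeping Dmitriy describes - here is a rough sketch. Topic
names, serdes, the domainOf() helper, and the "url|count" value encoding
are all made up for illustration; on newer releases repartition() replaces
through():

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TopNWithRemainingSketch {

    static final long HOUR_MS = 3600_000L;

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // keeps the last cumulative count we saw per domain/url/window
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("last-counts"),
                Serdes.String(), Serdes.Long()));

        // input: key = url, value = cumulative hit count for that url
        KStream<String, Long> hits = builder.stream("url-hits",
                Consumed.with(Serdes.String(), Serdes.Long()));

        hits
            // 1) re-key by domain so all urls of one domain meet on one partition
            .map((url, count) -> KeyValue.pair(domainOf(url), url + "|" + count))
            // 2) manual repartition, as Guozhang suggests
            .through("hits-by-domain", Produced.with(Serdes.String(), Serdes.String()))
            // 3) stateful counting with full store access
            .transform(() -> new Transformer<String, String, KeyValue<String, Long>>() {
                private ProcessorContext context;
                private KeyValueStore<String, Long> lastCounts;

                @SuppressWarnings("unchecked")
                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                    lastCounts = (KeyValueStore<String, Long>) context.getStateStore("last-counts");
                }

                @Override
                public KeyValue<String, Long> transform(String domain, String value) {
                    String[] parts = value.split("\\|");
                    String url = parts[0];
                    long total = Long.parseLong(parts[1]); // cumulative, not incremental

                    // bucket by hour so the subtraction happens per window
                    long windowStart = context.timestamp() - (context.timestamp() % HOUR_MS);
                    String storeKey = domain + "#" + url + "#" + windowStart;

                    Long previous = lastCounts.get(storeKey);
                    long delta = total - (previous == null ? 0L : previous);
                    lastCounts.put(storeKey, total);

                    // apply `delta` (not `total`) downstream: for acme.com this
                    // contributes 100, then 50 - never the double-counted 250
                    return KeyValue.pair(domain, delta);
                }

                @Override
                public void close() {}
            }, "last-counts");
    }

    // hypothetical helper: "acme.com/some/path" -> "acme.com"
    private static String domainOf(String url) {
        int slash = url.indexOf('/');
        return slash < 0 ? url : url.substring(0, slash);
    }
}

The bounded priority queue for the TopN itself can then consume these
per-domain deltas, and the remaining count is just the sum of the deltas
for urls currently outside the queue.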