Hi Matthias, thanks

clarifications below:

1. .aggregate( () -> .. ,
                       (k, v, agg) -> {
                           //Can i access KV store here for manual put/get?

2. TopN is not hard, we using pretty much same approach you describe, just
with bounded priority queue.  The problematic part with 'remaining count' -
everything else not in topN records. It appeared to be quite complex in
streaming world (or we missed something). I'll try to illustrate, assuming
simplified event flow:

 - acme.com: 100 hits  -> too small, not in TopN, we adding it to remaining
 - ... some time later....
 - acme.com: 150 hits -> still too small, adding to remaining count

Problem: we added 250 hits to remaining, but actually we had to add only
150 hits. We have to subtract previous count and it means we need to keep
them all somewhere. That's where we hope access to KV store can help.

On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <matth...@confluent.io>

> >> ok, question then - is it possible to use state store with .aggregate()?
> Not sure what you exactly mean by this. An aggregations always uses a
> store; it's a stateful operation and cannot be computed without a store.
> For TopN, if you get the hit-count as input, you can use a
> `.aggregate()` operator that uses an array or list out output -- this
> list contains the topN and each time, aggregate() is called, you check
> if the new count replaces and existing count in the array/list.
> -Matthias
> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> > Thanks guys,
> >
> > ok, question then - is it possible to use state store with .aggregate()?
> >
> > Here are some details on counting, we basically looking for TopN +
> > Remaining calculation.
> >
> > Example:
> >
> > - incoming data: api url -> hit count
> >
> > - we want output: Top 20 urls per each domain per hour + remaining count
> > per domain (e.g. sum of all other urls hits that do not belong to top 10
> > per each domain per hour).
> >
> > With some grouping variations.
> >
> > Make some sense? Always open for better ideas :)
> >
> >
> >
> >
> >
> >
> >
> > On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Hello Dmitriy,
> >>
> >> You can "simulate" an lower-level processor API by 1) adding the stores
> you
> >> need via the builder#addStore(); 2) do a manual "through" call after
> >> "selectKey" (the selected key will be the same as your original groupBy
> >> call), and then from the repartitioned stream add the `transform()`
> >> operator to do manual windowed counting.
> >>
> >> But before you really go into this route, I'd first like to validate if
> the
> >> provided `Aggregate`, `Initialize` functions really cannot meet your
> >> "overcomplicated
> >> version of record counting", could you elaborate a bit more on this
> logic
> >> so maybe we can still around it around with the pure high-level DSL?
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <
> >> dvsekhval...@gmail.com>
> >> wrote:
> >>
> >>> Hey, good day everyone,
> >>>
> >>> another kafka-streams friday question.
> >>>
> >>> We hit the wall with DSL implementation and would like to try low-level
> >>> Processor API.
> >>>
> >>> What we looking for is to:
> >>>   - repartition incoming source stream via grouping records by some
> >> fields
> >>> + windowed (hourly, daily, e.t.c).
> >>>    - and then apply custom Processor on grouped data.
> >>>
> >>> Processor gonna do some overcomplicated version of record counting and
> >> need
> >>> persistent KV state store access.
> >>>
> >>> The issue - neither KGroupedStream nor TimeWindowedKStream provides api
> >> to
> >>> hook processor into topology.
> >>>
> >>> Just to show some code:
> >>>
> >>> in.groupBy((key, value) -> .....)
> >>>    .windowedBy(Hourly)
> >>>    .transform(Processor) // Missing this one?
> >>>
> >>>
> >>> What our options to combine both? We were thinking that we can
> >> re-implement
> >>> grouping with low-level API after investigating source code, but looks
> >> like
> >>> overkill.
> >>>
> >>> Thank you.
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >

Reply via email to