>> ok, question then - is it possible to use state store with .aggregate()?

Not sure what exactly you mean by this. An aggregation always uses a
store; it's a stateful operation and cannot be computed without one.

For TopN, if you get the hit-count as input, you can use an
`.aggregate()` operator whose output is an array or list -- this
list contains the topN, and each time `aggregate()` is called, you check
whether the new count replaces an existing count in the array/list.
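For illustration, here is a minimal plain-Java sketch of such an aggregate value. All names are hypothetical (this is not Kafka Streams API); inside `.aggregate()` you would call `update()` per record and read `topN()`/`remaining()` when emitting downstream. Note this variant keeps a full per-url map so the "remaining" sum can be computed exactly; the bounded array/list described above is the leaner alternative if you only need the topN itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical aggregate value: accumulated per-url hit counts, from which
// the top-N and the "remaining" sum can both be derived.
class TopNAggregate {
    private final int n;                                      // size of the top list, e.g. 20
    private final Map<String, Long> counts = new HashMap<>(); // url -> accumulated hit count

    TopNAggregate(int n) { this.n = n; }

    // Fold one input record (url, hitCount) into the aggregate --
    // this is what the Aggregator passed to aggregate() would do.
    void update(String url, long hitCount) {
        counts.merge(url, hitCount, Long::sum);
    }

    // The top-N urls by accumulated count, highest first.
    List<Map.Entry<String, Long>> topN() {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        return entries.subList(0, Math.min(n, entries.size()));
    }

    // Sum of hits for every url that did not make the top-N.
    long remaining() {
        long total = 0;
        for (long c : counts.values()) total += c;
        long top = 0;
        for (Map.Entry<String, Long> e : topN()) top += e.getValue();
        return total - top;
    }
}
```

With e.g. a top-2 over `/a=10, /b=5, /c=3, /a=1`, the topN is `/a=11, /b=5` and the remaining sum is `3`.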


-Matthias

On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> Thanks guys,
> 
> ok, question then - is it possible to use state store with .aggregate()?
> 
> Here are some details on the counting: we are basically looking for a TopN +
> Remaining calculation.
> 
> Example:
> 
> - incoming data: api url -> hit count
> 
> - we want output: top 20 urls per domain per hour + a remaining count
> per domain (e.g. the sum of hits for all other urls that do not belong to
> the top 10 for each domain per hour).
> 
> With some grouping variations.
> 
> Make some sense? Always open for better ideas :)
> 
> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Hello Dmitriy,
>>
>> You can "simulate" an lower-level processor API by 1) adding the stores you
>> need via the builder#addStore(); 2) do a manual "through" call after
>> "selectKey" (the selected key will be the same as your original groupBy
>> call), and then from the repartitioned stream add the `transform()`
>> operator to do manual windowed counting.
>>
>> But before you really go down this route, I'd first like to validate
>> whether the provided `Aggregator` and `Initializer` functions really cannot
>> meet your "overcomplicated version of record counting". Could you elaborate
>> a bit more on this logic, so that maybe we can still work it out with the
>> pure high-level DSL?
>>
>>
>> Guozhang
>>
>>
>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <
>> dvsekhval...@gmail.com>
>> wrote:
>>
>>> Hey, good day everyone,
>>>
>>> another kafka-streams Friday question.
>>>
>>> We hit a wall with the DSL implementation and would like to try the
>>> low-level Processor API.
>>>
>>> What we are looking for is to:
>>>   - repartition the incoming source stream by grouping records on some
>>> fields + windowing (hourly, daily, etc.), and
>>>   - then apply a custom Processor to the grouped data.
>>>
>>> The Processor is going to do some overcomplicated version of record
>>> counting and needs persistent KV state store access.
>>>
>>> The issue: neither KGroupedStream nor TimeWindowedKStream provides an API
>>> to hook a processor into the topology.
>>>
>>> Just to show some code:
>>>
>>> in.groupBy((key, value) -> .....)
>>>    .windowedBy(Hourly)
>>>    .transform(Processor) // Missing this one?
>>>
>>>
>>> What are our options to combine both? After investigating the source code,
>>> we were thinking that we could re-implement the grouping with the
>>> low-level API, but that looks like overkill.
>>>
>>> Thank you.
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 
