For (1), no: you cannot access the KV store from inside the aggregator.

If you want to do manual put/get, you should use a Transformer and
implement a custom operator.
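
A minimal sketch of the per-record put/get logic such a custom operator
could run for the "remaining count" problem quoted below. This is plain
Java, not Kafka Streams code: the HashMap only stands in for the
KeyValueStore, and `LatestCountTracker`, `update`, and `remaining` are
hypothetical names used for illustration.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the body of a custom operator.
// The HashMap plays the role of the persistent KV store (assumption).
class LatestCountTracker {
    private final Map<String, Long> store = new HashMap<>(); // url -> latest hit count
    private long total = 0L;                                 // sum of all latest counts

    // Stores the latest count for a url and returns the delta applied to the total.
    long update(String url, long newCount) {
        Long previous = store.get(url);                      // manual get
        long delta = newCount - (previous == null ? 0L : previous);
        store.put(url, newCount);                            // manual put
        total += delta;
        return delta;
    }

    long total() {
        return total;
    }

    // Remaining = total minus the sum of the n largest latest counts.
    // Scans the whole map, which is fine for a sketch but not for a large store.
    long remaining(int n) {
        long top = store.values().stream()
                .sorted(Comparator.reverseOrder())
                .limit(n)
                .mapToLong(Long::longValue)
                .sum();
        return total - top;
    }
}
```

With the acme.com example, the second update contributes a delta of 50
rather than another 150, so the remaining count ends at 150 instead of 250.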

Btw: here is an example of TopN:
https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java
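
Separately from the linked example, the list-based TopN aggregation
described earlier in the thread (quoted below) could look roughly like
this. It is a plain Java sketch: `TopN`, `Entry`, `add`, and `urls` are
hypothetical names, and in a real topology this logic would sit in the
aggregator passed to `.aggregate()`, together with a Serde for the list,
which is omitted here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical aggregate value: a bounded list holding the current top-N counts.
class TopN {
    static final class Entry {
        final String url;
        final long count;

        Entry(String url, long count) {
            this.url = url;
            this.count = count;
        }
    }

    private final int n;
    private final List<Entry> entries = new ArrayList<>();

    TopN(int n) {
        this.n = n;
    }

    // Called once per incoming (url, count) record.
    TopN add(String url, long count) {
        entries.removeIf(e -> e.url.equals(url));  // a newer count replaces the old one
        entries.add(new Entry(url, count));
        entries.sort(Comparator.comparingLong((Entry e) -> e.count).reversed());
        if (entries.size() > n) {
            entries.remove(entries.size() - 1);    // evict the smallest entry
        }
        return this;
    }

    // Urls currently in the top N, largest count first.
    List<String> urls() {
        List<String> out = new ArrayList<>();
        for (Entry e : entries) {
            out.add(e.url);
        }
        return out;
    }
}
```

Note that an evicted entry is forgotten, so this structure alone cannot
tell how much an out-of-list url contributed earlier.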



-Matthias

On 4/9/18 4:46 AM, Dmitriy Vsekhvalnov wrote:
> Hi Matthias, thanks
> 
> clarifications below:
> 
> 1. .aggregate(() -> .. ,
>               (k, v, agg) -> {
>                   // Can I access the KV store here for manual put/get?
>               });
> 
> 2. TopN is not hard; we are using pretty much the same approach you
> describe, just with a bounded priority queue. The problematic part is the
> 'remaining count': everything else that is not in the TopN records. It
> turned out to be quite complex in the streaming world (or we missed
> something). I'll try to illustrate, assuming a simplified event flow:
> 
>  - acme.com: 100 hits -> too small, not in TopN, so we add it to the
> remaining count
>  - ... some time later ...
>  - acme.com: 150 hits -> still too small, so we add it to the remaining count
> 
> Problem: we added 250 hits to the remaining count, but we should have added
> only 150. We have to subtract the previous count, which means we need to
> keep all previous counts somewhere. That's where we hope access to a KV
> store can help.
> 
> 
> On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>>>> ok, question then - is it possible to use state store with .aggregate()?
>>
>> Not sure what exactly you mean by this. An aggregation always uses a
>> store; it's a stateful operation and cannot be computed without a store.
>>
>> For TopN, if you get the hit-count as input, you can use an
>> `.aggregate()` operator that uses an array or list as output -- this
>> list contains the topN, and each time aggregate() is called, you check
>> if the new count replaces an existing count in the array/list.
>>
>>
>> -Matthias
>>
>> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
>>> Thanks guys,
>>>
>>> ok, question then - is it possible to use state store with .aggregate()?
>>>
>>> Here are some details on counting; we are basically looking for a TopN +
>>> Remaining calculation.
>>>
>>> Example:
>>>
>>> - incoming data: api url -> hit count
>>>
>>> - we want output: Top 20 urls per domain per hour + remaining count
>>> per domain (i.e. the sum of all other url hits that do not belong to the
>>> top 10 per domain per hour).
>>>
>>> With some grouping variations.
>>>
>>> Make some sense? Always open for better ideas :)
>>>
>>>
>>> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>>
>>>> Hello Dmitriy,
>>>>
>>>> You can "simulate" a lower-level Processor API by 1) adding the stores you
>>>> need via builder#addStore(); 2) doing a manual "through" call after
>>>> "selectKey" (the selected key will be the same as in your original groupBy
>>>> call), and then, from the repartitioned stream, adding the `transform()`
>>>> operator to do manual windowed counting.
>>>>
>>>> But before you really go down this route, I'd first like to validate
>>>> whether the provided `Aggregator` and `Initializer` functions really
>>>> cannot meet your "overcomplicated version of record counting". Could you
>>>> elaborate a bit more on this logic, so maybe we can still work around it
>>>> with the pure high-level DSL?
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <
>>>> dvsekhval...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey, good day everyone,
>>>>>
>>>>> another kafka-streams friday question.
>>>>>
>>>>> We hit a wall with the DSL implementation and would like to try the
>>>>> low-level Processor API.
>>>>>
>>>>> What we are looking for is to:
>>>>>   - repartition the incoming source stream by grouping records by some
>>>>>     fields + windowed (hourly, daily, etc.).
>>>>>   - and then apply a custom Processor on the grouped data.
>>>>>
>>>>> The Processor is going to do some overcomplicated version of record
>>>>> counting and needs persistent KV state store access.
>>>>>
>>>>> The issue: neither KGroupedStream nor TimeWindowedKStream provides an
>>>>> API to hook a processor into the topology.
>>>>>
>>>>> Just to show some code:
>>>>>
>>>>> in.groupBy((key, value) -> .....)
>>>>>    .windowedBy(Hourly)
>>>>>    .transform(Processor) // Missing this one?
>>>>>
>>>>>
>>>>> What are our options to combine both? After investigating the source
>>>>> code, we were thinking that we could re-implement grouping with the
>>>>> low-level API, but that looks like overkill.
>>>>>
>>>>> Thank you.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
>>
> 
