Hi Chandu,

Maybe you can use a custom trigger:
*     .trigger(**ContinuousEventTimeTrigger.of(Time.minutes(15)))*

This would continuously trigger your aggregate every period of time.

Thanks,
Rafi


On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin <azagre...@apache.org> wrote:

> Hi Chandu,
>
> I am not sure whether using the windowing API is helpful in this case at
> all.
>
> At least, you could try to consume the data not only by windowing but also
> by a custom stateful function.
> You look into the AggregatingState [1]. Then you could do whatever you
> want with the current aggregated value.
> If you still need to do something with the result of windowing, you could
> do it as now or simulate it with timers [2] in that same stateful function.
>
> Best,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example
>
> On Tue, Dec 3, 2019 at 12:21 AM chandu soa <chandu...@gmail.com> wrote:
>
>> *Emit intermediate accumulator(AggregateFunction ACC value) state of a
>> session window when new event arrives*
>>
>>
>>
>> AggregateFunction#getResults() is called only when window completes. My
>> need is emit intermediate accumulator values(result of
>> AggregateFunction#add()) as well and write them to Sink. Both
>> AggregateFunction#getResult() and ProcessWindowFunction() provides
>> aggregated result, only when the window is closed.
>>
>> *Any thoughts please, how to emit or stream intermediate accumulator
>> state as soon as new event arrive when window is open? Need to implement
>> custom trigger or Assigner?*
>>
>>
>>
>> To give you some background, when user watches a video we get events -
>> when clicked, thereafter every ~ 15minutes, and finally when user close the
>> video.
>>
>> I need to aggregate them as soon as they arrive and post it to
>> destination. For example, if user watching a two-hour movie I get events
>> for 15 min interval(0,15,30,...,120), whenever I get a event need to
>> aggregate watched percentage so far and write it to sink(0%, 12.5%,
>> 25%,...,100%). The below implementation emitting(getResult()) a single
>> event 20 minutes after watching a video.
>>
>>
>>
>>
>>
>> .window(*EventTimeSessionWindows.withGap(Time.minutes(20))*)
>>
>>
>> .aggregate(new EventAggregator())
>>
>>
>> .filter(new FinalFilter())
>>
>>
>> .addSink(...)
>>
>>
>> Appreciate your help.
>>
>>
>> Thanks,
>>
>> chandu
>>
>

Reply via email to