Hello, I tried using *ProcessWindowFunction* since it gives access to *globalState()* through *context*. My question is: is it possible to discard single events inside the *process* function of a *ProcessWindowFunction*, just like in the *onElement* method of a trigger? For my use case a trigger does not seem sufficient, so I want to know how I can do it using ProcessWindowFunction. I'd appreciate any pointers.
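For what it's worth, the window contents arrive in *process* as an Iterable, so the simplest way to "discard" a single element there is to skip it while building the output rather than delete it from the window. The plain-Java model below (no Flink dependency) sketches that under several assumptions. The Map stands in for *context.globalState()*, which is key-scoped and therefore survives across windows, unlike the trigger's key-window state. The trigger is assumed to FIRE on the first element of each window and FIRE_AND_PURGE at the end, as Arvid suggests. The hypothetical `isFinalFiring` flag stands in for detecting the end-of-window firing, for example by comparing *context.currentWatermark()* with *context.window().maxTimestamp()* (an assumption worth verifying). `GlobalStateWindowModel` and its methods are illustrative names, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the idea, with no Flink dependency. All names here are
// hypothetical; they only mirror the roles of the Flink pieces in the thread.
final class GlobalStateWindowModel {

    // Stand-in for context.globalState(): scoped to the key only, so it
    // survives from one window to the next. Maps key -> element emitted early.
    private final Map<String, String> firstEmitted = new HashMap<>();

    // Models one call of ProcessWindowFunction.process() for one key.
    // isFinalFiring is a hypothetical flag meaning "end-of-window firing".
    List<String> process(String key, List<String> elements, boolean isFinalFiring) {
        List<String> out = new ArrayList<>();
        String alreadyEmitted = firstEmitted.get(key);
        if (!isFinalFiring) {
            // Early firing: emit only the very first element ever seen for this key.
            if (alreadyEmitted == null && !elements.isEmpty()) {
                firstEmitted.put(key, elements.get(0));
                out.add(elements.get(0));
            }
            // Key already seen in an earlier window: emit nothing for this early firing.
            return out;
        }
        // Final firing: the window still holds every element (the early firing
        // used FIRE, not FIRE_AND_PURGE), so skip the one that was emitted early.
        for (String element : elements) {
            if (!element.equals(alreadyEmitted)) {
                out.add(element);
            }
        }
        return out;
    }

    // Replays the thread's example for a single key "k": the trigger FIREs on
    // the first element of each window and FIRE_AND_PURGEs at the window end.
    static List<List<String>> runScenario() {
        GlobalStateWindowModel fn = new GlobalStateWindowModel();
        List<List<String>> windows = List.of(
                List.of("record 1", "record 2"),
                List.of("record 3", "record 4", "record 5"));
        List<List<String>> emitted = new ArrayList<>();
        for (List<String> window : windows) {
            // Early firing: only the first element has arrived so far.
            addIfNotEmpty(emitted, fn.process("k", window.subList(0, 1), false));
            // Final firing: the complete window contents.
            addIfNotEmpty(emitted, fn.process("k", window, true));
        }
        return emitted;
    }

    private static void addIfNotEmpty(List<List<String>> sink, List<String> batch) {
        if (!batch.isEmpty()) {
            sink.add(batch);
        }
    }

    public static void main(String[] args) {
        // prints [[record 1], [record 2], [record 3, record 4, record 5]]
        System.out.println(runScenario());
    }
}
```

Running `main` reproduces the expected output described in the thread. Comparing elements with `equals` is a simplification; a real job would presumably compare a unique id or timestamp, and would likely put a one-day TTL on the global state (Flink's `StateTtlConfig`) to match the "retain the key for 1 day" requirement.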
Thanks!

On Wed, Feb 24, 2021 at 10:50 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks. I tried FIRE instead of FIRE_AND_PURGE and it introduced
> duplicates, though the result is still the same, i.e. record 1 is fired both
> at the start and the end of the window. So for every window I see the first
> event of the window coming twice in the output.
>
> I'm trying to explain the desired behaviour again; hopefully it becomes
> clear.
>
> All the records have the same key.
> Current output:
>
>> record 1 : first event in the window-1 : fired
>> record 2 : last event in the window-1 : fired
>> record 3 : first event in the window-2 : fired. [this should not have
>> fired since it has the same Key as all other records.]
>> record 4, record 5 : - 2 events in the window-2 : fired.
>>
>
> Expected output:
>
>> record 1 : first event in the window-1 : fired
>> record 2 : last event in the window-1 : fired
>> record 3,4,5 : all events in the window-2 : fired
>
>
> I think my problem is to store KeyBy values between windows. For example,
> I want to retain the KeyBy for 1 day. In that case, record 1 is fired
> instantly, and all other records (of the same key as record 1) are always
> grouped in each window (say 1 min) instead of firing instantly.
>
> Thanks!
>
> On Wed, Feb 24, 2021 at 6:19 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Diwakar,
>>
>> the issue is that you fire_and_purge the state; you should just FIRE on
>> the first element (or else you lose the information that you received the
>> element already).
>> You'd use FIRE_AND_PURGE on the last element though.
>>
>> On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Diwakar,
>>>
>>> I'm not sure I fully understand your question.
>>> If event handling in one window depends on some other windows, then
>>> TriggerContext.getPartitionedState cannot be used. Triggers don't have
>>> access to the global state (only to key-window scoped state).
>>> If that's what you want, then please consider ProcessWindowFunction [1]
>>> where you can use context.globalState() in your process function.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha <diwakar.n...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm trying to use a custom trigger for one of my use cases. I have a
>>>> basic logic (as shown below) of using keyBy on the input stream and using a
>>>> window of 1 min.
>>>>
>>>> .keyBy(<key selector>)
>>>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>>>> .trigger(new CustomTrigger())
>>>> .aggregate(Input.getAggregationFunction(), new AggregationProcessingWindow());
>>>>
>>>> My custom trigger is expected to fire the first event of the keyBy
>>>> instantly and any subsequent events should be aggregated in the window.
>>>>
>>>>> .trigger(new Trigger<Record, TimeWindow>() {
>>>>>     @Override
>>>>>     public TriggerResult onElement(Record record, long l, TimeWindow timeWindow,
>>>>>             TriggerContext triggerContext) throws Exception {
>>>>>         ValueState<Boolean> firstSeen =
>>>>>                 triggerContext.getPartitionedState(firstSceenDescriptor);
>>>>>         if (firstSeen.value() == null) {
>>>>>             firstSeen.update(true);
>>>>>             // fire trigger to early evaluate window and purge that event.
>>>>>             return TriggerResult.FIRE_AND_PURGE;
>>>>>         }
>>>>>         // Continue. Do not evaluate window per element
>>>>>         return TriggerResult.CONTINUE;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>>>>             TriggerContext triggerContext) throws Exception {
>>>>>         // final evaluation and purge window state
>>>>>         return TriggerResult.FIRE_AND_PURGE;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>>>>             TriggerContext triggerContext) throws Exception {
>>>>>         return TriggerResult.CONTINUE;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
>>>>>             throws Exception {
>>>>>     }
>>>>> })
>>>>
>>>> Currently, I see (for each window and same key) the first event of the
>>>> window is always fired. But I want to see this happening for only the first
>>>> window, and for the subsequent windows it should aggregate all the events and
>>>> then fire.
>>>>
>>>> Example: all the records have the same key.
>>>> Current output:
>>>> record 1 : first event in the window-1 : fired
>>>> record 2 : last event in the window-1 : fired
>>>> record 3 : first event in the window-2 : fired
>>>> record 4, record 5 : - 2 events in the window-2 : fired.
>>>>
>>>> Expected output:
>>>> record 1 : first event in the window-1 : fired
>>>> record 2 : last event in the window-1 : fired
>>>> record 3,4,5 : all events in the window-2 : fired
>>>> Window-2 should not fire the first event of the same key.
>>>>
>>>> I'm reading it here
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
>>>> but I'm not able to solve it. Any pointers would be helpful.
>>>>
>>>> Thanks.
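The gap between the current and expected output in the original post comes down to how the "firstSeen" flag is scoped, which is Roman's point: TriggerContext.getPartitionedState is per key and window, so the flag is empty again when window-2 starts and record 3 fires on its own. The plain-Java model below (no Flink dependency, a single key, windows given as lists of element names) replays the trigger logic from the original post with the flag scoped either per window or per key. It assumes the remaining window contents are fired at the end of each window, as the reported output shows. `TriggerStateScopeModel` and `simulate` are hypothetical names, and the per-key variant only illustrates the desired semantics: a Flink trigger itself cannot read key-scoped state, which is why the thread points to ProcessWindowFunction and *context.globalState()*.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the CustomTrigger from the original post, for a single
// key and no Flink dependency. Each window is a list of element names.
final class TriggerStateScopeModel {

    // keyScoped == false: the firstSeen flag is per (key, window), like
    //   TriggerContext.getPartitionedState, so every window starts without it.
    // keyScoped == true: the flag is per key and survives across windows
    //   (the desired semantics; a Flink trigger cannot read such state).
    static List<List<String>> simulate(List<List<String>> windows, boolean keyScoped) {
        List<List<String>> fired = new ArrayList<>();
        boolean firstSeen = false;
        for (List<String> window : windows) {
            if (!keyScoped) {
                firstSeen = false; // fresh window-scoped state
            }
            List<String> pending = new ArrayList<>();
            for (String element : window) {
                if (!firstSeen) {
                    firstSeen = true;
                    fired.add(List.of(element)); // onElement: FIRE_AND_PURGE
                } else {
                    pending.add(element); // onElement: CONTINUE
                }
            }
            if (!pending.isEmpty()) {
                fired.add(pending); // assumed end-of-window FIRE_AND_PURGE
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<List<String>> windows = List.of(
                List.of("record 1", "record 2"),
                List.of("record 3", "record 4", "record 5"));
        // prints [[record 1], [record 2], [record 3], [record 4, record 5]]
        System.out.println(simulate(windows, false));
        // prints [[record 1], [record 2], [record 3, record 4, record 5]]
        System.out.println(simulate(windows, true));
    }
}
```

With window-scoped state the model reproduces the current output (record 3 fires alone); with key-scoped state it reproduces the expected output.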