Hello,

I tried using *processWindowFunction* since it gives access to
*globalstate* through
*context*. My question is, Is it possible to discard single events inside
*process* function of *processWindowFunction* just like *onElements* of
triggers?
For my use case it seems that trigger is not sufficient but i want to know
how i can do it using processWindowFunction. Appreciate any pointers.

Thanks!

On Wed, Feb 24, 2021 at 10:50 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks. I tried FIRE instead of FIRE_AND_PURGE and it introduced
> duplicates though the result is still the same i.e record 1 is fired both
> at the start and the end of the window. so for every window i see the first
> event of the window is coming twice in the output.
>
> I'm trying to explain again the desired behaviour, hopefully it becomes
> clear.
>
> all the records have the same key.
> current output.
>
>> record 1 : first event in the window-1 : fired
>> record 2 : last event in the window-1 : fired
>> record 3 : first event in the window-2 : fired. [this should not have
>> fired since it has the same Key as all other records.]
>> record 4, record 5 : - 2 events in the window-2 : fired.
>>
>
> expected output.
>
>> record 1 : first event in the window-1 : fired
>> record 2 : last event in the window-1 : fired
>> record 3,4,5 : all event in the window-2 : fired
>
>
> I think my problem is to store KeyBy values between windows. For example,
> I want to retain the KeyBy for 1 day. In that case, record 1 is fired
> instantly, all other records (of same key as record1) are always grouped in
> each window (say 1 min) instead of firing instantly.
>
> Thanks!
>
> On Wed, Feb 24, 2021 at 6:19 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Diwakar,
>>
>> the issue is that you fire_and_purge the state, you should just FIRE on
>> the first element (or else you lose the information that you received the
>> element already).
>> You'd use FIRE_AND_PURGE on the last element though.
>>
>> On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Diwakar,
>>>
>>> I'm not sure I fully understand your question.
>>> If event handling in one window depends on some other windows than
>>> TriggerContext.getPartitionedState can not be used. Triggers don't have
>>> access to the global state (only to key-window scoped state).
>>> If that's what you want then please consider ProcessWindowFunction [1]
>>> where you can use context.globalState() in your process function.
>>>
>>> [1]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha <diwakar.n...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Hello,
>>>>
>>>> I'm trying to use a custom trigger for one of my use case. I have a
>>>> basic logic (as shown below) of using keyBy on the input stream and using a
>>>> window of 1 min.
>>>>
>>>> .keyBy(<key selector>)
>>>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>>>> .trigger(new CustomTrigger())
>>>> .aggregate(Input.getAggregationFunction(), new
>>>> AggregationProcessingWindow());
>>>>
>>>>
>>>> My custom trigger is expected to fire the first event of the keyBy
>>>> instantly and any subsequent events should be aggregated in the window.
>>>>
>>>> .trigger(new Trigger<Record, TimeWindow>() {
>>>>> @Override
>>>>> public TriggerResult onElement(Record record, long l, TimeWindow
>>>>> timeWindow, TriggerContext triggerContext) throws Exception {
>>>>> ValueState<Boolean> firstSeen =
>>>>> triggerContext.getPartitionedState(firstSceenDescriptor);
>>>>> if(firstSeen.value() == null) {
>>>>> firstSeen.update(true);
>>>>> // fire trigger to early evaluate window and purge that event.
>>>>> return TriggerResult.FIRE_AND_PURGE;
>>>>> }
>>>>> // Continue. Do not evaluate window per element
>>>>> return TriggerResult.CONTINUE;
>>>>> }
>>>>> @Override
>>>>> public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>>>> TriggerContext triggerContext) throws Exception {
>>>>> // final evaluation and purge window state
>>>>> return TriggerResult.FIRE_AND_PURGE;
>>>>> }
>>>>> @Override
>>>>> public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>>>> TriggerContext triggerContext) throws Exception {
>>>>> return TriggerResult.CONTINUE;
>>>>> }
>>>>> @Override
>>>>> public void clear(TimeWindow timeWindow, TriggerContext
>>>>> triggerContext) throws Exception {
>>>>>
>>>>> }
>>>>> })
>>>>
>>>>
>>>>
>>>>
>>>> Currently, I see (for each window and same key) the first event of the
>>>> window is always fired. But I want to see this happening for only the first
>>>> window and for the subsequent window it should aggregate all the events and
>>>> then fire.
>>>>
>>>> Example : all the records have the same key.
>>>> current output.
>>>> record 1 : first event in the window-1 : fired record 2 : last event in
>>>> the window-1 : fired record 3 : first event in the window-2 : fired record
>>>> 4, record 5 : - 2 events in the window-2 : fired.
>>>>
>>>> expected output.
>>>> record 1 : first event in the window-1 : fired record 2 : last event in
>>>> the window-1 : fired record 3,4,5 : all event in the window-2 : fired
>>>> window-2 should not fire the first event of the same key.
>>>>
>>>> I'm reading it here
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
>>>> but not able to solve it. Any pointers would be helpful.
>>>>
>>>> Thanks.
>>>>
>>>

Reply via email to