Hi Aljoscha,
Thanks for the information. In my case I don't clean up the state
explicitly in Trigger.onMerge(). My implementation only calls
OnMergeContext.mergePartitionedState(stateDesc), with the intention of
copying the state from the merged windows and re-registering the timers.
However, that method cuts and pastes the state from the merged windows into
the result window. To my understanding, it should only copy the state and
leave the cleanup to Trigger.clear(), because the user might want to
release resources based on the data held in the state. In my case, those
resources are the timers recorded in the state.
Below is my implementation:
public class MyTrigger<T, W extends Window> extends Trigger<T, W> {
    ...
    ...
    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        // merge the state of the merged windows into the result window
        ctx.mergePartitionedState(stateDesc);
        reRegister(ctx);
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // clean timers, which are retrieved from state
        ListState<Long> state = ctx.getPartitionedState(stateDesc);
        Iterable<Long> allRegisteredEventTimes = state.get();
        deleteTimers(allRegisteredEventTimes);
        state.clear();
    }
    ...
    ...
}
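To make the difference between the two merge behaviours concrete, here is a
minimal sketch in plain Java. It does not use any Flink API: the class
MergeSemanticsSketch and its methods (register, mergeByMoving,
mergeByCopying, clear) are hypothetical names made up for illustration, and
the map only stands in for the per-window partitioned state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only; nothing here is part of Flink. All names are hypothetical.
class MergeSemanticsSketch {

    // Registered timer timestamps per window, keyed by a window name.
    private final Map<String, List<Long>> timersByWindow = new HashMap<>();

    // Stand-in for recording a registered timer in the window's state.
    void register(String window, long timestamp) {
        timersByWindow.computeIfAbsent(window, w -> new ArrayList<>()).add(timestamp);
    }

    // "Cut-and-paste" merge: the entries of the source windows are removed.
    void mergeByMoving(String target, List<String> sources) {
        for (String source : sources) {
            List<Long> moved = timersByWindow.remove(source);
            if (moved != null) {
                timersByWindow.computeIfAbsent(target, w -> new ArrayList<>()).addAll(moved);
            }
        }
    }

    // "Copy" merge: the entries of the source windows stay until clear().
    void mergeByCopying(String target, List<String> sources) {
        for (String source : sources) {
            List<Long> existing = timersByWindow.get(source);
            if (existing != null) {
                List<Long> copy = new ArrayList<>(existing);
                timersByWindow.computeIfAbsent(target, w -> new ArrayList<>()).addAll(copy);
            }
        }
    }

    // Stand-in for Trigger.clear(): returns the timestamps it found and removed.
    List<Long> clear(String window) {
        List<Long> found = timersByWindow.remove(window);
        return found == null ? new ArrayList<>() : found;
    }
}
```

With mergeByMoving, a later clear("w1") finds nothing to delete, which is the
situation I am running into. With mergeByCopying, clear("w1") would still see
the timestamps registered for w1 and could delete the matching timers.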
Best
Yan
On Tue, Sep 26, 2017 at 8:28 AM, Aljoscha Krettek <[email protected]>
wrote:
> Hi,
>
> I think you should be able to use state for cleaning up your timers in
> Trigger.clear(). For this, you have to make sure to not clean up the state
> in Trigger.onMerge() and instead remove it in Trigger.clear().
>
> I'm not sure whether this will be possible for your use case, though.
>
> Best,
> Aljoscha
>
> > On 26. Sep 2017, at 11:17, Stefan Richter <[email protected]> wrote:
> >
> > Hi,
> >
> > I think that it is currently not possible to delete timers that did not
> > trigger, because currently some of the data structures used for timers do
> > not support random deletes efficiently. For the second part of the question
> > about keeping the state of merged windows, I added Aljoscha in CC who might
> > provide more information about the topic.
> >
> > Best,
> > Stefan
> >
> >> On 25.09.2017 at 22:59, Yan Zhou [FDS Science] <[email protected]> wrote:
> >>
> >> Hi,
> >>
> >> I am implementing a mergeable trigger and am having a problem clearing
> >> the registered timers of a merged window (a window that has been merged
> >> into the merge result). In my implementation, the trigger registers
> >> multiple timers for each element in Trigger#onElement(). State is used to
> >> keep track of the registered event times, so that the timers can be
> >> removed later in Trigger#clear().
> >>
> >> However, clearing the registered timers in this way doesn't work if the
> >> window has been merged. The state of the original window is removed
> >> during merging: AbstractHeapMergingState#mergeNamespaces() removes the
> >> state of the merged window. I think the ContinuousEventTimeTrigger shipped
> >> with Flink would have the same issue.
> >>
> >> My question is: is there a way to keep the state of a merged window? One
> >> way I can think of is to implement a custom heap state that adds the
> >> state back in the AbstractHeapMergingState#mergeState() method. Or is
> >> there a way to clear the timers without using state? Could I tweak the
> >> internal timer's source code to expose a method that removes all timers
> >> for a specified window?
> >>
> >> Please advise and thank you for your help.
> >>
> >> Best
> >> Yan
> >
>
>