Hi,

Could you maybe show the code of your trigger?

Best,
Aljoscha

> On 15. Sep 2017, at 11:39, gerardg <ger...@talaia.io> wrote:
> 
> Hi,
> 
> I have the following operator:
> 
> mainStream
>      .coGroup(coStream)
>      .where(_.uuid).equalTo(_.uuid)
>      .window(GlobalWindows.create())
>      .trigger(triggerWhenAllReceived)
>      .apply(mergeElements)
> 
> TLDR; It seems that the checkpointed state of the operator keeps growing
> forever even if I clear the state and purge the buffered elements using a
> processing time trigger.
> 
> Details:
> 
> Basically I have a main stream that gets elements from another stream and
> when it has received all the elements that have been waiting for it outputs
> a new element that has been created using the information of all the
> received elements.
> 
> To do so I use a GlobalWindow and a custom trigger. The custom trigger has
> as state two counters, the elements that it has to receive (extracted from
> the element received from the main stream) and the elements that it has
> received so far from the other stream. When the two counters have the same
> value I use the FIRE_AND_PURGE trigger to output all the elements in the
> pane (I understand that each set of elements is stored in a pane defined by
> the global window and the UUID key). 
> 
> To cleanup the state (and to not keep elements waiting forever) I setup a
> processing time timer which basically clears the state and outputs
> FIRE_AND_PURGE to remove the buffered elements.
> 
> I must be missing something because the checkpointed state keeps growing
> forever so I suspect that the pane is not completely removed.
> 
> Gerard
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to