Thanks for the quick response. I tried a build with the fix applied and can 
see that memory usage is much more stable.

Gokhan

> On 2 Sep 2020, at 12:51 PM, Jan Lukavský <[email protected]> wrote:
> 
> Hi Gokhan,
> 
> this is related to [1], which is about to be fixed.
> 
> Jan
> 
> [1] https://github.com/apache/beam/pull/12733
> On 9/2/20 10:24 AM, Gökhan Imral wrote:
>> Hi,
>> 
>> We have implemented an unbounded stream join using a stateful DoFn and a 
>> global window, storing elements in state when necessary and using timers 
>> to clear the state when it should expire. After deploying our job, however, 
>> we discovered that the state size keeps growing. While experimenting to 
>> find the cause, I found that the timers are not getting garbage collected.
>> 
>> This is the sample application I used for testing.
>> 
>>         PipelineOptions options =
>>                 PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
>>         Pipeline p = Pipeline.create(options);
>>         p.apply(GenerateSequence.from(0).withRate(10000, Duration.standardSeconds(1)))
>>                 .apply(Filter.by((val) -> val < 1000000 || val % 500 == 0))
>>                 .apply(WithKeys.of((val) -> val))
>>                 .setCoder(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
>>                 .apply("Window", Window.<KV<Long, Long>>into(new GlobalWindows())
>>                         .triggering(AfterPane.elementCountAtLeast(1))
>>                         .discardingFiredPanes()
>>                         .withAllowedLateness(Duration.ZERO))
>>                 .apply("State", ParDo.of(new DoFn<KV<Long, Long>, KV<Long, Long>>() {
>>                     @StateId("state")
>>                     private final StateSpec<ValueState<Long>> stateSpec = StateSpecs.value();
>>
>>                     @TimerId("gcTimer")
>>                     private final TimerSpec gcTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>                     @ProcessElement
>>                     public void process(
>>                             ProcessContext c,
>>                             @Timestamp Instant ts,
>>                             @StateId("state") ValueState<Long> state,
>>                             @TimerId("gcTimer") Timer gcTimer) {
>>                         state.write(c.element().getValue());
>>                         Instant expirationTime =
>>                                 new Instant(ts.getMillis()).plus(Duration.standardSeconds(5));
>>                         gcTimer.set(expirationTime);
>>                     }
>>
>>                     @OnTimer("gcTimer")
>>                     public void onGC(@StateId("state") ValueState<Long> state) {
>>                         state.clear();
>>                     }
>>                 }));
>>
>>         p.run().waitUntilFinish();
>> 
>> After running the application for a couple of minutes with the FlinkRunner, 
>> heap memory increases periodically. When I checked the heap dump, I could 
>> see that there are many timers that are not being collected by GC.
>> When I test with fixed or sliding windows, I can see the timers are cleared 
>> at the end of the window. Is this the expected behavior because the global 
>> window never closes? Is there a way to clear the timers in global windows, 
>> or should we try to use sliding windows with deduplication? Session windows 
>> could also work for our scenario, but stateful DoFns do not support them.
>> 
>> This is my first message here, so sorry for any mistakes.
>> 
>> Thank you all very much
>> Gokhan Imral
>> 
>> [Attachments: Screen Shot 2020-09-02 at 11.36.15 AM.png, 
>> Screen Shot 2020-09-02 at 12.21.22 PM.png]
