Thanks for the quick response. I tried a build with the fix applied, and memory is now much more stable.

Gokhan

> On 2 Sep 2020, at 12:51 PM, Jan Lukavský <[email protected]> wrote:
>
> Hi Gokhan,
>
> this is related to [1], which is about to be fixed.
>
> Jan
>
> [1] https://github.com/apache/beam/pull/12733
>
> On 9/2/20 10:24 AM, Gökhan Imral wrote:
>> Hi,
>>
>> We have implemented an unbounded stream join using a stateful DoFn in the
>> global window: we store elements in state when necessary and use timers to
>> clear the state once it should expire. After deploying the job, however, we
>> discovered that the state size grows continuously. While experimenting to
>> find the cause, I discovered that the timers are not being garbage collected.
>>
>> This is the sample application I used for testing:
>>
>> PipelineOptions options =
>>     PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
>> Pipeline p = Pipeline.create(options);
>> p.apply(GenerateSequence.from(0).withRate(10000, Duration.standardSeconds(1)))
>>     .apply(Filter.by((val) -> val < 1000000 || val % 500 == 0))
>>     .apply(WithKeys.of((val) -> val))
>>     .setCoder(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
>>     .apply("Window", Window.<KV<Long, Long>>into(new GlobalWindows())
>>         .triggering(AfterPane.elementCountAtLeast(1))
>>         .discardingFiredPanes()
>>         .withAllowedLateness(Duration.ZERO))
>>     .apply("State", ParDo.of(new DoFn<KV<Long, Long>, KV<Long, Long>>() {
>>       @StateId("state")
>>       private final StateSpec<ValueState<Long>> stateSpec = StateSpecs.value();
>>
>>       @TimerId("gcTimer")
>>       private final TimerSpec gcTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>       @ProcessElement
>>       public void process(
>>           ProcessContext c,
>>           @Timestamp Instant ts,
>>           @StateId("state") ValueState<Long> state,
>>           @TimerId("gcTimer") Timer gcTimer) {
>>         // Keep the latest value for this key and schedule its expiry 5 s later.
>>         state.write(c.element().getValue());
>>         Instant expirationTime = ts.plus(Duration.standardSeconds(5));
>>         gcTimer.set(expirationTime);
>>       }
>>
>>       @OnTimer("gcTimer")
>>       public void onGC(@StateId("state") ValueState<Long> state) {
>>         // Expiry: drop the stored value for this key.
>>         state.clear();
>>       }
>>     }));
>>
>> p.run().waitUntilFinish();
>>
>> After running the application for a couple of minutes with the FlinkRunner,
>> heap memory increases periodically. When I checked the heap dump, I could see
>> many timers that are not being garbage collected.
>> When I test it with fixed or sliding windows, I can see the timers being
>> cleared at the end of the window. Is this the expected behavior because the
>> global window never closes? Is there a way to clear the timers in the global
>> window, or should we try sliding windows with deduplication? Session windows
>> could also work for our scenario, but stateful DoFns do not support them.
>>
>> This is my first message here, so sorry for any mistakes.
>>
>> Thank you all very much,
>> Gokhan Imral
>>
>> <Screen Shot 2020-09-02 at 11.36.15 AM.png>
>> <Screen Shot 2020-09-02 at 12.21.22 PM.png>
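
For readers who find this thread later: the bookkeeping the sample relies on can be modelled without Beam. The sketch below is a hypothetical, runner-agnostic model (GcTimerModel and its methods are illustrative names, not Beam's API). It assumes one value and at most one pending "gcTimer" per key, that re-setting a key's timer overwrites the earlier timestamp, and that a timer fires and is released once the watermark reaches its timestamp (the exact boundary is an assumption; runners may differ). Under those assumptions the live timer count stays bounded by the keys seen in the last 5 s. A runner that kept fired timers around would instead grow without bound, which matches the heap growth reported in the original message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, runner-agnostic model (NOT Beam's API) of the quoted sample's
 * pattern: one ValueState<Long> and one event-time "gcTimer" per key.
 */
class GcTimerModel {
    // Stands in for the per-key ValueState<Long>.
    private final Map<Long, Long> state = new HashMap<>();
    // Pending gcTimer timestamp per key; re-setting a key's timer overwrites it.
    private final Map<Long, Long> gcTimers = new HashMap<>();

    /** Like @ProcessElement: store the value and (re)set the timer to ts + ttl. */
    void process(long key, long value, long tsMillis, long ttlMillis) {
        state.put(key, value);
        gcTimers.put(key, tsMillis + ttlMillis);
    }

    /**
     * Advances the watermark and fires every timer whose timestamp it has reached
     * (assumed boundary). Returns the number of timers fired.
     */
    int advanceWatermark(long watermarkMillis) {
        List<Long> due = new ArrayList<>();
        for (Map.Entry<Long, Long> e : gcTimers.entrySet()) {
            if (e.getValue() <= watermarkMillis) {
                due.add(e.getKey());
            }
        }
        for (Long key : due) {
            gcTimers.remove(key); // fired timers must be released, or they accumulate
            state.remove(key);    // like @OnTimer: state.clear()
        }
        return due.size();
    }

    int liveTimers() { return gcTimers.size(); }

    int liveStateEntries() { return state.size(); }

    public static void main(String[] args) {
        GcTimerModel m = new GcTimerModel();
        m.process(1L, 10L, 0L, 5_000L);
        m.process(1L, 11L, 1_000L, 5_000L); // same key: timer moves from 5000 to 6000
        m.process(2L, 20L, 2_000L, 5_000L); // timer at 7000
        System.out.println(m.liveTimers());              // 2
        System.out.println(m.advanceWatermark(5_500L));  // 0
        System.out.println(m.advanceWatermark(7_500L));  // 2
        System.out.println(m.liveTimers() + " " + m.liveStateEntries()); // 0 0
    }
}
```

The model uses a map keyed by key, not a list, because a timer id names a single slot per key; that is also why the sample's repeated gcTimer.set calls should not, by themselves, accumulate timers.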

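A rough estimate of how many timers the sample pipeline should hold helps tell normal growth from a leak. This is a back-of-envelope sketch under stated assumptions: event timestamps roughly follow generation time, the watermark keeps pace, and every element gets its own key (key = value) and therefore exactly one timer. TimerEstimate is a hypothetical helper, not part of Beam. With the sample's rate and filter, 10,000 elements/s pass for the first 100 s and 20 elements/s afterwards, so a 5 s expiry should leave about 50,000 pending timers early on and about 100 later. If fired timers were never released, the count would instead keep climbing past 1,000,000.

```java
/**
 * Hypothetical back-of-envelope helper (not part of Beam): estimates how many
 * gcTimers the quoted sample pipeline holds, assuming timestamps follow
 * generation time and the watermark keeps pace.
 */
class TimerEstimate {
    static final long RATE_PER_SEC = 10_000;    // withRate(10000, standardSeconds(1))
    static final long DENSE_LIMIT = 1_000_000;  // filter keeps every val < 1000000 ...
    static final long SPARSE_EVERY = 500;       // ... and afterwards only val % 500 == 0
    static final long TTL_SEC = 5;              // timer set to ts + 5 s

    /** Number of values in [0, n) that pass the sample's filter. */
    static long passed(long n) {
        if (n <= DENSE_LIMIT) {
            return n;
        }
        // DENSE_LIMIT is a multiple of 500, so this counts the multiples of 500 in [DENSE_LIMIT, n).
        return DENSE_LIMIT + (n - DENSE_LIMIT + SPARSE_EVERY - 1) / SPARSE_EVERY;
    }

    /** Timers pending after t seconds if fired timers are released: elements from the last TTL_SEC seconds. */
    static long pendingTimers(long t) {
        long windowStart = Math.max(0, t - TTL_SEC);
        return passed(RATE_PER_SEC * t) - passed(RATE_PER_SEC * windowStart);
    }

    /** Timers retained after t seconds if fired timers were never released: every element so far. */
    static long leakedTimers(long t) {
        return passed(RATE_PER_SEC * t);
    }

    public static void main(String[] args) {
        System.out.println(pendingTimers(50));   // 50000
        System.out.println(pendingTimers(600));  // 100
        System.out.println(leakedTimers(600));   // 1010000
    }
}
```

Because the filter drops most elements after the first 100 s, a healthy runner's timer count should fall sharply at that point; steady heap growth well after it points at retained timers rather than at live ones.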