Thanks for reporting, Gökhan! Please keep us updated. We'll likely merge
the patch by the end of the week.
-Max
On 03.09.20 08:40, Gökhan Imral wrote:
Thanks for the quick response. I tried a build with the fix applied and can
see that memory usage is much more stable.
Gokhan
On 2 Sep 2020, at 12:51 PM, Jan Lukavský <je...@seznam.cz> wrote:
Hi Gokhan,
this is related to [1], which is about to be fixed.
Jan
[1] https://github.com/apache/beam/pull/12733
On 9/2/20 10:24 AM, Gökhan Imral wrote:
Hi,
We have implemented an unbounded stream join using a stateful DoFn and a
global window. When necessary, we store elements in state, and we use
timers to clear that state once it should expire. After deploying our
job, however, we discovered that the state size keeps growing. While
experimenting to find the cause, I discovered that the timers are not
being garbage collected.
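The pattern described above (buffer each side's elements in per-key state and expire them with event-time timers) can be illustrated with a small, stdlib-only toy model. This is not Beam or Flink code. The class `ToyExpiringJoin`, its methods, and the per-side `Map` buffers that stand in for per-key state are hypothetical, and `advanceWatermark` stands in for event-time timers firing. The intended point is only that once expired entries and empty keys are dropped, buffered state stays bounded.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of an expiring keyed stream join (NOT Beam code).
class ToyExpiringJoin {
  // One buffered element and the event time at which it expires.
  static final class Entry {
    final String value;
    final long expiresAt;

    Entry(String value, long expiresAt) {
      this.value = value;
      this.expiresAt = expiresAt;
    }
  }

  private final long ttlMillis;
  // Per-key buffers, standing in for per-key state on each input side.
  private final Map<String, List<Entry>> left = new HashMap<>();
  private final Map<String, List<Entry>> right = new HashMap<>();

  ToyExpiringJoin(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  // Buffers a left element and returns the "left|right" pairs it produces.
  List<String> onLeft(String key, String value, long ts) {
    return add(left, right, key, value, ts, true);
  }

  // Buffers a right element and returns the "left|right" pairs it produces.
  List<String> onRight(String key, String value, long ts) {
    return add(right, left, key, value, ts, false);
  }

  private List<String> add(Map<String, List<Entry>> own, Map<String, List<Entry>> other,
      String key, String value, long ts, boolean isLeft) {
    own.computeIfAbsent(key, k -> new ArrayList<>()).add(new Entry(value, ts + ttlMillis));
    List<String> out = new ArrayList<>();
    for (Entry e : other.getOrDefault(key, Collections.emptyList())) {
      out.add(isLeft ? value + "|" + e.value : e.value + "|" + value);
    }
    return out;
  }

  // Stands in for timers firing: drop expired entries, then drop empty keys
  // so that nothing lingers for keys that are no longer active.
  void advanceWatermark(long watermark) {
    evict(left, watermark);
    evict(right, watermark);
  }

  private static void evict(Map<String, List<Entry>> side, long watermark) {
    side.values().forEach(list -> list.removeIf(e -> e.expiresAt <= watermark));
    side.values().removeIf(List::isEmpty);
  }

  int bufferedCount() {
    int n = 0;
    for (List<Entry> l : left.values()) n += l.size();
    for (List<Entry> l : right.values()) n += l.size();
    return n;
  }

  public static void main(String[] args) {
    ToyExpiringJoin join = new ToyExpiringJoin(5_000);
    join.onLeft("a", "L1", 0);
    System.out.println(join.onRight("a", "R1", 1_000)); // prints [L1|R1]
    join.advanceWatermark(20_000);
    System.out.println(join.bufferedCount()); // prints 0
  }
}
```

In a real stateful DoFn, the two maps would roughly correspond to per-key state for each input, and `advanceWatermark` to an `@OnTimer` callback that clears it.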
This is the sample application I used for testing.
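import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class TimerGcTest {
  public static void main(String[] args) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(GenerateSequence.from(0).withRate(10000, Duration.standardSeconds(1)))
        // Keep the first 1,000,000 values, then only every 500th value.
        .apply(Filter.by((val) -> val < 1000000 || val % 500 == 0))
        // Key each element by its own value, so every key is distinct.
        .apply(WithKeys.of((val) -> val))
        .setCoder(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
        .apply("Window", Window.<KV<Long, Long>>into(new GlobalWindows())
            .triggering(AfterPane.elementCountAtLeast(1))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply("State", ParDo.of(new DoFn<KV<Long, Long>, KV<Long, Long>>() {
          @StateId("state")
          private final StateSpec<ValueState<Long>> stateSpec = StateSpecs.value();

          @TimerId("gcTimer")
          private final TimerSpec gcTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

          @ProcessElement
          public void process(
              ProcessContext c,
              @Timestamp Instant ts,
              @StateId("state") ValueState<Long> state,
              @TimerId("gcTimer") Timer gcTimer) {
            // Store the value and schedule cleanup 5 seconds (event time) later.
            state.write(c.element().getValue());
            gcTimer.set(ts.plus(Duration.standardSeconds(5)));
          }

          @OnTimer("gcTimer")
          public void onGC(@StateId("state") ValueState<Long> state) {
            // Expire the stored value.
            state.clear();
          }
        }));

    p.run().waitUntilFinish();
  }
}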
After running the application for a couple of minutes with the FlinkRunner,
heap memory increases periodically. When I checked the heap dump, I
could see many timers that are not being garbage collected.
When I test it with fixed or sliding windows, I can see the timers being
cleared at the end of the window. Is this the expected behavior
because the global window never closes? Is there a way to clear the
timers in the global window, or should we try sliding windows with
deduplication? Session windows could also work for our scenario, but
stateful DoFns do not support them.
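For reference, here is a toy model of the per-key lifecycle the sample DoFn relies on. It is hypothetical, stdlib-only, and not Beam's or Flink's implementation. It assumes that re-setting `gcTimer` for the same key replaces the pending timer, and that a timer is discarded once it fires. Under those assumptions, nothing accumulates even though the global window never closes. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model (NOT Beam or Flink code) of the sample DoFn:
// one ValueState slot and one event-time "gcTimer" per key.
class ToyTimerModel {
  static final long TTL_MILLIS = 5_000; // mirrors Duration.standardSeconds(5)

  final Map<Long, Long> state = new HashMap<>();         // key -> stored value
  final Map<Long, Long> pendingTimers = new HashMap<>(); // key -> timer time

  // Like process(): write the state and (re)set the key's single timer.
  // Setting the timer again for the same key replaces the old time.
  void processElement(long key, long value, long timestampMillis) {
    state.put(key, value);
    pendingTimers.put(key, timestampMillis + TTL_MILLIS);
  }

  // Like the watermark advancing: fire every due timer, then forget it.
  // Toy rule (a simplification): a timer is due once the watermark reaches its time.
  int advanceWatermark(long watermarkMillis) {
    List<Long> due = new ArrayList<>();
    for (Map.Entry<Long, Long> e : pendingTimers.entrySet()) {
      if (e.getValue() <= watermarkMillis) {
        due.add(e.getKey());
      }
    }
    for (long key : due) {
      pendingTimers.remove(key); // the fired timer itself is dropped
      state.remove(key);         // like onGC(): state.clear()
    }
    return due.size();
  }

  public static void main(String[] args) {
    ToyTimerModel model = new ToyTimerModel();
    for (long i = 0; i < 1_000; i++) {
      model.processElement(i, i, i); // distinct keys, like WithKeys.of(val -> val)
    }
    int fired = model.advanceWatermark(10_000);
    // prints "1000 fired, 0 timers and 0 state entries left"
    System.out.println(fired + " fired, " + model.pendingTimers.size()
        + " timers and " + model.state.size() + " state entries left");
  }
}
```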
This is my first message here, so sorry for any mistakes.
Thank you all very much,
Gokhan Imral
<Screen Shot 2020-09-02 at 11.36.15 AM.png>
<Screen Shot 2020-09-02 at 12.21.22 PM.png>