Hello everyone,

We are trying to run a Beam pipeline on Flink on Kubernetes in AWS and have a couple of questions.

Our Setup:
1. Runner: Flink
2. Data Source: Reading data via HCatalogIO against a Parquet table.
3. We read historical data from S3 and, once we are done reading the bounded source, we switch to reading from a real-time stream. The pipeline as a whole is a streaming pipeline that reads both a bounded data source and an unbounded data source.
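For context, the bounded-then-unbounded composition looks roughly like the sketch below. The helper names readHistorical() and readStream(), and the element type Row, are placeholders for our HCatalogIO read and the real-time source, not our exact code:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// Sketch only: readHistorical() stands in for the HCatalogIO read of the
// Parquet table on S3, readStream() for the real-time unbounded source.
PCollection<Row> historical = pipeline.apply("Read historical", readHistorical());
PCollection<Row> live = pipeline.apply("Read stream", readStream());

// Flatten the bounded and unbounded sources into a single streaming
// PCollection; the presence of the unbounded source makes the whole
// pipeline run in streaming mode.
PCollection<Row> all =
    PCollectionList.of(historical).and(live).apply(Flatten.pCollections());
```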

1. While using a GlobalWindow with a Combine.perKey operation, we notice in our heap dumps that memory is retained by org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper even after we have finished reading the entire bounded source. Here is the complete window + trigger configuration we are using:
.apply("Apply windowing and triggering",
    Window.<KV<String, BillUnifiedProjection>>configure()
        .accumulatingFiredPanes()
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30)))))

2. If we use FixedWindows instead, however, we do not see memory being retained by UnboundedSourceWrapper after it is done reading the bounded source data.
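For comparison, the FixedWindows variant we tested is essentially the following sketch. The five-minute window size and the allowed-lateness setting are illustrative, not our exact values:

```java
.apply("Apply fixed windowing and triggering",
    Window.<KV<String, BillUnifiedProjection>>into(
            FixedWindows.of(Duration.standardMinutes(5)))  // illustrative size
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30))))
        .withAllowedLateness(Duration.ZERO)  // illustrative; windows expire
                                             // immediately after the watermark
        .accumulatingFiredPanes())
```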

I have a two-part question:
1)
  • Global windows - When using Beam global windows, does the source data remain in memory for the entire duration of a streaming job [i.e., forever]?
  • Fixed windows - Does using fixed windows retain the source data only for the duration of the window, after which it is garbage collected?
2) Our job eventually hits OOM exceptions, and after analyzing the heap dumps it looks like the retained sources take up a lot of memory. I would really appreciate it if someone could help me understand global vs. fixed windows in the context of a combiner, and how that choice affects the memory footprint.

Thank you very much.

Best,
Ankit
