Hi Sandeep,
The watermark estimation itself should not be related to load. Can you
please clarify:
a) Are you using any custom timestamp policy (e.g. set via KafkaIO's
withTimestampPolicyFactory)? A sketch of what that looks like follows
below.
b) Do you see any backpressure in Flink's UI? Backpressure could, under
some circumstances, cause delays in watermark propagation. It _might_
help to increase parallelism in that case.
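For reference, here is a minimal sketch of what I mean by a custom
timestamp policy, assuming String records, placeholder broker/topic
names, and an arbitrary 1-minute watermark delay:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Sketch: the per-partition watermark trails the maximum observed
// record timestamp by at most the configured delay.
Pipeline p = Pipeline.create();
p.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("broker:9092")            // placeholder
    .withTopic("events")                            // placeholder
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withTimestampPolicyFactory(
        (tp, previousWatermark) ->
            new CustomTimestampPolicyWithLimitedDelay<>(
                record -> new Instant(record.getTimestamp()),
                Duration.standardMinutes(1),
                previousWatermark)));

If you do not set a policy yourself, KafkaIO uses one of the built-in
factories (e.g. TimestampPolicyFactory.withProcessingTime() or
withLogAppendTime()), which behave differently with respect to the
watermark.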
Best,
Jan
On 11/15/21 18:22, Kathula, Sandeep wrote:
Hi,
We are running a Beam application on the Flink runner (Beam 2.29,
Flink 1.12) that reads from Kafka and writes to S3 once every 5
minutes. Our windowing and S3 write look like:
// 'events' is the PCollection<GenericRecord> read from Kafka.
events
    .apply("Batch Events",
        Window.<GenericRecord>into(
                FixedWindows.of(Duration.standardMinutes(5)))
            // Fire once, when the watermark passes the end of the window.
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
            .discardingFiredPanes())
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to(outputPath)
        .withNumShards(5)
        .withNaming(new CustomFileNaming("snappy.parquet")));
Resources allocated: 5 task slots, each with 3 CPUs and 32 GB RAM. We
are using RocksDB as the state backend and giving 50% of the memory to
off-heap.
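For completeness, the corresponding Flink 1.12 configuration looks
roughly like this (a sketch; the values are assumptions based on the
description above, not our verbatim config):

# flink-conf.yaml (sketch)
state.backend: rocksdb
taskmanager.numberOfTaskSlots: 5
taskmanager.memory.process.size: 32g
# RocksDB uses Flink's managed (off-heap) memory; ~50% of the total
taskmanager.memory.managed.fraction: 0.5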
It runs fine with lighter loads, but when it gets heavier load from
Kafka (7,500 or more records per second, each record around 7 KB in
size), we see that no files are being written to S3. We are using
AfterWatermark.pastEndOfWindow(), which triggers only when the
watermark reaches the end of the window.
After debugging, we found that the watermarks are not advancing under
heavy load. As a result, the event-time trigger that should fire once
the watermark reaches the end of the window never fires, so the S3
writes never happen. The data keeps accumulating in off-heap memory,
which results in an out-of-memory error after some time.
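For comparison, a trigger variant with processing-time early firings
would keep flushing panes even while the watermark stalls, at the cost
of multiple partial panes per window. This is only a sketch (the
5-minute early-firing delay is arbitrary), not code we run:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Sketch: same 5-minute window, plus early firings on processing time,
// so data is flushed even when the watermark does not advance.
// Downstream consumers must then tolerate several partial
// (discarding-mode) panes per window.
Window<GenericRecord> windowing =
    Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(5))))
        .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
        .discardingFiredPanes();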
Can you please let us know why the watermarks are not advancing under
high load?
Thanks,
Sandeep