Hi,
We are running a Beam application on the Flink runner (Beam 2.29, Flink
1.12) that reads from Kafka and writes to S3 once every 5 minutes. The
windowing and S3 write logic looks like this:
events  // PCollection<GenericRecord>
    .apply("Batch Events",
        Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
            .discardingFiredPanes())
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to(outputPath)
        .withNumShards(5)
        .withNaming(new CustomFileNaming("snappy.parquet")));
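For context, the Kafka read that produces `events` is along these lines. This
is a simplified sketch: the broker/topic names are placeholders,
MyAvroDeserializer stands in for our actual Avro value deserializer, and the
log-append-time timestamp policy is illustrative of how the watermark gets
derived from the records:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

PCollection<GenericRecord> events = pipeline
    .apply("Read Kafka", KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers("broker1:9092")      // placeholder
        .withTopic("events")                       // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder(
            MyAvroDeserializer.class,              // hypothetical Deserializer<GenericRecord>
            AvroCoder.of(schema))                  // same Avro schema passed to ParquetIO.sink
        .withLogAppendTime()                       // illustrative: timestamps/watermark from broker log-append time
        .withoutMetadata())
    .apply(Values.<GenericRecord>create());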
Resources allocated: 5 task slots, each with 3 CPUs and 32 GB RAM. We are
using RocksDB as the state backend and giving 50% of the memory to off-heap.
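In flink-conf.yaml terms, the state backend and memory split are configured
roughly as follows (a sketch of just the settings relevant here; the slot
count per task manager is an assumption about our layout):

state.backend: rocksdb
taskmanager.numberOfTaskSlots: 5           # assumption about slot layout
taskmanager.memory.managed.fraction: 0.5   # the "50% off-heap" used by RocksDB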
It runs fine under lighter loads. But under heavier load from Kafka (7,500 or
more records per second, each record around 7 KB in size), we see that no
files are written to S3 at all. We are using
AfterWatermark.pastEndOfWindow(), which triggers only when the watermark
reaches the end of the window.
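For scale: 7,500 records/s x ~7 KB is roughly 51 MB/s, i.e. on the order of
15 GB per 5-minute window that has to be buffered in state until the trigger
fires.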
After debugging we found that the watermarks are not advancing during heavy
loads. As a result, the event-time trigger that should fire when the
watermark reaches the end of the window never fires, so the S3 writes never
happen. The data keeps accumulating in off-heap state, which leads to an
out-of-memory failure after some time.
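For what it's worth, we understand that adding processing-time early firings
along the lines below would flush panes even while the watermark is stuck
(the one-minute delay is illustrative, not something we run), but we would
rather understand the root cause first:

Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))))
    .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
    .discardingFiredPanes()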
Can you please let us know why the watermarks are not advancing under high
load?
Thanks,
Sandeep