Hi Sandeep,
one more question, did you try to use --experiments=use_deprecated_read?
If not, can you try that and check if it has any impact on the behavior
you observe?
Jan
On 11/18/21 01:41, Kathula, Sandeep wrote:
Hi Jan,
We are not adding any custom timestamp policy. We also don’t see
backpressure in Flink UI. We are giving 5 task slots each with 3 CPU
and 32 GB RAM. Its working if we give 10 task slots each with 3 CPU
and 32 GB RAM. But that’s lot of resources for this load. We are
trying to figure out why Beam is not able to handle 10,000 records per
second with 5 task slots.
Thanks,
Sandeep
*From: *Jan Lukavský <[email protected]>
*Reply-To: *"[email protected]" <[email protected]>
*Date: *Tuesday, November 16, 2021 at 3:11 AM
*To: *"[email protected]" <[email protected]>
*Subject: *Re: Beam on Flink runner not able to advance watermarks on
a high load
This email is from an external sender.
Hi Sandeep,
- dev@beam <mailto:[email protected]>
The watermark estimation itself should not be related to load. Can you
please clarify, if
a) you are using any custom timestamp policy?
b) you see any backpressure in Flink's UI? Backpressure could - under
some circumstances - cause delays in watermark propagation. It _might_
help to increase parallelism in that case.
Best,
Jan
On 11/15/21 18:22, Kathula, Sandeep wrote:
Hi,
We are running a Beam application on Flink runner (Beam 2.29
and Flink 1.12) which reads from Kafka and writes to S3 once
every 5 minutes. My window and s3 writes looks like
PCollection<GenericRecord>.apply("Batch Events",
Window.<GenericRecord>into(
FixedWindows.of(Duration.standardMinutes(5)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO,
Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes())
.apply(FileIO.<GenericRecord>write()
.via(ParquetIO.sink(schema))
.to(outputPath)
.withNumShards(5)
.withNaming(new CustomFileNaming("snappy.parquet")));
Resources allocated: 5 task slots each with 3 CPU and 32 GB RAM.
We are using RocksDB as state backend and giving 50% of memory to
off-heap.
Its running fine with lighter loads. But when it gets heavier load
from Kafka (7500 or more records per sec – each record around 7KB
in size), we are seeing that no files are being written to S3.We
are using AfterWatermark.pastEndOfWindow() which is trigerring
only when the watermark reaches the end of window.
After debugging we found that watermarks are not being advanced
during heavy loads and as a result event time triggers after
watermark reaches end of window because of which s3 writes will
happen are not getting triggered. So the data is accumulating in
off-heap which results in out of memory after some time.
Can you please let us know why watermarks are not advancing under
high load.
Thanks,
Sandeep