Hi Jan,
           We are not adding any custom timestamp policy. We also don't see
backpressure in the Flink UI. We are giving 5 task slots, each with 3 CPU and
32 GB RAM. It works if we give 10 task slots, each with 3 CPU and 32 GB RAM,
but that's a lot of resources for this load. We are trying to figure out why
Beam is not able to handle 10,000 records per second with 5 task slots.

Thanks,
Sandeep

From: Jan Lukavský <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, November 16, 2021 at 3:11 AM
To: "[email protected]" <[email protected]>
Subject: Re: Beam on Flink runner not able to advance watermarks on a high load

Hi Sandeep,

- [email protected]

The watermark estimation itself should not be related to load. Can you please
clarify whether:

 a) you are using any custom timestamp policy?

 b) you see any backpressure in Flink's UI? Backpressure could, under some
circumstances, cause delays in watermark propagation. It _might_ help to
increase parallelism in that case.
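
To illustrate (a): a custom policy is typically installed via KafkaIO's
withTimestampPolicyFactory, roughly like the sketch below (the broker, topic
and timestamp extraction are placeholders, not taken from your pipeline):

    // uses org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay
    KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers("broker:9092")   // placeholder
        .withTopic("events")                   // placeholder
        .withTimestampPolicyFactory((partition, previousWatermark) ->
            new CustomTimestampPolicyWithLimitedDelay<>(
                record -> new Instant(record.getTimestamp()), // event time per record
                Duration.standardMinutes(1),                  // max expected delay
                previousWatermark));

If no factory is set, KafkaIO defaults to processing time, in which case the
watermark should normally keep advancing regardless of load.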

Best,

 Jan
On 11/15/21 18:22, Kathula, Sandeep wrote:
Hi,
    We are running a Beam application on the Flink runner (Beam 2.29 and Flink
1.12) which reads from Kafka and writes to S3 once every 5 minutes. My window
and S3 write logic look like:
// "records" is the input PCollection<GenericRecord> read from Kafka
records.apply("Batch Events", Window.<GenericRecord>into(
            FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
        .discardingFiredPanes())
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to(outputPath)
        .withNumShards(5)
        .withNaming(new CustomFileNaming("snappy.parquet")));


Resources allocated: 5 task slots, each with 3 CPU and 32 GB RAM. We are using
RocksDB as the state backend and giving 50% of the memory to off-heap.
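
(Roughly equivalent flink-conf.yaml settings, for reference; the exact values
here are assumptions about our deployment, not copied from it:)

    state.backend: rocksdb
    taskmanager.memory.process.size: 32g        # per task manager
    taskmanager.memory.managed.fraction: 0.5    # managed (off-heap) memory used by RocksDB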

It runs fine with lighter loads, but when it gets a heavier load from Kafka
(7,500 or more records per second, each record around 7 KB in size), we see
that no files are being written to S3. We are using
AfterWatermark.pastEndOfWindow(), which triggers only when the watermark
reaches the end of the window.
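
(For comparison, a variant with processing-time early firings would keep
emitting panes even while the watermark stalls. This is only a sketch based on
the window definition above, not what we run:)

    Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
        .discardingFiredPanes()

If files showed up with early firings but not with the plain watermark
trigger, that would confirm it is the watermark that stalls rather than the
write itself.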



After debugging, we found that watermarks are not being advanced during heavy
loads. As a result, the event-time trigger that fires when the watermark
reaches the end of the window never fires, so the S3 writes are not getting
triggered. The data accumulates in off-heap memory, which results in an
out-of-memory error after some time.


Can you please let us know why watermarks are not advancing under high load?


Thanks,
Sandeep
