Hi Sandeep,

if you are certain that the timestamps of the messages in the Kafka topic you are processing are generated correctly (which is likely, given that adding more resources 'fixes' the issue), I suggest you create a flame graph, or some other aggregated view of hotspots from thread dumps. You might also want to verify that your pipeline processes records from all partitions of your topic(s), so that no partition is accidentally left behind. That is all that comes to my mind at the moment. If you gain any more insights, do not hesitate to share them, so that the community can help figure this out, because that behavior 'feels' unexpected - at least to me.
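To illustrate the point about partitions being left behind: a source's output watermark is typically the minimum over its per-partition watermarks, so a single partition that receives no data (or lags far behind) holds back the watermark for the whole pipeline. A toy sketch in plain Java (not actual Beam or Flink code; `WatermarkMin` and `globalWatermark` are hypothetical names used only for illustration):

```java
import java.util.Arrays;

public class WatermarkMin {
    // Hypothetical helper: the global watermark is the minimum of the
    // per-partition watermarks (epoch millis in this toy example).
    static long globalWatermark(long[] partitionWatermarks) {
        return Arrays.stream(partitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long[] healthy = {1_000L, 1_010L, 995L};
        long[] oneStalled = {1_000L, 1_010L, 0L}; // partition 2 never advances
        System.out.println(globalWatermark(healthy));    // 995
        System.out.println(globalWatermark(oneStalled)); // 0
    }
}
```

If one partition is starved, downstream event-time triggers that wait for the watermark will never fire, which matches the symptom described below.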

Best,

 Jan

On 11/29/21 19:45, Kathula, Sandeep wrote:

Hi Jan,

          We are already using --experiments=use_deprecated_read, but Flink is still not advancing the watermarks.

Thanks,

Sandeep

*From: *Jan Lukavský <[email protected]>
*Date: *Thursday, November 18, 2021 at 4:03 AM
*To: *"Kathula, Sandeep" <[email protected]>, "[email protected]" <[email protected]>
*Subject: *Re: Beam on Flink runner not able to advance watermarks on a high load

This email is from an external sender.

Hi Sandeep,

one more question, did you try to use --experiments=use_deprecated_read? If not, can you try that and check if it has any impact on the behavior you observe?
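For reference, the experiment can be passed on the command line as `--experiments=use_deprecated_read` or set programmatically. A minimal sketch of the programmatic form, assuming a standard Beam Java pipeline (this is a configuration fragment, not a complete pipeline):

```java
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SetExperiment {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        // Equivalent to passing --experiments=use_deprecated_read on the CLI:
        ExperimentalOptions.addExperiment(
            options.as(ExperimentalOptions.class), "use_deprecated_read");
    }
}
```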

 Jan

On 11/18/21 01:41, Kathula, Sandeep wrote:

    Hi Jan,

    We are not adding any custom timestamp policy. We also don't see
    backpressure in the Flink UI. We are giving 5 task slots, each with
    3 CPU and 32 GB RAM. It works if we give 10 task slots, each with
    3 CPU and 32 GB RAM, but that is a lot of resources for this load.
    We are trying to figure out why Beam is not able to handle 10,000
    records per second with 5 task slots.

    Thanks,

    Sandeep

    *From: *Jan Lukavský <[email protected]>
    *Reply-To: *"[email protected]" <[email protected]>
    *Date: *Tuesday, November 16, 2021 at 3:11 AM
    *To: *"[email protected]" <[email protected]>
    *Subject: *Re: Beam on Flink runner not able to advance watermarks
    on a high load


    Hi Sandeep,

    - dev@beam

    The watermark estimation itself should not be related to load. Can
    you please clarify, if

     a) you are using any custom timestamp policy?

     b) you see any backpressure in Flink's UI? Backpressure could -
    under some circumstances - cause delays in watermark propagation.
    It _might_ help to increase parallelism in that case.
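Regarding (a), for reference: KafkaIO's timestamp policy controls both record timestamps and watermark advancement. A hedged configuration fragment showing how a built-in policy is selected (Beam Java; broker address and topic name are placeholders, and the exact API should be checked against your Beam version):

```java
KafkaIO.<String, GenericRecord>read()
    .withBootstrapServers("kafka:9092")  // placeholder address
    .withTopic("events")                 // placeholder topic
    // Use the Kafka record's create time as event time, tolerating up to
    // one minute of out-of-orderness when advancing the watermark:
    .withCreateTime(Duration.standardMinutes(1));
```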

    Best,

     Jan

    On 11/15/21 18:22, Kathula, Sandeep wrote:

        Hi,

        We are running a Beam application on the Flink runner (Beam
        2.29 and Flink 1.12) which reads from Kafka and writes to S3
        once every 5 minutes. My windowing and S3 writes look like:

        PCollection<GenericRecord>.apply("Batch Events",
                Window.<GenericRecord>into(
                        FixedWindows.of(Duration.standardMinutes(5)))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .withAllowedLateness(Duration.ZERO,
                        Window.ClosingBehavior.FIRE_ALWAYS)
                    .discardingFiredPanes())
            .apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(schema))
                .to(outputPath)
                .withNumShards(5)
                .withNaming(new CustomFileNaming("snappy.parquet")));

        Resources allocated: 5 task slots, each with 3 CPU and 32 GB
        RAM. We are using RocksDB as the state backend and giving 50%
        of the memory to off-heap.

        It's running fine with lighter loads. But under a heavier load
        from Kafka (7,500 or more records per second, each record
        around 7 KB in size), we see that no files are being written
        to S3. We are using AfterWatermark.pastEndOfWindow(), which
        triggers only when the watermark reaches the end of the
        window.
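As a debugging aid for situations like this, an early-firing trigger can emit partial panes on processing time even while the watermark is stalled, which helps distinguish "watermark stuck" from "pipeline stuck". A hedged sketch of the trigger portion only (an illustrative variation on the window definition above, not a production recommendation):

```java
Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5)))
    .triggering(AfterWatermark.pastEndOfWindow()
        // Hypothetical mitigation: additionally fire one minute of
        // processing time after the first element in the pane, even if
        // the watermark never advances.
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))))
    .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
    .discardingFiredPanes();
```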

        After debugging we found that the watermarks are not being
        advanced under heavy load, so the event-time trigger that
        fires when the watermark reaches the end of the window never
        fires, and the S3 writes never happen. The data therefore
        accumulates in off-heap memory, which leads to an
        out-of-memory error after some time.

        Can you please let us know why the watermarks are not
        advancing under high load?

        Thanks,

        Sandeep
