Hi Lydian,

In that case, there is only generic advice to look into. Reshuffle is a stateless operation that should not cause data to be dropped. A GroupByKey, on the other hand, is stateful and can therefore drop late data. You should be able to confirm this by looking for the 'droppedDueToLateness' counter and/or the log message in [1]. This happens when an element arrives after the watermark has already passed its timestamp by more than the allowed lateness. If you see that log, you might need to either change how you assign timestamps to elements (e.g. use log append time) or increase the allowed lateness of your windowing.
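For illustration, in the Python SDK that could look roughly like the sketch below (the broker, topic and lateness values are placeholders, and the exact name of the timestamp policy constant is worth double-checking against your SDK version):

```python
# Sketch only: take element timestamps from the broker's log append time
# and give late elements extra allowed lateness before they are dropped.
import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils.timestamp import Duration

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    (
        p
        | ReadFromKafka(
            consumer_config={'bootstrap.servers': 'broker:9092'},
            topics=['my-topic'],
            # assign timestamps from log append time instead of processing time
            timestamp_policy=ReadFromKafka.log_append_time)
        | beam.WindowInto(
            window.FixedWindows(60),
            # keep elements that are up to 5 minutes behind the watermark
            allowed_lateness=Duration(seconds=300))
        # ... aggregation, reshuffle and the write stay as they are
    )
```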

Best,

 Jan

[1] https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132

On 9/18/24 08:53, Lydian Lee wrote:
I would love to, but there are some limitations on our end, so the version bump won't happen soon. Thus I need to figure out what the root cause might be.


On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:

    Hi Lydian,

    2.41.0 is quite old, can you please try the current version to see
    if this issue is still present? There were lots of changes between
    2.41.0 and 2.59.0.

     Jan

    On 9/17/24 17:49, Lydian Lee wrote:
    Hi,

    We are using the Beam Python SDK with the Flink runner; the Beam
    version is 2.41.0 and the Flink version is 1.15.4.

    We have a pipeline with 2 stages:
    1. Read from Kafka and apply a fixed window of 1 minute.
    2. Aggregate the data for the past minute, reshuffle so that we
    have a smaller partition count, and write the results to S3.

    We disabled enable.auto.commit and enabled
    commit_offset_in_finalize; auto.offset.reset is set to
    "latest" (a rough sketch of the setup follows below).
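    For reference, here is a simplified sketch of the whole setup (broker,
    topic and the aggregation step are placeholders, not the exact code;
    the write step is omitted):

    ```python
    # Simplified sketch of the two stages described above; values are placeholders.
    import apache_beam as beam
    from apache_beam import window
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
        (
            p
            # stage 1: read from Kafka and apply a 1-minute fixed window
            | ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': 'broker:9092',
                    'enable.auto.commit': 'false',
                    'auto.offset.reset': 'latest',
                },
                topics=['events'],
                commit_offset_in_finalize=True)
            | beam.WindowInto(window.FixedWindows(60))
            # stage 2: aggregate the past minute, reshuffle to fewer
            # partitions, then write each window's output to S3 (omitted)
            | beam.GroupByKey()
            | beam.Reshuffle()
        )
    ```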

    According to the logs, I can definitely see that the data is being
    consumed from Kafka, because there are many lines like
    ```
    Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
    ```
    and those partition/offset pairs match the missing records.
    However, the records don't show up in the final S3 output.

    My current hypothesis is that the reshuffle might be the reason for
    the issue. For example, suppose that for the past minute Kafka
    partition 1 has records at offsets 1, 2 and 3. After the reshuffle
    they might be distributed as, for example:
    - partition A: 1, 3
    - partition B: 2

    If partition A completes successfully but partition B fails, then,
    given that A succeeded, it will commit its offset to Kafka, so the
    committed offset moves to 3. When the consumer retries, it will skip
    offset 2. However, I am not sure how exactly the offset commit works
    and how it interacts with checkpoints. If my hypothesis were correct,
    we should be seeing far more missing records, yet this happens only
    rarely. Can anyone help identify potential root causes? Thanks


