How are you doing this aggregation?
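For example, is it a bare GroupByKey followed by a ParDo over the grouped values, or a CombinePerKey? A minimal sketch of the two shapes I mean (toy input; sum is just a stand-in combine function):

```python
import apache_beam as beam

# Toy keyed input standing in for the windowed (key, value) pairs.
with beam.Pipeline() as p:
    kvs = p | "Create" >> beam.Create([("a", 1), ("a", 2), ("b", 3)])

    # Shape 1: GroupByKey, then a Map/ParDo over the grouped iterable.
    grouped = (
        kvs
        | "Group" >> beam.GroupByKey()
        | "Materialize" >> beam.MapTuple(lambda k, vs: (k, sorted(vs))))

    # Shape 2: CombinePerKey with a combine function.
    combined = kvs | "Sum" >> beam.CombinePerKey(sum)
```

The distinction matters because the GroupByKey is the stateful step where late data can be dropped.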
On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <tingyenlee...@gmail.com> wrote:

> Hi Jan,
>
> Thanks for the recommendation. In our case, we are windowing on
> processing time, which means that there should be no late events at all.
>
> You've mentioned that GroupByKey is stateful and can potentially drop
> data. After the reshuffle (adding a random shuffle id to the key), we
> then do the aggregation (combine the data and write it to S3). Do you
> think the example I mentioned earlier could be the reason for the
> dropped data?
>
> If so, how is Beam able to prevent that in general? Are there any
> suggested approaches? Thanks
>
> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Lydian,
>>
>> in that case, there is only generic advice you can look into. Reshuffle
>> is a stateless operation that should not cause dropping data. A
>> GroupByKey, on the other hand, is stateful and thus can drop some
>> elements when dealing with late data. You should be able to confirm this
>> by looking for the 'droppedDueToLateness' counter and/or the log here
>> [1]. This happens when elements arrive after the watermark has passed
>> the element's timestamp plus the allowed lateness. If you see the log,
>> you might need to either change how you assign timestamps to elements
>> (e.g. use log append time) or increase the allowed lateness of your
>> windowing.
>>
>> Best,
>>
>> Jan
>>
>> [1]
>> https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>
>> On 9/18/24 08:53, Lydian Lee wrote:
>>
>> I would love to, but there are some limitations on our end, so the
>> version bump won't happen soon. Thus I need to figure out what the root
>> cause might be in the meantime.
>>
>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Lydian,
>>>
>>> 2.41.0 is quite old, can you please try the current version to see if
>>> this issue is still present? There were lots of changes between 2.41.0
>>> and 2.59.0.
>>>
>>> Jan
>>>
>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>
>>> Hi,
>>>
>>> We are using the Beam Python SDK with the Flink runner; the Beam
>>> version is 2.41.0 and the Flink version is 1.15.4.
>>>
>>> We have a pipeline that has 2 stages:
>>> 1. read from Kafka and apply a fixed window of 1 minute
>>> 2. aggregate the data for the past minute, reshuffle so that we have a
>>> smaller partition count, and write it to S3.
>>>
>>> We disabled enable.auto.commit and enabled commit_offset_in_finalize;
>>> auto.offset.reset is set to "latest".
>>> [image: image.png]
>>>
>>> According to the log, I can definitely see that the data is consumed
>>> from the Kafka offsets, because there are many
>>> ```
>>> Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
>>> ```
>>> entries, and those partition/offset pairs do match the missing records.
>>> However, the records don't show up in the final S3 output.
>>>
>>> My current hypothesis is that the shuffling might be the reason for the
>>> issue. For example, originally in Kafka for the past minute, partition
>>> 1 has records at offsets 1, 2, 3. After the reshuffle, they are now
>>> distributed, for example, as:
>>> - partition A: 1, 3
>>> - partition B: 2
>>>
>>> If partition A completes successfully but partition B fails, then,
>>> because A succeeded, it will commit its offset to Kafka, so Kafka now
>>> has the offset at 3. When the read is retried, it will skip offset 2.
>>> However, I am not sure how exactly the offset commit works, and I
>>> wonder how it interacts with checkpoints.
>>> But if my hypothesis were correct, it seems we should be seeing many
>>> more missing records, whereas this happens only rarely. Wondering if
>>> anyone can help identify potential root causes? Thanks
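For reference, this is roughly how I read the pipeline from the description above, as a stripped-down sketch. The broker, topic, group id, the fan-in of 8 shuffle keys, the formatting step, and the text-file write are placeholders on my part, not your actual code:

```python
import random

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

opts = PipelineOptions(streaming=True)

with beam.Pipeline(options=opts) as p:
    _ = (
        p
        # Stage 1: read from Kafka; auto commit is disabled, offsets are
        # committed only when a checkpoint is finalized, and the consumer
        # starts from "latest".
        | "ReadKafka" >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": "broker:9092",   # placeholder
                "group.id": "my-consumer-group",      # placeholder
                "enable.auto.commit": "false",
                "auto.offset.reset": "latest",
            },
            topics=["my-topic"],                      # placeholder
            commit_offset_in_finalize=True,
            timestamp_policy=ReadFromKafka.processing_time_policy)
        # 1-minute fixed windows on processing time. Jan's suggestion maps
        # to a log-append-time timestamp policy above and/or a non-zero
        # allowed_lateness here.
        | "Window" >> beam.WindowInto(FixedWindows(60), allowed_lateness=0)
        # Stage 2: shrink the partition count by keying on a small random
        # shuffle id, aggregate each group, and write the result out.
        | "AddShuffleId" >> beam.Map(
            lambda record: (random.randrange(8), record))
        | "Group" >> beam.GroupByKey()
        | "Format" >> beam.MapTuple(
            lambda _id, records: "\n".join(str(r) for r in records))
        # Stand-in for the actual S3 writer.
        | "WriteS3" >> beam.io.WriteToText("s3://my-bucket/output/part"))
```

If the aggregation is the GroupByKey step above, that is the stateful step Jan pointed at, so the 'droppedDueToLateness' counter from [1] would be the first thing to check.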