Hi Lydian,
in that case, there is only generic advice I can offer.
Reshuffle is a stateless operation and should not cause data to be
dropped. A GroupByKey, on the other hand, is stateful and can therefore
drop late data. You should be able to confirm this by looking for the
'droppedDueToLateness' counter and/or the corresponding log message here [1].
This happens when an element arrives after the watermark has already
passed the end of the element's window plus the allowed lateness. If you
see that log, you might need to either change how you assign timestamps
to elements (e.g. use log append time) or increase the allowed lateness
of your windowing.
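For illustration, here is a minimal Python SDK sketch of both options. The
broker, topic, and the 5-minute lateness value are placeholders, and I am
assuming the timestamp_policy and allowed_lateness parameters of the
Python SDK:
```
import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.utils.timestamp import Duration

with beam.Pipeline() as p:
    records = p | "ReadKafka" >> ReadFromKafka(
        consumer_config={"bootstrap.servers": "broker:9092"},  # placeholder
        topics=["my-topic"],                                    # placeholder
        # option 1: derive event timestamps from the broker's log append time
        timestamp_policy=ReadFromKafka.log_append_time,
    )
    windowed = records | "Window1Min" >> beam.WindowInto(
        window.FixedWindows(60),
        # option 2: tolerate elements arriving up to 5 minutes late
        allowed_lateness=Duration(seconds=300),
    )
```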
Best,
Jan
[1]
https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
On 9/18/24 08:53, Lydian Lee wrote:
I would love to, but there are some limitations on our end which mean the
version bump won't happen soon. So in the meantime I need to figure out
what the root cause might be.
On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi Lydian,
2.41.0 is quite old. Can you please try the current version to see if
this issue is still present? There were lots of changes between
2.41.0 and 2.59.0.
Jan
On 9/17/24 17:49, Lydian Lee wrote:
Hi,
We are using the Beam Python SDK with the Flink runner; the Beam version
is 2.41.0 and the Flink version is 1.15.4.
We have a pipeline that has 2 stages:
1. read from Kafka and apply a 1-minute fixed window
2. aggregate the data for the past minute, reshuffle so that we end up
with a lower partition count, and write the results into S3
We disabled enable.auto.commit and enabled commit_offset_in_finalize;
auto.offset.reset is set to "latest" (the pipeline is sketched below).
According to the logs, I can definitely see that the data is being
consumed from Kafka, because there are many lines like
```
Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
```
and those partition/offset pairs do match the missing records.
However, the records don't show up in the final S3 output.
My current hypothesis is that the reshuffle might be the reason for
the issue. For example, suppose that for the past minute Kafka
partition 1 had records at offsets 1, 2 and 3. After the reshuffle
they might be distributed, for example, as:
- partition A: 1, 3
- partition B: 2
If partition A is processed successfully but partition B fails, then
because A succeeded it will commit its offset to Kafka, so Kafka now
has the offset at 3. When the failed work is retried, it will skip
offset 2. However, I am not sure how exactly the offset commit works,
and I wonder how it interacts with the checkpoints. Also, if my
hypothesis were correct, we should be seeing many more missing
records, yet this seems to happen only rarely. Wondering if anyone
can help identify potential root causes? Thanks