How are you doing this aggregation?

On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <tingyenlee...@gmail.com> wrote:

> Hi Jan,
>
> Thanks for the recommendation. In our case, we are windowing on
> processing time, which means there should be no late events at all.
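>
> (For reference, the processing-time stamping looks roughly like this in the
> Python SDK; a minimal sketch, not our exact code:)
>
> ```
> import time
>
> import apache_beam as beam
> from apache_beam.transforms import window
>
> # Stamp each element with the wall-clock time at which it is processed,
> # so the subsequent 1-minute fixed window should never see late data.
> stamped = (
>     input_pcoll  # hypothetical upstream PCollection
>     | beam.Map(lambda e: window.TimestampedValue(e, time.time()))
>     | beam.WindowInto(window.FixedWindows(60)))
> ```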
>
> You’ve mentioned that GroupByKey is stateful and can potentially drop
> data. Given that after the reshuffle (adding a random shuffle id to the
> key) we then do the aggregation (combining the data and writing it to
> S3), do you think the scenario I described earlier could be the reason
> for the dropped data?
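>
> For context, the reshuffle step looks roughly like this (a minimal sketch
> in the Python SDK; the shard count and the combine() step are
> illustrative, not our exact code):
>
> ```
> import random
>
> import apache_beam as beam
>
> # Spread elements across a small set of random keys so the downstream
> # aggregation runs with a lower partition count than the Kafka source.
> resharded = (
>     windowed  # hypothetical windowed PCollection from the Kafka read
>     | beam.Map(lambda e: (random.randrange(16), e))
>     | beam.GroupByKey()  # the stateful step where late data could drop
>     | beam.MapTuple(lambda shard, records: combine(records)))  # combine() is hypothetical
> ```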
>
> If so, how does Beam prevent that in general? Are there any suggested
> approaches? Thanks
>
> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Lydian,
>>
>> in that case, there is only generic advice you can look into. Reshuffle
>> is a stateless operation that should not cause data to be dropped. A
>> GroupByKey, on the other hand, is stateful and thus can drop late data.
>> You should be able to confirm this by looking for the
>> 'droppedDueToLateness' counter and/or the log emitted here [1]. This
>> happens when an element arrives after the watermark has passed the end
>> of the element's window plus the allowed lateness. If you see the log,
>> you might need to either change how you assign timestamps to elements
>> (e.g. use log append time) or increase the allowed lateness of your
>> WindowFn.
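>>
>> For illustration, in the Python SDK increasing allowed lateness could look
>> like this (a minimal sketch; the 10-minute value is just an example):
>>
>> ```
>> import apache_beam as beam
>> from apache_beam.transforms import window
>> from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark
>>
>> # Keep the 1-minute fixed windows, but tolerate elements arriving up to
>> # 10 minutes behind the watermark before they are dropped.
>> windowed = (
>>     input_pcoll  # hypothetical upstream PCollection
>>     | beam.WindowInto(
>>         window.FixedWindows(60),
>>         trigger=AfterWatermark(),
>>         accumulation_mode=AccumulationMode.DISCARDING,
>>         allowed_lateness=600))  # seconds
>> ```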
>>
>> Best,
>>
>>  Jan
>>
>> [1]
>> https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>> On 9/18/24 08:53, Lydian Lee wrote:
>>
>> I would love to, but there are some limitations on our end, and the
>> version bump won’t happen soon. Thus I need to figure out what the root
>> cause might be.
>>
>>
>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Lydian,
>>>
>>> 2.41.0 is quite old; can you please try the current version to see if
>>> this issue is still present? There were lots of changes between 2.41.0
>>> and 2.59.0.
>>>
>>>  Jan
>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>
>>> Hi,
>>>
>>> We are using the Beam Python SDK with the Flink runner; the Beam version
>>> is 2.41.0 and the Flink version is 1.15.4.
>>>
>>> We have a pipeline with 2 stages:
>>> 1. read from Kafka and apply a 1-minute fixed window
>>> 2. aggregate the data for the past minute, reshuffle so that we have a
>>> lower partition count, and write the results to S3
>>>
>>> We disabled enable.auto.commit and enabled commit_offset_in_finalize;
>>> auto.offset.reset is set to "latest".
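>>>
>>> Roughly, the read side looks like this (a minimal sketch in the Python
>>> SDK; broker and topic names are placeholders, not our real config):
>>>
>>> ```
>>> import apache_beam as beam
>>> from apache_beam.io.kafka import ReadFromKafka
>>> from apache_beam.transforms import window
>>>
>>> # Stage 1: consume from Kafka, committing offsets only when a bundle is
>>> # finalized, then window into 1-minute fixed windows.
>>> records = (
>>>     p  # hypothetical Pipeline object
>>>     | ReadFromKafka(
>>>         consumer_config={
>>>             'bootstrap.servers': 'broker:9092',
>>>             'enable.auto.commit': 'false',
>>>             'auto.offset.reset': 'latest',
>>>         },
>>>         topics=['my-topic'],
>>>         commit_offset_in_finalize=True)
>>>     | beam.WindowInto(window.FixedWindows(60)))
>>> ```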
>>>
>>> According to the logs, I can confirm that the data is being consumed
>>> from Kafka, because there are many entries like
>>> ```
>>> Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
>>> ```
>>> and those partition/offset pairs do match the missing records. However,
>>> the records don't show up in the final S3 output.
>>>
>>> My current hypothesis is that the shuffling might be the reason for the
>>> issue. For example, suppose that in Kafka, for the past minute, partition
>>> 1 has records at offsets 1, 2, and 3. After the reshuffle they might be
>>> distributed as:
>>> - partition A: 1, 3
>>> - partition B: 2
>>>
>>> Now suppose partition A completes successfully but partition B fails.
>>> Since A succeeded, it commits its offsets to Kafka, so Kafka's committed
>>> offset is now 3. When the consumer retries, it will skip offset 2.
>>> However, I am not sure exactly how the offset commit works and how it
>>> interacts with checkpoints. Also, if my hypothesis were correct, we
>>> should be seeing many more missing records, whereas this seems to happen
>>> only rarely. Wondering if anyone can help identify potential root
>>> causes? Thanks
>>>
>>>
>>>
>>>
>>>
