Hi Reuven,

Here's a quick look at our pipeline:

import random
import time

import apache_beam as beam
from apache_beam.transforms import window

(
    pipeline
    | "Reading message from Kafka" >> ReadFromKafka(...)
    | "Deserializing events" >> Deserialize(**deserializer_args)
    | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
        lambda event: window.TimestampedValue(event, time.time()))
    | "Window into Fixed Intervals" >> beam.WindowInto(
        # fixed_window_size is 1 min. Although we configured lateness,
        # because we are using processing time I don't expect any late
        # events.
        beam.transforms.window.FixedWindows(fixed_window_size),
        allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
    )
    # Add a dummy key to reshuffle into fewer partitions: Kafka has 16
    # partitions, but we only want to generate 2 files every minute.
    | "Adding random integer partition key" >> beam.Map(
        lambda event: (random.randint(1, 5), event))
    | "Group by randomly-assigned integer key" >> beam.GroupByKey()
    | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
    # Calls boto3 to write the events to S3 in parquet format.
    | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
)
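
For reference, WriteBatchesToS3 is roughly the following shape (a
simplified sketch rather than our exact code; the bucket/prefix handling,
key naming, and the pyarrow schema inference are placeholders):

import io
import uuid

import apache_beam as beam
import boto3
import pyarrow as pa
import pyarrow.parquet as pq

class WriteBatchesToS3(beam.PTransform):
    # Sketch: serialize each grouped batch to parquet in memory and
    # upload it to S3 with boto3.
    def __init__(self, bucket, prefix):
        self._bucket = bucket
        self._prefix = prefix

    def expand(self, pcoll):
        return pcoll | "WriteBatch" >> beam.Map(self._write_batch)

    def _write_batch(self, events):
        # `events` is one GroupByKey group: an iterable of event dicts.
        table = pa.Table.from_pylist(list(events))
        buf = io.BytesIO()
        pq.write_table(table, buf)
        key = "%s/%s.parquet" % (self._prefix, uuid.uuid4().hex)
        boto3.client("s3").put_object(
            Bucket=self._bucket, Key=key, Body=buf.getvalue())
        return key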

Thanks!


On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <user@beam.apache.org>
wrote:

> How are you doing this aggregation?
>
> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <tingyenlee...@gmail.com>
> wrote:
>
>> Hi Jan,
>>
>> Thanks for the recommendation. In our case, we are windowing on
>> processing time, which means there should be no late events at all.
>>
>> You’ve mentioned that GroupByKey is stateful and can potentially drop
>> data. Given that after the reshuffle (adding a random shuffle id to the
>> key) we then do the aggregation (combine the data and write it to S3), do
>> you think the example I mentioned earlier could be the reason for the
>> dropped data?
>>
>> If so, how is Beam able to prevent that in general? Are there any
>> suggested approaches? Thanks
>>
>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Lydian,
>>>
>>> in that case, there is only generic advice you can look into. Reshuffle
>>> is a stateless operation that should not cause dropped data. A
>>> GroupByKey, on the other hand, is stateful and thus can drop some
>>> elements when dealing with late data. You should be able to confirm this
>>> by looking for the 'droppedDueToLateness' counter and/or the log here
>>> [1]. This happens when an element arrives after the watermark passes the
>>> element's timestamp plus allowed lateness. If you see the log, you might
>>> need to either change how you assign timestamps to elements (e.g. use
>>> log append time) or increase the allowed lateness of your windowfn.
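>>>
>>> For illustration, a sketch of both options (note: whether your
>>> ReadFromKafka version exposes a timestamp policy parameter, and its
>>> exact constant, is something to verify for your SDK version; the
>>> bootstrap servers and topic below are placeholders):
>>>
>>> # Take element timestamps from Kafka's log append time rather than
>>> # processing time, so they are stable across retries:
>>> events = pipeline | ReadFromKafka(
>>>     consumer_config={"bootstrap.servers": "..."},
>>>     topics=["..."],
>>>     timestamp_policy='LogAppendTime',
>>> )
>>>
>>> # ...and/or give the window more slack for late data
>>> # (allowed_lateness here is in seconds):
>>> windowed = events | beam.WindowInto(
>>>     beam.transforms.window.FixedWindows(60),
>>>     allowed_lateness=300,
>>> )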
>>>
>>> Best,
>>>
>>>  Jan
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>
>>> I would love to, but there are some limitations on our end, so the
>>> version bump won’t happen soon. In the meantime, I need to figure out
>>> the root cause.
>>>
>>>
>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Lydian,
>>>>
>>>> 2.41.0 is quite old; can you please try the current version to see if
>>>> this issue is still present? There were lots of changes between 2.41.0
>>>> and 2.59.0.
>>>>
>>>>  Jan
>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are using the Beam Python SDK with the Flink runner; the Beam
>>>> version is 2.41.0 and the Flink version is 1.15.4.
>>>>
>>>> We have a pipeline that has 2 stages:
>>>> 1. read from Kafka and apply a fixed window of 1 minute
>>>> 2. aggregate the data for the past minute, reshuffle so that we have a
>>>> smaller partition count, and write the results to S3
>>>>
>>>> We disabled enable.auto.commit and enabled commit_offset_in_finalize;
>>>> auto.offset.reset is set to "latest".
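>>>>
>>>> In code, the read is configured roughly like this (paraphrased; the
>>>> elided values are placeholders):
>>>>
>>>> ReadFromKafka(
>>>>     consumer_config={
>>>>         "bootstrap.servers": "...",
>>>>         "enable.auto.commit": "false",
>>>>         "auto.offset.reset": "latest",
>>>>     },
>>>>     topics=["..."],
>>>>     commit_offset_in_finalize=True,
>>>> )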
>>>>
>>>> According to the logs, I can confirm that the missing data was
>>>> consumed from Kafka, because there are many
>>>> ```
>>>> Resetting offset for topic XXXX-<PARTITION>  to offset <OFFSET>
>>>> ```
>>>> entries, and those partition/offset pairs do match the missing
>>>> records. However, the records never show up in S3.
>>>>
>>>> My current hypothesis is that the shuffling might be the reason for
>>>> the issue. For example, suppose that for the past minute, partition 1
>>>> in Kafka held records at offsets 1, 2, and 3. After the reshuffle they
>>>> are redistributed, for example:
>>>> - partition A: 1, 3
>>>> - partition B: 2
>>>>
>>>> Now suppose partition A succeeds but partition B fails. Given that A
>>>> succeeded, it will commit its offset to Kafka, so Kafka's committed
>>>> offset is now 3. When the failed work is retried, offset 2 will be
>>>> skipped. However, I am not sure how exactly the offset commit works,
>>>> and I am wondering how it interacts with checkpoints. Also, if my
>>>> hypothesis were correct, we should be seeing many more missing
>>>> records, yet this seems to happen only rarely. Wondering if anyone can
>>>> help identify potential root causes? Thanks