Can you share the (relevant) parameters of the ReadFromKafka transform?

This feels strange, and it might not do what you'd expect:

| "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda event: window.TimestampedValue(event, time.time()))

This does not change the assigned timestamp of the element; it creates a new element that contains the processing time. That timestamp will not be used for windowing, though.
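
One way to verify which timestamp actually reaches the windowing step is to log it from a DoFn via DoFn.TimestampParam. A minimal sketch (the step name and the use of logging are mine, not from your pipeline):

import logging

import apache_beam as beam

class LogAssignedTimestamp(beam.DoFn):
    def process(self, element, ts=beam.DoFn.TimestampParam):
        # 'ts' is the event-time timestamp the runner has attached to this
        # element, i.e. the value FixedWindows uses for window assignment.
        logging.info("element=%s timestamp=%s", element, ts)
        yield element

# Usage: insert right after the timestamp-assignment step, e.g.
#   | "Log assigned timestamps" >> beam.ParDo(LogAssignedTimestamp())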

On 9/19/24 00:49, Lydian Lee wrote:
Hi Reuven,

Here's a quick look at our pipeline:
(
    pipeline
    | "Reading message from Kafka" >> ReadFromKafka(...)
    | "Deserializing events" >> Deserialize(**deserializer_args)
    | "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda event: window.TimestampedValue(event, time.time()))
    | "Window into Fixed Intervals" >> beam.WindowInto(
        beam.transforms.window.FixedWindows(fixed_window_size),  # fixed_window_size is 1 min
        # Although we configured lateness, we are using processing time, so I don't expect any late events.
        allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
    )
    | "Adding random integer partition key" >> beam.Map(
        # Add a dummy key to reshuffle into fewer partitions. Kafka has 16 partitions,
        # but we only want to generate 2 files every minute.
        lambda event: (random.randint(1, 5), event)
    )
    | "Group by randomly-assigned integer key" >> beam.GroupByKey()
    | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
    | "Writing event data batches to parquet" >> WriteBatchesToS3(...)  # call boto3 to write the events to S3 in Parquet format
)

Thanks!


On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <user@beam.apache.org> wrote:

    How are you doing this aggregation?

    On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee
    <tingyenlee...@gmail.com> wrote:

        Hi Jan,

        Thanks for the recommendation. In our case, we are windowing
        on processing time, which means there should be no late
        events at all.

        You mentioned that GroupByKey is stateful and can potentially
        drop data. After the reshuffle (adding a random shuffle id to
        the key), we do the aggregation (combining the data and
        writing it to S3). Do you think the flow I described earlier
        could be the reason for the dropped data?

        If so, how is Beam in general able to prevent that? Are there
        any suggested approaches? Thanks

        On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský
        <je...@seznam.cz> wrote:

            Hi Lydian,

            in that case, there is only generic advice you can look
            into. Reshuffle is a stateless operation that should not
            cause dropped data. A GroupByKey, on the other hand, is
            stateful and thus can drop data when dealing with late
            elements. You should be able to confirm this by looking
            for the 'droppedDueToLateness' counter and/or the log
            here [1]. This happens when an element arrives after the
            watermark has already passed the end of the element's
            window plus the allowed lateness. If you see the log, you
            might need to either change how you assign timestamps to
            elements (e.g. use log append time) or increase the
            allowed lateness of your windowing.
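
            For illustration, a minimal sketch of both options (the
            timestamp-policy constant, broker/topic names and the
            durations below are assumptions of mine; check the
            ReadFromKafka and WindowInto signatures of your SDK
            version):

            import apache_beam as beam
            from apache_beam import window
            from apache_beam.io.kafka import ReadFromKafka
            from apache_beam.utils.timestamp import Duration

            # (a) Use Kafka's log-append time as the element timestamp.
            events = pipeline | ReadFromKafka(
                consumer_config={'bootstrap.servers': 'broker:9092'},  # placeholder
                topics=['events'],  # placeholder
                timestamp_policy=ReadFromKafka.log_append_time,
            )

            # (b) Give the window a larger allowed lateness
            # (10 minutes here, purely illustrative).
            windowed = events | beam.WindowInto(
                window.FixedWindows(60),
                allowed_lateness=Duration(seconds=600),
            )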

            Best,

             Jan

            [1]
            https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132

            On 9/18/24 08:53, Lydian Lee wrote:
            I would love to, but there are some limitations on our
            end, so the version bump won't happen soon. In the
            meantime I still need to figure out the root cause.


            On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský
            <je...@seznam.cz> wrote:

                Hi Lydian,

                2.41.0 is quite old; can you please try the current
                version to see if this issue is still present? There
                were lots of changes between 2.41.0 and 2.59.0.

                 Jan

                On 9/17/24 17:49, Lydian Lee wrote:
                Hi,

                 We are using the Beam Python SDK with the Flink
                 runner; the Beam version is 2.41.0 and the Flink
                 version is 1.15.4.

                 We have a pipeline with 2 stages:
                 1. read from Kafka and apply a fixed window of 1 minute
                 2. aggregate the data for the past 1 minute,
                 reshuffle so that we have a smaller partition count,
                 and write the results to S3.

                 We disabled enable.auto.commit and enabled
                 commit_offset_in_finalize. Also, auto.offset.reset
                 is set to "latest".
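
                 In code form, those settings correspond roughly to
                 the following ReadFromKafka call (broker address,
                 group id and topic name are placeholders, not our
                 real values):

                 from apache_beam.io.kafka import ReadFromKafka

                 ReadFromKafka(
                     consumer_config={
                         'bootstrap.servers': 'broker:9092',  # placeholder
                         'group.id': 'parquet-writer',  # placeholder
                         'enable.auto.commit': 'false',
                         'auto.offset.reset': 'latest',
                     },
                     topics=['events'],  # placeholder
                     commit_offset_in_finalize=True,
                 )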

                 According to the logs, I can confirm the data is
                 being consumed from Kafka, because there are many
                 ```
                 Resetting offset for topic XXXX-<PARTITION> to
                 offset <OFFSET>
                 ```
                 entries, and those partition/offset pairs match the
                 missing records. However, the records don't show up
                 in the final S3 output.

                 My current hypothesis is that the shuffling might be
                 the reason for the issue. For example, for the past
                 minute, Kafka partition 1 originally holds the
                 records at offsets 1, 2, 3. After the reshuffle they
                 are distributed, for example, as:
                 - partition A: 1, 3
                 - partition B: 2

                 If partition A is processed successfully but
                 partition B fails, then, because A succeeded, its
                 offset is committed to Kafka, and Kafka now has the
                 offset at 3. When the read is retried, offset 2 will
                 be skipped. However, I am not sure how exactly the
                 offset commit works, and I'm wondering how it
                 interacts with checkpoints. If my hypothesis were
                 correct, we should be seeing many more missing
                 records, yet this happens only rarely. Wondering if
                 anyone can help identify potential root causes?
                 Thanks
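
                 For context, my understanding is that
                 commit_offset_in_finalize ties the offset commit to
                 checkpoint finalization on the Flink runner, so
                 checkpointing has to be enabled in the pipeline
                 options. A minimal sketch of such a configuration
                 (the interval value is illustrative, not our exact
                 setting):

                 from apache_beam.options.pipeline_options import PipelineOptions

                 # Flink runner options; checkpointing_interval is in milliseconds.
                 options = PipelineOptions([
                     '--runner=FlinkRunner',
                     '--checkpointing_interval=60000',  # illustrative value
                 ])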


