Hi Lydian,

Because you do not specify 'timestamp_policy', ReadFromKafka should use the default, which is processing time, so this should not be the issue. The one remaining transform that could be responsible is the sink transform, as Reuven mentioned. Can you share details of its implementation?
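
In case it is useful, the policy can also be pinned explicitly rather than relying on the default. A minimal sketch only (group and topic names are placeholders):

from apache_beam.io.kafka import ReadFromKafka

ReadFromKafka(
    consumer_config={"group.id": "group-name"},
    topics=["topic-name"],
    # take timestamps from processing time rather than the Kafka record timestamp
    timestamp_policy=ReadFromKafka.processing_time_policy,
)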

 Jan

On 9/19/24 23:10, Lydian Lee wrote:
Hi, Jan

Here's how we do ReadFromKafka; the expansion service is just there so we can run the cross-language transform on k8s, so please ignore it.
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

ReadFromKafka(
    consumer_config={
        "group.id": "group-name",
        "auto.offset.reset": "latest",
        "enable.auto.commit": "false",
    },
    topics=["topic-name"],
    with_metadata=False,
    expansion_service=default_io_expansion_service(
        append_args=[
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
            "--experiments=use_deprecated_read",
        ]
    ),
    commit_offset_in_finalize=True,
)

Do you know what the right approach would be for using processing time instead? I thought WindowInto was supposed to use the timestamp we appended to the event. Do you think it is still using the original Kafka event timestamp? Thanks!



On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <je...@seznam.cz> wrote:

    Can you share the (relevant) parameters of the ReadFromKafka
    transform?

    This feels strange, and it might not do what you'd expect:

    | "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda
    event: window.TimestampedValue(event, time.time()))

    This does not change the assigned timestamp of an element, but
    creates a new element which contains processing time. It will not
    be used for windowing, though.
    On 9/19/24 00:49, Lydian Lee wrote:
    Hi Reuven,

    Here's a quick look for our pipeline:
    (
        pipeline
        | "Reading message from Kafka" >> ReadFromKafka(...)
        | "Deserializing events" >> Deserialize(**deserializer_args)
        | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
            lambda event: window.TimestampedValue(event, time.time()))
        | "Window into Fixed Intervals" >> beam.WindowInto(
            # fixed_window_size is 1 min.
            beam.transforms.window.FixedWindows(fixed_window_size),
            # although we configured lateness, because we are using
            # processing time I don't expect any late events
            allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
        )
        | "Adding random integer partition key" >> beam.Map(
            # add a dummy key so we reshuffle into fewer partitions. Kafka has
            # 16 partitions, but we only want to generate 2 files every minute
            lambda event: (random.randint(1, 5), event)
        )
        | "Group by randomly-assigned integer key" >> beam.GroupByKey()
        | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
        | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
        # WriteBatchesToS3 calls boto3 to write the events to S3 in parquet format
    )

    Thanks!


    On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user
    <user@beam.apache.org> wrote:

        How are you doing this aggregation?

        On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee
        <tingyenlee...@gmail.com> wrote:

            Hi Jan,

            Thanks for the recommendation. In our case, we are
            windowing on processing time, which means there should be
            no late events at all.

            You've mentioned that GroupByKey is stateful and can
            potentially drop data. After the reshuffle (adding a random
            shuffle id to the key), we do the aggregation (combine the
            data and write it to S3). Do you think the scenario I
            mentioned earlier could be the reason for the dropped data?

            If so, how is Beam in general able to prevent that? Are
            there any suggested approaches? Thanks

            On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský
            <je...@seznam.cz> wrote:

                Hi Lydian,

                in that case, there is only generic advice I can offer.
                Reshuffle is a stateless operation that should not
                cause dropped data. A GroupByKey, on the other hand, is
                stateful and thus can drop data when dealing with late
                elements. You should be able to confirm this by looking
                for the 'droppedDueToLateness' counter and/or the log
                here [1]. This happens when an element arrives after
                the watermark has passed the end of the element's
                window plus the allowed lateness. If you see the log,
                you might need to either change how you assign
                timestamps to elements (e.g. use log append time) or
                increase the allowed lateness of your windowfn.
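
                As a sketch only (the parameter names are from the
                Python SDK; the values are placeholders), the two
                options would look roughly like this:

                import apache_beam as beam
                from apache_beam.io.kafka import ReadFromKafka

                # option 1: take element timestamps from the broker's
                # log-append time instead of the producer-set timestamp
                ReadFromKafka(
                    consumer_config={"group.id": "group-name"},
                    topics=["topic-name"],
                    timestamp_policy=ReadFromKafka.log_append_time,
                )

                # option 2: give late elements more slack before the
                # window state is garbage-collected and data is dropped
                beam.WindowInto(
                    beam.transforms.window.FixedWindows(60),
                    allowed_lateness=beam.utils.timestamp.Duration(seconds=300),
                )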

                Best,

                 Jan

                [1]
                
https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132

                On 9/18/24 08:53, Lydian Lee wrote:
                I would love to, but there are some limitations on our
                end, so the version bump won't happen soon. I still
                need to figure out what the root cause might be.


                On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský
                <je...@seznam.cz> wrote:

                    Hi Lydian,

                    2.41.0 is quite old; can you please try a current
                    version to see if this issue is still present?
                    There were lots of changes between 2.41.0 and
                    2.59.0.

                     Jan

                    On 9/17/24 17:49, Lydian Lee wrote:
                    Hi,

                    We are using Beam Python SDK with Flink Runner,
                    the Beam version is 2.41.0 and the Flink
                    version is 1.15.4.

                    We have a pipeline that has 2 stages:
                    1. read from Kafka and apply a fixed window of 1
                    minute
                    2. aggregate the data for the past 1 minute,
                    reshuffle so that we have a lower partition count,
                    and write the results to S3.

                    We disabled enable.auto.commit, enabled
                    commit_offset_in_finalize, and set
                    auto.offset.reset to "latest".

                    According to the logs, the data is definitely being
                    consumed from Kafka, because there are many lines
                    like
                    ```
                    Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
                    ```
                    and the partition/offset pairs do match the missing
                    records. However, those records don't show up in
                    the final output in S3.

                    My current hypothesis is that the shuffling might
                    be the reason for the issue. For example, for the
                    past minute partition 1 in Kafka has records at
                    offsets 1, 2 and 3. After the reshuffle they are
                    now distributed, for example, as:
                    - partition A: 1, 3
                    - partition B: 2

                    If partition A finishes successfully but partition
                    B fails, then because A succeeded its offset is
                    committed to Kafka, so Kafka's committed offset is
                    now 3. When the read is retried, offset 2 is
                    skipped. However, I am not sure how exactly the
                    offset commit works and how it interacts with
                    checkpointing. If my hypothesis were correct, we
                    should be seeing many more missing records, yet
                    this happens only rarely. Wondering if anyone can
                    help identify potential root causes? Thanks


