Hi Piotr,

answer inline.

On 9/24/24 09:53, Piotr Wiśniowski wrote:

Subject: Input on Timestamps and Late Events in Beam Pipelines

Hi team,

I’d like to contribute to this discussion as I find it quite interesting.

Regarding the element timestamps mentioned by Jan, I can confirm that it's accurate—users can reassign element timestamps in the same way described. This should be sufficient for the timestamps to be recognized by downstream aggregation. Additionally, clock synchronization issues could indeed be causing late events, as Jan suggested.

It’s also worth noting that, by default, triggers output the aggregated result when they estimate that all data for a window has arrived, discarding any subsequent data for that window (as referenced in the same documentation Jan mentioned). I noticed that while your code defines allowed lateness, it doesn't specify a trigger to handle late events. As a result, these late events will likely be ignored. You might want to consider adding a trigger to the windowing function to re-output the results when late events arrive. This could help confirm the hypothesis, though in production, it's generally better to rely on the timestamps assigned by the source rather than reassigning them, as they should already be processing timestamps.
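
For illustration, here is a minimal, self-contained sketch of what such a trigger could look like (the Create input, window size, lateness value and trigger choice are placeholders, not taken from Lydian's pipeline):

import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.utils.timestamp import Duration

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([("k", 1), ("k", 2)])  # stand-in for the Kafka input
        | beam.WindowInto(
            window.FixedWindows(60),  # 1-minute windows
            # Re-emit the window's result whenever late data arrives:
            trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            allowed_lateness=Duration(seconds=600),  # placeholder value
        )
        | beam.GroupByKey()
        | beam.Map(print)
    )

With a trigger like this, the window fires once when the watermark passes its end and then again for each late element that arrives within the allowed lateness, instead of the late data being silently dropped.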

I also have a question for the Beam developers, or anyone who might know:

Assuming that Lydian does not reassign processing timestamps but instead reassigns data timestamps (which are not directly tied to processing time), what heuristics are used to determine when to close the downstream window in stream mode? Does Beam track the minimal timestamp seen and maintain state for this? What would the time window for such a heuristic be? Or, in this case, would the pipeline behave like it does in batch mode, halting while waiting for all data to arrive? I understand that the answer likely depends on the runner—I'm particularly interested in how this works in both Dataflow and Flink.

Beam creates watermarks that propagate from sources to sinks. PTransforms have two watermarks - an input watermark and an output watermark. The output watermark might be _held back_ by some logic (typically buffers and/or timers). Reassigning timestamps is a stateless process, which means it does not interfere with watermark propagation and as such can, without additional care, cause late data. SDKs have access to a "watermark hold state" by which a stateful transform can control how the input watermark propagates to the output watermark, but this is not (directly) exposed to users. Users can control the watermark hold only through timers and their output timestamp, which seems to be sufficient under the Beam model.
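
To make the last point a bit more concrete, here is a minimal, purely illustrative sketch (the class and names are made up; stateful DoFns require keyed input) of how a user-level transform holds the watermark indirectly through an event-time timer:

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer


class BufferUntilWindowEnd(beam.DoFn):
    # Buffers keyed string values and emits them when an event-time timer fires.
    BUFFER = BagStateSpec('buffer', StrUtf8Coder())
    FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

    def process(self,
                element,  # a (key, value) pair; stateful DoFns need keyed input
                w=beam.DoFn.WindowParam,
                buffer=beam.DoFn.StateParam(BUFFER),
                flush=beam.DoFn.TimerParam(FLUSH)):
        _, value = element
        buffer.add(value)
        flush.set(w.end)  # event-time timer at the end of the current window

    @on_timer(FLUSH)
    def flush_buffer(self, buffer=beam.DoFn.StateParam(BUFFER)):
        yield from buffer.read()
        buffer.clear()

Until the timer fires, the runner should hold the output watermark at (at most) the timer's firing time, which is what keeps the buffered output from becoming late downstream.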

Best regards,
Piotr Wiśniowski


On Tue, Sep 24, 2024 at 08:36 Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    I do not use the Python SDK, but it seems that - as opposed to the
    Java SDK - using a simple lambda returning TimestampedValue can
    really change the timestamp of an element [1]. Maybe some more
    experienced user of the Python SDK can confirm this?

    Assuming this is the case, then we have two factors at play:

     a) watermarks are computed at the source transform
    (ReadFromKafka) using Java millisecond precision

     b) timestamps are later reassigned using Python's time.time()

    Both calls use the system clock to compute the timestamp and thus can
    be influenced by clock synchronization (e.g. NTP). This can (at
    least in theory) cause the second call to time.time() to return a
    _smaller_ timestamp than the one used to compute the watermark,
    which could cause the element to become a late event. If this is the
    issue, you can either increase the allowed lateness, or (maybe more
    conveniently) not reassign the timestamps at all, because a
    processing-time timestamp should already be assigned.
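
    For illustration only (the window size and lateness value below are
    placeholders), the two options could look like this:

    import apache_beam as beam

    # Option 1: keep reassigning timestamps, but widen the allowed lateness
    # so that a small clock skew between the two clocks is covered.
    beam.WindowInto(
        beam.transforms.window.FixedWindows(60),
        allowed_lateness=beam.utils.timestamp.Duration(seconds=600),
    )

    # Option 2 (probably simpler): delete the
    #   beam.Map(lambda event: window.TimestampedValue(event, time.time()))
    # step entirely and rely on the processing-time timestamps already
    # assigned by ReadFromKafka.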

    Let us know if any of this works for you!
    Best,
     Jan

    [1] https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements

    On 9/24/24 02:09, marc hurabielle wrote:

    Hello,

    I am jumping in on this, because we are doing the same thing as Lydian.
    In our case, we are using the default timestamp strategy in Kafka (so
    processing timestamps).
    We were also adding the processing timestamp manually, the same as
    Lydian.


    However, we have late data. It mainly happens in our integration
    tests with Flink (parallelism 1), and happens really rarely in
    production.

    So it means we can't control the timestamp of an item even with
    `window.TimestampedValue(event, time.time())`?

    Best,

    Marc



    On Tue, Sep 24, 2024, 04:23 Reuven Lax via user
    <user@beam.apache.org> wrote:

        Also as I said, the duplicate files might not appear like
        duplicates, which can be quite confusing.

        Out of curiosity, I would try - just for testing - removing the
        line where you "add" processing time, and also setting
        allowed_lateness to the largest allowed value. This will help
        determine whether late data is causing the dropped records.

        Reuven


        On Mon, Sep 23, 2024 at 2:10 PM Lydian Lee
        <tingyenlee...@gmail.com> wrote:

            Hi Jan,

            Thanks for taking a quick look!  Yes, the "with"
            statement would close after the write.  In our use cases,
            we actually don't mind if there are duplicates of data
            written, but we are more concerned about the missing
            data, which is the issue we are facing right now.

            On Mon, Sep 23, 2024 at 11:54 AM Reuven Lax via user
            <user@beam.apache.org> wrote:

                Do you close the write afterwards? If not, I wonder
                if you could lose records due to unflushed data.

                Also - what you're doing here can definitely lead to
                duplicate data being written, since this DoFn can be run
                multiple times. The duplicates might also appear
                different if the Iterables are slightly different on
                retries, especially in the case when Flink restarts from
                a checkpoint.

                Reuven

                On Mon, Sep 23, 2024 at 1:40 PM Lydian Lee
                <tingyenlee...@gmail.com> wrote:

                    Hi Jan,

                    Thanks so much for your help. Here's our write to
                    s3:

                    import logging
                    import os
                    from typing import Dict, Iterable, List
                    from uuid import uuid4

                    import apache_beam as beam
                    import pyarrow as pa
                    from apache_beam.options.pipeline_options import PipelineOptions
                    from pyarrow.parquet import ParquetWriter


                    class WriteBatchesToS3(beam.DoFn):
                        def __init__(
                            self,
                            output_path: str,
                            schema: pa.Schema,
                            pipeline_options: PipelineOptions,
                        ) -> None:
                            self.output_path = output_path
                            self.schema = schema
                            self.pipeline_options = pipeline_options

                        def process(self, data: Iterable[List[Dict]]) -> None:
                            """Write one batch per file to S3."""
                            client = beam.io.aws.clients.s3.boto3_client.Client(
                                options=self.pipeline_options
                            )
                            fields_without_metadata = [
                                pa.field(f.name, f.type) for f in self.schema
                            ]
                            schema_without_field_metadata = pa.schema(fields_without_metadata)
                            filename = os.path.join(
                                self.output_path,
                                f"uuid_{str(uuid4())}.parquet",
                            )
                            tables = [
                                pa.Table.from_pylist(batch, schema=schema_without_field_metadata)
                                for batch in data
                            ]
                            if len(tables) == 0:
                                logging.info(
                                    f"No data to write for key: {partition_date}, "
                                    f"the grouped contents are: {data}"
                                )
                                return
                            with beam.io.aws.s3io.S3IO(client=client).open(
                                filename=filename, mode="w"
                            ) as s3_writer:
                                with ParquetWriter(
                                    s3_writer,
                                    schema_without_field_metadata,
                                    compression="SNAPPY",
                                    use_deprecated_int96_timestamps=True,
                                ) as parquet_writer:
                                    merged_tables = pa.concat_tables(tables)
                                    parquet_writer.write_table(merged_tables)

                    On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský
                    <je...@seznam.cz> wrote:

                        Hi Lydian,

                        because you do not specify 'timestamp_policy'
                        it should use the default, which should be
                        processing time, so this should not be the
                        issue. The one potentially left transform is
                        the sink transform, as Reuven mentioned. Can
                        you share details of the implementation?
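
                        For reference, a purely illustrative sketch of
                        pinning the policy explicitly (please double-check
                        the parameter and constant names against your SDK
                        version; they are an assumption here):

                        from apache_beam.io.kafka import ReadFromKafka

                        read = ReadFromKafka(
                            consumer_config={
                                "group.id": "group-name",
                                "auto.offset.reset": "latest",
                            },
                            topics=["topic-name"],
                            # 'ProcessingTime' should be the default; setting it
                            # explicitly just makes the intent visible.
                            timestamp_policy=ReadFromKafka.processing_time_policy,
                        )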

                         Jan

                        On 9/19/24 23:10, Lydian Lee wrote:
                        Hi, Jan

                        Here's how we do ReadFromKafka; the
                        expansion service is just to ensure we can
                        work with xlang in k8s, so please ignore it.

                        from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

                        ReadFromKafka(
                            consumer_config={
                                "group.id": "group-name",
                                "auto.offset.reset": "latest",
                                "enable.auto.commit": "false",
                            },
                            topics="topic-name",
                            with_metadata=False,
                            expansion_service=default_io_expansion_service(
                                append_args=[
                                    "--defaultEnvironmentType=PROCESS",
                                    '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
                                    "--experiments=use_deprecated_read",
                                ]
                            ),
                            commit_offset_in_finalize=True,
                        )

                        Do you know what the right approach would be
                        for using processing time instead? I thought
                        WindowInto was supposed to use the timestamp
                        we appended to the event. Do you think it is
                        still using the original Kafka event
                        timestamp? Thanks!



                        On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský
                        <je...@seznam.cz> wrote:

                            Can you share the (relevant) parameters
                            of the ReadFromKafka transform?

                            This feels strange, and it might not do
                            what you'd expect:

                            | "Adding 'trigger_processing_time'
                            timestamp" >> beam.Map(lambda event:
                            window.TimestampedValue(event, time.time()))

                            This does not change the assigned
                            timestamp of an element, but creates a
                            new element which contains processing
                            time. It will not be used for windowing,
                            though.
                            On 9/19/24 00:49, Lydian Lee wrote:
                            Hi Reuven,

                            Here's a quick look at our pipeline:

                            (
                                pipeline
                                | "Reading message from Kafka" >> ReadFromKafka(...)
                                | "Deserializing events" >> Deserialize(**deserializer_args)
                                | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
                                    lambda event: window.TimestampedValue(event, time.time())
                                )
                                | "Window into Fixed Intervals" >> beam.WindowInto(
                                    # fixed_window_size is 1 min.
                                    beam.transforms.window.FixedWindows(fixed_window_size),
                                    # although we configured lateness, because we are using
                                    # processing time I don't expect any late events
                                    allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
                                )
                                | "Adding random integer partition key" >> beam.Map(
                                    # add a dummy key to reshuffle to fewer partitions; Kafka
                                    # has 16 partitions, but we only want to generate 2 files
                                    # every minute
                                    lambda event: (random.randint(1, 5), event)
                                )
                                | "Group by randomly-assigned integer key" >> beam.GroupByKey()
                                | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
                                # call boto3 to write the events into S3 in Parquet format
                                | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
                            )

                            Thanks!


                            On Wed, Sep 18, 2024 at 3:16 PM Reuven
                            Lax via user <user@beam.apache.org> wrote:

                                How are you doing this aggregation?

                                On Wed, Sep 18, 2024 at 3:11 PM
                                Lydian Lee
                                <tingyenlee...@gmail.com> wrote:

                                    Hi Jan,

                                    Thanks for the recommendation.
                                    In our case, we are windowing
                                    with processing time, which
                                    means that there should be no
                                    late events at all.

                                    You’ve mentioned that
                                    GroupByKey is stateful and can
                                    potentially drop data. Given
                                    that after the reshuffle
                                    (adding a random shuffle id to
                                    the key) we then do the
                                    aggregation (combine the data
                                    and write it to S3), do you
                                    think the example I mentioned
                                    earlier could potentially be
                                    the reason for the dropped
                                    data?

                                    If so, how is Beam in general
                                    able to prevent that? Are there
                                    any suggested approaches? Thanks

                                    On Wed, Sep 18, 2024 at
                                    12:33 AM Jan Lukavský
                                    <je...@seznam.cz> wrote:

                                        Hi Lydian,

                                        in that case, there is only
                                        generic advice you can
                                        look into. Reshuffle is a
                                        stateless operation that
                                        should not cause dropped
                                        data. A GroupByKey, on the
                                        other hand, is stateful and
                                        thus can - when dealing
                                        with late data - drop some
                                        of it. You should be able
                                        to confirm this by looking
                                        for the
                                        'droppedDueToLateness'
                                        counter and/or the log here
                                        [1]. This happens when an
                                        element arrives whose
                                        timestamp is already more
                                        than the allowed lateness
                                        behind the watermark. If
                                        you see the log, you might
                                        need to either change how
                                        you assign timestamps to
                                        elements (e.g. use log
                                        append time) or increase
                                        the allowed lateness of
                                        your windowfn.

                                        Best,

                                         Jan

                                        [1]
                                        https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132

                                        On 9/18/24 08:53, Lydian
                                        Lee wrote:
                                        I would love to, but there
                                        are some limitations on
                                        our end, so the version
                                        bump won't happen any time
                                        soon. Thus I need to
                                        figure out what the root
                                        cause might be.


                                        On Tue, Sep 17, 2024 at
                                        11:26 PM Jan Lukavský
                                        <je...@seznam.cz> wrote:

                                            Hi Lydian,

                                            2.41.0 is quite old,
                                            can you please try the
                                            current version to see
                                            if this issue is still
                                            present? There were
                                            lots of changes
                                            between 2.41.0 and 2.59.0.

                                             Jan

                                            On 9/17/24 17:49,
                                            Lydian Lee wrote:
                                            Hi,

                                            We are using the Beam
                                            Python SDK with the
                                            Flink runner; the Beam
                                            version is 2.41.0 and
                                            the Flink version is
                                            1.15.4.

                                            We have a pipeline
                                            that has 2 stages:
                                            1. read from Kafka
                                            and apply a fixed
                                            window of 1 minute
                                            2. aggregate the data
                                            for the past 1 minute,
                                            reshuffle so that
                                            we have a lower
                                            partition count, and
                                            write it into S3.

                                            We disabled
                                            enable.auto.commit
                                            and enabled
                                            commit_offset_in_finalize.
                                            Also, auto.offset.reset
                                            is set to "latest".

                                            According to the logs,
                                            I can definitely see
                                            the data being consumed
                                            from the Kafka offsets,
                                            because there are many
                                            ```
                                            Resetting offset for
                                            topic
                                            XXXX-<PARTITION> to
                                            offset <OFFSET>
                                            ```
                                            messages, and the
                                            partition/offset pairs
                                            do match the missing
                                            records. However, they
                                            don't show up in the
                                            final S3 output.

                                            My current hypothesis
                                            is that the shuffling
                                            might be the reason
                                            for the issue. For
                                            example, originally
                                            in Kafka for the past
                                            minute in partition
                                            1, I have records at
                                            offsets 1, 2 and 3.
                                            After the reshuffle,
                                            they are distributed,
                                            for example:
                                            - partition A: 1, 3
                                            - partition B: 2

                                            If partition A
                                            succeeds but
                                            partition B fails,
                                            then, given that A
                                            succeeded, it will
                                            commit its offset to
                                            Kafka, and Kafka now
                                            has an offset of 3.
                                            When Kafka retries,
                                            it will skip offset 2.
                                            However, I am not
                                            sure how exactly the
                                            offset commit works,
                                            and how it interacts
                                            with the checkpoints.
                                            If my hypothesis is
                                            correct, we should be
                                            seeing more missing
                                            records; however,
                                            this seems to happen
                                            rarely. Wondering if
                                            anyone can help
                                            identify potential
                                            root causes? Thanks


