Hello,

So from what you shared, if we use the default Kafka reader (whose default is the processing timestamp), we should not have late data, should we?
We had late data; that's why we started to assign the timestamp in a Python DoFn. But indeed we could not see much improvement with it. We even tried to assign a timestamp in the future (one second ahead), but the problem was still there. As it happened mainly in our integration tests and only rarely in prod, it was not high priority to investigate.

Best,
Marc

On Wed, Sep 25, 2024, 18:33 Lydian Lee <tingyenlee...@gmail.com> wrote:

> Thanks all for the suggestions. I've implemented the suggested changes, but I don't have a good way to reproduce the error, so I have to wait and see.
>
> On Wed, Sep 25, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Piotr,
>>
>> answer inline.
>>
>> On 9/24/24 09:53, Piotr Wiśniowski wrote:
>>
>> Subject: Input on Timestamps and Late Events in Beam Pipelines
>>
>> Hi team,
>>
>> I'd like to contribute to this discussion as I find it quite interesting.
>>
>> Regarding the element timestamps mentioned by Jan, I can confirm that this is accurate: users can reassign element timestamps in the way described. This should be sufficient for the timestamps to be recognized by the downstream aggregation. Additionally, clock synchronization issues could indeed be causing late events, as Jan suggested.
>>
>> It's also worth noting that, by default, triggers output the aggregated result when they estimate that all data for a window has arrived, discarding any subsequent data for that window (as referenced in the same documentation Jan mentioned). I noticed that while your code defines allowed lateness, it doesn't specify a trigger to handle late events. As a result, these late events will likely be ignored. You might want to consider adding a trigger to the windowing function to re-output the results when late events arrive. This could help confirm the hypothesis, though in production it's generally better to rely on the timestamps assigned by the source rather than reassigning them, as they should already be processing timestamps.
>>
>> I also have a question for the Beam developers, or anyone who might know:
>>
>> Assuming that Lydian does not reassign processing timestamps but instead reassigns data timestamps (which are not directly tied to processing time), what heuristics are used to determine when to close the downstream window in streaming mode? Does Beam track the minimal timestamp seen and maintain state for this? What would the time horizon for such a heuristic be? Or would the pipeline in this case behave as it does in batch mode, halting while waiting for all data to arrive? I understand that the answer likely depends on the runner; I'm particularly interested in how this works in both Dataflow and Flink.
>>
>> Beam creates watermarks that propagate from sources to sinks. PTransforms have two watermarks - an input watermark and an output watermark. The output watermark might be _held back_ by some logic (typically buffers and/or timers). Reassigning timestamps is a stateless process, which means it does not interfere with watermark propagation and as such can, without additional care, cause late data. SDKs have access to a "watermark hold" state by which a stateful transform can control how the input watermark propagates to the output watermark, but this is not (directly) exposed to users. Users can control the watermark hold only through timers and their output timestamps, which seems to be sufficient under the Beam model.
>>
>> Best regards,
>> Piotr Wiśniowski
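As an illustration of the trigger Piotr suggests, here is a minimal sketch, assuming the one-minute fixed windows from Lydian's pipeline further down in the thread; the `events` name, the late-firing trigger and the lateness value are illustrative, not the thread's actual configuration:

```python
import apache_beam as beam
from apache_beam.transforms import trigger, window

# Illustrative only: fire once when the watermark passes the end of each
# one-minute window, then re-fire for every late element that arrives within
# the allowed lateness instead of silently dropping it.
windowed = (
    events  # assumed PCollection of deserialized Kafka records
    | "Window with late firings" >> beam.WindowInto(
        window.FixedWindows(60),
        trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
        allowed_lateness=600,  # seconds; illustrative
    )
)
```

With DISCARDING accumulation, each late firing emits only the late elements, so instead of being dropped they would show up as an extra (small) output file for that window.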
>> On Tue, Sep 24, 2024, 08:36 Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> I do not use the Python SDK, but it seems that - as opposed to the Java SDK - using a simple lambda returning TimestampedValue can really change the timestamp of an element [1]. Maybe some more experienced user of the Python SDK can confirm this?
>>>
>>> Assuming this is the case, then we have two factors at play:
>>>
>>> a) watermarks are computed at the source transform (ReadFromKafka) using Java millisecond precision
>>>
>>> b) timestamps are later reassigned using Python's time.time()
>>>
>>> Both calls use the system clock to compute the timestamp and thus can be influenced by clock synchronization (e.g. NTP). This can (at least in theory) cause the second call to time.time() to return a _smaller_ timestamp than the one used to compute the watermark, which could turn the element into a late event. If this is the issue, you can either increase the allowed lateness, or (maybe more conveniently) not reassign the timestamps at all, because a processing-time timestamp should already be assigned.
>>>
>>> Let us know if any of this works for you!
>>> Best,
>>> Jan
>>>
>>> [1] https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
>>>
>>> On 9/24/24 02:09, marc hurabielle wrote:
>>>
>>> Hello,
>>>
>>> I am jumping in on this, because we are doing the same thing as Lydian. In our case, we are using the default timestamp strategy in Kafka (so processing timestamp). We were also doing the same as Lydian and adding a processing timestamp manually.
>>>
>>> However, we have late data. It happens mainly in our integration tests with Flink (parallelism 1), and really rarely in production.
>>>
>>> So it means we can't control the timestamp of an item even with `window.TimestampedValue(event, time.time())`?
>>>
>>> Best,
>>>
>>> Marc
>>>
>>> On Tue, Sep 24, 2024, 04:23 Reuven Lax via user <user@beam.apache.org> wrote:
>>>
>>>> Also, as I said, the duplicate files might not appear to be duplicates, which can be quite confusing.
>>>>
>>>> Out of curiosity, I would try - just for testing - removing the line where you "add" processing time, and also setting allowed_lateness to the largest allowed value. This will help determine whether late data is causing the dropped records.
>>>>
>>>> Reuven
>>>>
>>>> On Mon, Sep 23, 2024 at 2:10 PM Lydian Lee <tingyenlee...@gmail.com> wrote:
>>>>
>>>>> Hi Jan,
>>>>>
>>>>> Thanks for taking a quick look! Yes, the "with" statement closes the writer after the write. In our use case, we actually don't mind if some data is written in duplicate, but we are more concerned about the missing data, which is the issue we are facing right now.
>>>>>
>>>>> On Mon, Sep 23, 2024 at 11:54 AM Reuven Lax via user <user@beam.apache.org> wrote:
>>>>>
>>>>>> Do you close the write afterwards? If not, I wonder if you could lose records due to unflushed data.
>>>>>>
>>>>>> Also - what you're doing here can definitely lead to duplicate data being written, since this DoFn can be run multiple times. The duplicates might also look different if the Iterables are slightly different on retries, especially when Flink restarts from a checkpoint.
>>>>>>
>>>>>> Reuven
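A minimal sketch of the diagnostic Reuven suggests above, assuming the transform name and the one-minute window from Lydian's pipeline quoted later in the thread; the `events` name is assumed and the week-long lateness is only a stand-in for "as large as practical":

```python
import apache_beam as beam
from apache_beam.transforms import window

# Diagnostic variant only: keep the timestamps assigned by ReadFromKafka
# (no TimestampedValue re-mapping) and make allowed lateness effectively
# unbounded, so nothing is dropped as late.
windowed = (
    events  # assumed PCollection coming out of ReadFromKafka + Deserialize
    | "Window into Fixed Intervals" >> beam.WindowInto(
        window.FixedWindows(60),
        allowed_lateness=7 * 24 * 3600,  # seconds; "as large as practical" for the test
    )
)
```

If the missing records reappear with this variant, late-data dropping in the GroupByKey is the likely cause; if they are still missing, the sink is the more probable culprit.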
>>>>>> On Mon, Sep 23, 2024 at 1:40 PM Lydian Lee <tingyenlee...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jan,
>>>>>>>
>>>>>>> Thanks so much for your help. Here's our write to S3:

```python
import logging
import os
from typing import Dict, Iterable, List
from uuid import uuid4

import apache_beam as beam
import pyarrow as pa
from apache_beam.options.pipeline_options import PipelineOptions
from pyarrow.parquet import ParquetWriter


class WriteBatchesToS3(beam.DoFn):
    def __init__(
        self,
        output_path: str,
        schema: pa.Schema,
        pipeline_options: PipelineOptions,
    ) -> None:
        self.output_path = output_path
        self.schema = schema
        self.pipeline_options = pipeline_options

    def process(self, data: Iterable[List[Dict]]) -> None:
        """Write one batch per file to S3."""
        client = beam.io.aws.clients.s3.boto3_client.Client(options=self.pipeline_options)
        # Strip field metadata so the parquet schema only carries names and types.
        fields_without_metadata = [pa.field(f.name, f.type) for f in self.schema]
        schema_without_field_metadata = pa.schema(fields_without_metadata)
        filename = os.path.join(
            self.output_path,
            f"uuid_{str(uuid4())}.parquet",
        )
        tables = [
            pa.Table.from_pylist(batch, schema=schema_without_field_metadata)
            for batch in data
        ]
        if len(tables) == 0:
            logging.info(f"No data to write for key: {partition_date}, the grouped contents are: {data}")
            return
        # Merge all batches for this key into a single parquet file on S3.
        with beam.io.aws.s3io.S3IO(client=client).open(filename=filename, mode="w") as s3_writer:
            with ParquetWriter(
                s3_writer,
                schema_without_field_metadata,
                compression="SNAPPY",
                use_deprecated_int96_timestamps=True,
            ) as parquet_writer:
                merged_tables = pa.concat_tables(tables)
                parquet_writer.write_table(merged_tables)
```

>>>>>>>
>>>>>>> On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>
>>>>>>>> Hi Lydian,
>>>>>>>>
>>>>>>>> Because you do not specify 'timestamp_policy', it should use the default, which should be processing time, so this should not be the issue. The one transform potentially left is the sink, as Reuven mentioned. Can you share details of the implementation?
>>>>>>>>
>>>>>>>> Jan
>>>>>>>>
>>>>>>>> On 9/19/24 23:10, Lydian Lee wrote:
>>>>>>>>
>>>>>>>> Hi, Jan
>>>>>>>>
>>>>>>>> Here's how we do ReadFromKafka; the expansion service is just to ensure we can work with xlang in k8s, so please ignore it.

```python
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

ReadFromKafka(
    consumer_config={
        "group.id": "group-name",
        "auto.offset.reset": "latest",
        "enable.auto.commit": "false",
    },
    topics=["topic-name"],
    with_metadata=False,
    expansion_service=default_io_expansion_service(
        append_args=[
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command": "/opt/apache/beam/java_boot"}',
            "--experiments=use_deprecated_read",
        ]
    ),
    commit_offset_in_finalize=True,
)
```

>>>>>>>>
>>>>>>>> Do you know what would be the right approach for using processing time instead? I thought WindowInto was supposed to use the timestamp we appended to the event. Do you think it is still using the original Kafka event timestamp? Thanks!
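For reference, a sketch of making the source's processing-time stamping explicit instead of re-stamping elements downstream, built around the 'timestamp_policy' parameter Jan mentions above; the constant name is taken from the Python SDK's kafka module and should be checked against the installed Beam version:

```python
from apache_beam.io.kafka import ReadFromKafka

# Sketch: let the Kafka source assign processing-time timestamps itself and
# drop the later TimestampedValue re-mapping, so windowing uses the same
# timestamps the source used when computing its watermark.
ReadFromKafka(
    consumer_config={
        "group.id": "group-name",
        "auto.offset.reset": "latest",
        "enable.auto.commit": "false",
    },
    topics=["topic-name"],
    # Assumed to be the SDK constant for the 'ProcessingTime' policy.
    timestamp_policy=ReadFromKafka.processing_time_policy,
    commit_offset_in_finalize=True,
)
```

The point is simply that the timestamps used for windowing and the timestamps used to compute the watermark then come from the same place, which removes the clock-skew scenario Jan describes.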
>>>>>>>> On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>
>>>>>>>>> Can you share the (relevant) parameters of the ReadFromKafka transform?
>>>>>>>>>
>>>>>>>>> This feels strange, and it might not do what you'd expect:

```python
| "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda event: window.TimestampedValue(event, time.time()))
```

>>>>>>>>>
>>>>>>>>> This does not change the assigned timestamp of an element, but creates a new element which contains the processing time. It will not be used for windowing, though.
>>>>>>>>>
>>>>>>>>> On 9/19/24 00:49, Lydian Lee wrote:
>>>>>>>>>
>>>>>>>>> Hi Reuven,
>>>>>>>>>
>>>>>>>>> Here's a quick look at our pipeline:

```python
(
    pipeline
    | "Reading message from Kafka" >> ReadFromKafka(...)
    | "Deserializing events" >> Deserialize(**deserializer_args)
    | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
        lambda event: window.TimestampedValue(event, time.time())
    )
    | "Window into Fixed Intervals" >> beam.WindowInto(
        # fixed_window_size is 1 min.
        beam.transforms.window.FixedWindows(fixed_window_size),
        # although we configured lateness, because we are using processing
        # time, I don't expect any late events
        allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
    )
    # add a dummy key to reshuffle to fewer partitions. Kafka has 16
    # partitions, but we only want to generate 2 files every minute
    | "Adding random integer partition key" >> beam.Map(
        lambda event: (random.randint(1, 5), event)
    )
    | "Group by randomly-assigned integer key" >> beam.GroupByKey()
    | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
    # call boto3 to write the events into S3 in parquet format
    | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
)
```

>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <user@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> How are you doing this aggregation?
>>>>>>>>>>
>>>>>>>>>> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <tingyenlee...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jan,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the recommendation. In our case, we are windowing on processing time, which means that there should be no late events at all.
>>>>>>>>>>>
>>>>>>>>>>> You've mentioned that GroupByKey is stateful and can potentially drop data. Given that after the reshuffle (adding a random shuffle id to the key) we then do the aggregation (combining the data and writing it to S3), do you think the example I mentioned earlier could potentially be the reason for the dropped data?
>>>>>>>>>>>
>>>>>>>>>>> If so, how in general is Beam able to prevent that? Are there any suggested approaches? Thanks
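Since the thread goes back and forth on whether the TimestampedValue re-mapping actually changes the timestamp used for windowing, here is a small debugging sketch (not from the thread) that logs the timestamp Beam has attached to each element; placed between the re-mapping and the WindowInto, it shows which timestamp the window assignment will see:

```python
import logging

import apache_beam as beam


class LogElementTimestamp(beam.DoFn):
    """Debugging aid: log the event-time timestamp currently assigned to each element."""

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # TimestampParam is the timestamp Beam will use for windowing;
        # compare it against the value passed to TimestampedValue upstream.
        logging.info("element timestamp: %s", timestamp)
        yield element


# Usage sketch: events | "Log timestamps" >> beam.ParDo(LogElementTimestamp())
```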
>>>>>>>>>>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Lydian,
>>>>>>>>>>>>
>>>>>>>>>>>> In that case, there is only generic advice you can look into. Reshuffle is a stateless operation that should not cause dropped data. A GroupByKey, on the other hand, is stateful and thus can - when dealing with late data - drop some of it. You should be able to confirm this by looking for the 'droppedDueToLateness' counter and/or the log in [1]. This happens when elements arrive after the watermark has passed their timestamp by more than the allowed lateness. If you see the log, you might need to either change how you assign timestamps to elements (e.g. use log-append time) or increase the allowed lateness of your windowfn.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>>
>>>>>>>>>>>> Jan
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>>>>>>>>>>>
>>>>>>>>>>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> I would love to, but there are some limitations on our end, so the version bump won't happen soon. Thus I need to figure out what the root cause might be.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Lydian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2.41.0 is quite old, can you please try the current version to see if this issue is still present? There were lots of changes between 2.41.0 and 2.59.0.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We are using the Beam Python SDK with the Flink runner; the Beam version is 2.41.0 and the Flink version is 1.15.4.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We have a pipeline that has 2 stages:
>>>>>>>>>>>>> 1. read from Kafka and apply a fixed window of 1 minute
>>>>>>>>>>>>> 2. aggregate the data for the past 1 minute, reshuffle so that we have a lower partition count, and write the results to S3
>>>>>>>>>>>>>
>>>>>>>>>>>>> We disabled enable.auto.commit and enabled commit_offset_in_finalize; also, auto.offset.reset is set to "latest".
>>>>>>>>>>>>> [image: image.png]
>>>>>>>>>>>>>
>>>>>>>>>>>>> According to the logs, I can definitely see the data being consumed from the Kafka offsets, because there are many
>>>>>>>>>>>>> ```
>>>>>>>>>>>>> Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
>>>>>>>>>>>>> ```
>>>>>>>>>>>>> and the partition/offset pairs do match the missing records. However, they don't show up in the final S3 output.
>>>>>>>>>>>>>
>>>>>>>>>>>>> My current hypothesis is that the shuffling might be the reason for the issue. For example, originally in Kafka for the past minute I have records at offsets 1, 2, 3 in partition 1. After the reshuffle they are distributed, for example, as:
>>>>>>>>>>>>> - partition A: 1, 3
>>>>>>>>>>>>> - partition B: 2
>>>>>>>>>>>>>
>>>>>>>>>>>>> If partition A is done successfully but partition B fails, then given that A succeeded, it will commit its offset to Kafka, and Kafka now has the offset at 3. When the read is retried, it will skip offset 2. However, I am not sure how exactly the offset commit works, and I wonder how it interacts with the checkpoints.
>>>>>>>>>>>>> But it does seem that if my hypothesis were correct, we should be seeing far more missing records, whereas this happens only rarely. Can anyone help identify potential root causes? Thanks
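As a closing note on Jan's earlier suggestion to look for the 'droppedDueToLateness' counter, below is a sketch of querying it from the Python side. Whether this runner-side counter is actually surfaced through the portable metrics API depends on the runner and version, so grepping the Flink taskmanager logs for the message in Jan's link [1] is the more reliable fallback.

```python
from apache_beam.metrics.metric import MetricsFilter

# Sketch: query the late-data counter from the PipelineResult. 'pipeline' is
# assumed to be the Pipeline object from the job above; for a streaming job
# this can be polled while the job is running.
result = pipeline.run()
dropped = result.metrics().query(MetricsFilter().with_name("droppedDueToLateness"))
for counter in dropped["counters"]:
    print(counter.key, counter.attempted)
```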