Thank you. Just to confirm: how did you configure Kafka offset commits? Did you have this flag enabled?
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
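If it was not enabled, turning it on from the Python SDK would look roughly like the sketch below. This is only a sketch, not a drop-in change: commit_offset_in_finalize is assumed to be the Python wrapper for the Java flag above, and the broker address and group id are placeholders.

```
from apache_beam.io.kafka import ReadFromKafka

kafka_read = ReadFromKafka(
    consumer_config={
        "bootstrap.servers": "broker:9092",  # placeholder
        "group.id": "beam-s3-backup",        # placeholder; a group id is needed for commits
        "enable.auto.commit": "false",       # let Beam commit offsets, not the consumer
    },
    topics=["test-topic"],
    # Assumed to map to KafkaIO.Read#commitOffsetsInFinalize on the Java side:
    commit_offset_in_finalize=True,
)
```

With auto-commit on and finalize-commits off, the consumer can commit offsets for records that were read but never written to S3, so a restart that falls back to the committed group offsets may skip those records.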
On Thursday, 20 April 2023, Trevor Burke <trevor.bu...@affirm.com> wrote:

> Hi Pavel,
>
> Thanks for the reply.
>
> No, the event losses are not consistent. While we've been running our pipelines in parallel (Beam vs. Spark), we see some days with no event loss and some days with a little, but it is always less than 0.05%.
>
> On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin <p.o.solo...@gmail.com> wrote:
>>
>> Hello Lydian,
>>
>> Do you always observe data loss? Or does it happen only when you restart your pipeline from a Flink savepoint? If you lose data only between restarts, is your issue similar to https://github.com/apache/beam/issues/26041 ?
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>
>> On Tue, 18 Apr 2023 at 18:58, Lydian <lydia...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> We are using Beam (Python SDK + Flink runner) to back up our streaming data from Kafka to S3. To avoid hitting the S3 threshold, we use a 1-minute fixed window to group messages. We had a similar pipeline in Spark that we want to replace with this new pipeline. However, the Beam pipeline always seems to be missing events, which we think could be due to late events (the number of missing events gets lower with a higher allowed_lateness).
>>>
>>> We've tried the following approaches to avoid late events, but none of them work:
>>> 1. Use processing time instead of event time. Ideally, if windowing uses the processing timestamp, no event should be considered late. But this doesn't seem to work at all.
>>> 2. Configure allowed_lateness to 12 hours. Given that approach 1 did not work as expected, we also configured allowed_lateness, but we still have missing events compared to our old Spark pipeline.
>>>
>>> Here's the simplified code we have:
>>>
>>> ```
>>> def add_timestamp(event: Any) -> Any:
>>>     import time
>>>     from apache_beam import window
>>>     return window.TimestampedValue(event, time.time())
>>>
>>> (pipeline
>>>     | "Kafka Read" >> ReadFromKafka(topics=["test-topic"], consumer_config=consumer_config)
>>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>>         beam.window.FixedWindows(fixed_window_size),
>>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness)
>>>     )
>>>     | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path))
>>> )
>>> ```
>>>
>>> I am wondering:
>>> 1. Does the add_timestamp approach correctly mark events to use processing time for windowing? If so, why are there still late events, considering we are using processing time and not event time?
>>> 2. Are there any other approaches to avoid dropping late events besides allowed_lateness? In Flink you can output late events as a side output; can we do something similar in Beam? Could someone provide a code example?
>>>
>>> Could someone help us debug this? Thanks!
>>>
>>> ---
>>> * Flink's documentation about late events as a side output: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>>>
>>> Sincerely,
>>> Lydian Lee
>
> --
> Trevor Burke (he/him) | Software Engineer, Data Platform | 415.794.4111

--
Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>
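P.S. On the two questions at the end of the original message: assigning timestamps with beam.Map(add_timestamp) changes the element timestamps but not the watermark, which is still produced by the Kafka source, so elements can still fall behind the watermark and be treated as late. And as far as I know, Beam has no built-in equivalent of Flink's late-data side output; anything later than allowed_lateness behind the watermark is dropped. A rough workaround is to tag suspiciously old elements before windowing and write them through a separate branch. The sketch below is a heuristic only (a DoFn cannot see the runner's watermark), and the threshold and names are placeholders.

```
import time

import apache_beam as beam


class TagProbablyLate(beam.DoFn):
    # Heuristic: route elements whose event timestamp lags wall-clock time by
    # more than max_lag_seconds to a 'late' output instead of the main output.
    def __init__(self, max_lag_seconds):
        self.max_lag_seconds = max_lag_seconds

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        lag = time.time() - timestamp.micros / 1_000_000.0
        if lag > self.max_lag_seconds:
            yield beam.pvalue.TaggedOutput('late', element)
        else:
            yield element


# Usage sketch (names are placeholders):
# tagged = events | beam.ParDo(TagProbablyLate(12 * 3600)).with_outputs('late', main='main')
# tagged.main -> windowed and written to S3 as today
# tagged.late -> written to a separate S3 prefix so nothing is silently dropped
```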