Thank you for the information. I'm assuming you had a unique ID in the records, and that you observed some IDs missing from the Beam output compared with Spark, and not just duplicates produced by Spark.
If so, I would suggest creating a P1 issue at https://github.com/apache/beam/issues

Also, did you try setting --checkpointingMode=AT_LEAST_ONCE ?

Unfortunately, I can't be more helpful here, but let me share some of the gotchas from my previous experience of running Beam on top of Flink for a similar use case (landing data from a messaging system into files):

(1) https://github.com/apache/beam/issues/26041 - I solved that by adding a runId into the file names, re-generated between app (re)starts (a rough sketch is below)

(2) I used processing-time watermarks and a simple window without any lateness set up - combining that with (1) resulted in no data loss
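For (1), the idea roughly looks like the sketch below. This is not my exact code - WriteBatchesToS3, the s3://... path and the boto3-based write are placeholders mirroring the simplified pipeline quoted further down, and it assumes elements reaching the DoFn are already grouped into batches - but it shows where the runId goes: it is generated once when the pipeline is constructed and baked into every object key, so a restarted run can never overwrite files from a previous run.

```
import json
import time
import uuid

import apache_beam as beam


class WriteBatchesToS3(beam.DoFn):
    """Writes one S3 object per batch, with a run-scoped ID in the key."""

    def __init__(self, s3_path, run_id):
        # run_id is captured at pipeline-construction time and serialized
        # together with the DoFn, so every worker uses the same value.
        self.s3_path = s3_path.rstrip("/")
        self.run_id = run_id

    def process(self, batch):
        import boto3  # assumed to be installed on the workers

        bucket, _, prefix = self.s3_path.replace("s3://", "").partition("/")
        # The run_id keeps keys from different (re)starts disjoint, so a
        # restarted pipeline never overwrites files from a previous run.
        key = f"{prefix}/{int(time.time())}-{self.run_id}-{uuid.uuid4().hex}.json"
        body = "\n".join(json.dumps(record) for record in batch)
        boto3.client("s3").put_object(
            Bucket=bucket, Key=key, Body=body.encode("utf-8"))
        yield f"s3://{bucket}/{key}"


# Re-generated between application (re)starts.
run_id = uuid.uuid4().hex

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([[{"id": 1}, {"id": 2}]])  # one batch of two records
        | "Write to s3" >> beam.ParDo(
            WriteBatchesToS3("s3://my-bucket/backup", run_id))
    )
```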
Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>


On Thu, 20 Apr 2023 at 02:18, Lydian <lydia...@gmail.com> wrote:

> Yes, we did enable this in our pipeline.
>
> On Wed, Apr 19, 2023 at 5:00 PM Pavel Solomin <p.o.solo...@gmail.com> wrote:
>
>> Thank you
>>
>> Just to confirm: how did you configure Kafka offset commits? Did you have
>> this flag enabled?
>>
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
>>
>> On Thursday, 20 April 2023, Trevor Burke <trevor.bu...@affirm.com> wrote:
>> > Hi Pavel,
>> > Thanks for the reply.
>> > No, the event losses are not consistent. While we've been running our
>> > pipelines in parallel (Beam vs. Spark) we see some days with no event
>> > loss and some days with a little, but it's always less than 0.05%.
>> >
>> > On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin <p.o.solo...@gmail.com> wrote:
>> >>
>> >> Hello Lydian,
>> >> Do you always observe data loss? Or does it maybe happen only when you
>> >> restart your pipeline from a Flink savepoint? If you lose data only
>> >> between restarts, is your issue similar to
>> >> https://github.com/apache/beam/issues/26041 ?
>> >>
>> >> Best Regards,
>> >> Pavel Solomin
>> >>
>> >> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> >>
>> >> On Tue, 18 Apr 2023 at 18:58, Lydian <lydia...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> We are using Beam (Python SDK + Flink Runner) to back up our streaming
>> >>> data from Kafka to S3. To avoid hitting the S3 threshold, we use a
>> >>> 1-minute fixed window to group messages. We had a similar pipeline in
>> >>> Spark that we want to replace with this new pipeline. However, the Beam
>> >>> pipeline always seems to be missing events, which we think could be due
>> >>> to late events (the number of missing events gets lower with a higher
>> >>> allowed_lateness).
>> >>>
>> >>> We've tried the following approaches to avoid late events, but none of
>> >>> them are working:
>> >>> 1. Use processing timestamps instead of event time. Ideally, if
>> >>> windowing uses the processing timestamp, no event should be considered
>> >>> late. But this doesn't seem to work at all.
>> >>> 2. Configure allowed_lateness to 12 hours. Given that approach 1 does
>> >>> not seem to work as expected, we also configured allowed_lateness, but
>> >>> there are still missing events compared to our old Spark pipelines.
>> >>>
>> >>> Here's the simplified code we have:
>> >>> ```
>> >>> def add_timestamp(event: Any) -> Any:
>> >>>     import time
>> >>>     from apache_beam import window
>> >>>     return window.TimestampedValue(event, time.time())
>> >>>
>> >>> (pipeline
>> >>>  | "Kafka Read" >> ReadFromKafka(topic="test-topic", consumer_config=consumer_config)
>> >>>  | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>> >>>  | "Window into Fixed Intervals" >> beam.WindowInto(
>> >>>      beam.window.FixedWindows(fixed_window_size),
>> >>>      allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness))
>> >>>  | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path)))
>> >>> ```
>> >>>
>> >>> I am wondering:
>> >>> 1. Does the add_timestamp approach correctly mark events to use
>> >>> processing time for windowing? If so, why are there still late events,
>> >>> considering we are using processing time and not event time?
>> >>> 2. Are there any other approaches to avoid dropping late events besides
>> >>> `allowed_lateness`? In Flink you can output late events as a side
>> >>> output; can we do something similar in Beam? Could someone provide a
>> >>> code example?
>> >>>
>> >>> Could someone help us debug this? Thanks!
>> >>>
>> >>> ---
>> >>> * Flink's documentation about late events as a side output:
>> >>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>> >>>
>> >>> Sincerely,
>> >>> Lydian Lee
>> >
>> > --
>> > Trevor Burke (he/him) | Software Engineer, Data Platform | 415.794.4111
>>
>> --
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> <https://www.linkedin.com/in/pavelsolomin>
>>
> --
> Sincerely,
> Lydian Lee
>
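P.S. On question 2 in the quoted message above: as far as I know, Beam has no direct equivalent of Flink's late-data side output. One option is to add a late firing to the trigger, so that records arriving within allowed_lateness still come out in late panes instead of being silently dropped. A minimal, untested sketch - `events` stands for the PCollection coming out of the add_timestamp step, and the 60-second window and 12-hour lateness are just the values discussed in this thread:

```
import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.utils.timestamp import Duration

windowed = (
    events  # placeholder: the PCollection after the add_timestamp step
    | "Window into Fixed Intervals" >> beam.WindowInto(
        window.FixedWindows(60),  # fixed_window_size
        trigger=trigger.AfterWatermark(
            # Fire once when the watermark passes the end of the window,
            # then fire again for late data, ~60s after a late element arrives.
            late=trigger.AfterProcessingTime(60)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
        allowed_lateness=Duration(seconds=12 * 60 * 60),
    )
)
```

Note that the window and trigger only take effect at an aggregation, so there needs to be a GroupByKey (or, e.g., beam.GroupIntoBatches) between the WindowInto and the ParDo that writes to S3 for the late panes to materialize.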