On Fri, Apr 21, 2023 at 3:37 AM Pavel Solomin <p.o.solo...@gmail.com> wrote:
>
> Thank you for the information.
>
> I'm assuming you had a unique ID in records, and you observed some IDs
> missing in the Beam output compared with Spark, not just some duplicates
> produced by Spark.
>
> If so, I would suggest creating a P1 issue at
> https://github.com/apache/beam/issues
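Pavel's question, whether some IDs are genuinely missing from the Beam output or Spark simply wrote some records twice, can be answered mechanically once both outputs are loaded. A minimal sketch, assuming each pipeline's output can be reduced to a list of record IDs; `compare_outputs` and the keys of the dict it returns are hypothetical names, not part of either pipeline:

```python
from collections import Counter


def compare_outputs(beam_ids, spark_ids):
    """Compare the record IDs written by two pipelines (hypothetical helper).

    missing_in_beam:  IDs Spark wrote that Beam never wrote (real loss)
    missing_in_spark: IDs Beam wrote that Spark never wrote
    spark_duplicates: IDs Spark wrote more than once
    beam_duplicates:  IDs Beam wrote more than once
    """
    beam_counts = Counter(beam_ids)
    spark_counts = Counter(spark_ids)
    return {
        "missing_in_beam": sorted(set(spark_counts) - set(beam_counts)),
        "missing_in_spark": sorted(set(beam_counts) - set(spark_counts)),
        "spark_duplicates": sorted(i for i, n in spark_counts.items() if n > 1),
        "beam_duplicates": sorted(i for i, n in beam_counts.items() if n > 1),
    }


print(compare_outputs(beam_ids=[1, 2, 4, 4], spark_ids=[1, 2, 2, 3, 4]))
# {'missing_in_beam': [3], 'missing_in_spark': [], 'spark_duplicates': [2], 'beam_duplicates': [4]}
```

Only a non-empty `missing_in_beam` list points at real loss in Beam; a difference in raw record counts alone could come entirely from Spark's duplicates.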
+1, ideally with enough information to reproduce. As far as I understand, what you have should just work (but I'm not a Flink expert).

> Also, did you try setting --checkpointingMode=AT_LEAST_ONCE?
>
> Unfortunately, I can't be more helpful here, but let me share some of the
> gotchas I ran into in my previous experience of running Beam on top of Flink
> for a similar use case (landing data from a messaging system into files):
>
> (1) https://github.com/apache/beam/issues/26041 - I solved that by adding
> a runId to file names, which is regenerated on every app (re)start
>
> (2) I used processing-time watermarks and a simple window without any
> allowed lateness set up; combining it with (1) achieved no data loss
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>
> On Thu, 20 Apr 2023 at 02:18, Lydian <lydia...@gmail.com> wrote:
>>
>> Yes, we did enable this in our pipeline.
>>
>> On Wed, Apr 19, 2023 at 5:00 PM Pavel Solomin <p.o.solo...@gmail.com> wrote:
>>>
>>> Thank you.
>>>
>>> Just to confirm: how did you configure Kafka offset commits? Did you have
>>> this flag enabled?
>>>
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
>>>
>>> On Thursday, 20 April 2023, Trevor Burke <trevor.bu...@affirm.com> wrote:
>>> > Hi Pavel,
>>> > Thanks for the reply.
>>> > No, the event losses are not consistent. While we've been running our
>>> > pipelines in parallel (Beam vs Spark), we are seeing some days with no
>>> > event loss and some days with some, but it's always less than 0.05%.
>>> >
>>> > On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin <p.o.solo...@gmail.com>
>>> > wrote:
>>> >>
>>> >> Hello Lydian,
>>> >> Do you always observe data loss? Or does it maybe happen only when you
>>> >> restart your pipeline from a Flink savepoint?
>>> >> If you lose data only between restarts, is your issue similar to
>>> >> https://github.com/apache/beam/issues/26041?
>>> >>
>>> >> Best Regards,
>>> >> Pavel Solomin
>>> >>
>>> >> On Tue, 18 Apr 2023 at 18:58, Lydian <lydia...@gmail.com> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> We are using Beam (Python SDK + Flink Runner) to back up our streaming
>>> >>> data from Kafka to S3. To avoid hitting the S3 threshold, we use a
>>> >>> 1-minute fixed window to group messages. We have a similar pipeline in
>>> >>> Spark that we want to replace with this new pipeline. However, the
>>> >>> Beam pipeline always seems to have missing events, which we think
>>> >>> could be due to late events (because the number of missing events gets
>>> >>> lower with a higher allowed_lateness).
>>> >>>
>>> >>> We've tried the following approaches to avoid late events, but none of
>>> >>> them work:
>>> >>> 1. Use the processing timestamp instead of event time. Ideally, if
>>> >>> windowing uses the processing timestamp, it shouldn't consider any
>>> >>> event as late. But this doesn't seem to work at all.
>>> >>> 2. Configure allowed_lateness to 12 hours. Given that approach 1
>>> >>> doesn't work as expected, we've also configured allowed_lateness, but
>>> >>> there are still missing events compared to our old Spark pipeline.
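Both attempts above revolve around how Beam decides that an element is late. The sketch below is a simplified plain-Python model of that rule, not Beam code; the helper names are hypothetical, and times are integer microseconds to avoid float rounding. `FixedWindows(size)` with the default zero offset puts timestamp `ts` into the window `[ts - ts % size, ts - ts % size + size)`, and a grouping step such as GroupByKey drops the element once the watermark has passed the window's last timestamp plus `allowed_lateness`. Rewriting timestamps in a `Map` changes the element's timestamp, and therefore its window, but not the watermark it is compared against, which is still derived from the upstream source (for `ReadFromKafka`, its timestamp policy).

```python
SEC = 1_000_000  # microseconds per second (this model's time unit)


def window_bounds(ts, size, offset=0):
    """Return (start, end) of the fixed window [start, end) that contains ts."""
    start = ts - ((ts - offset) % size)
    return start, start + size


def is_droppable(ts, size, allowed_lateness, watermark):
    """Model of the late-data rule: True once the watermark has passed the
    window's last timestamp (end - 1 tick) plus allowed_lateness."""
    _, end = window_bounds(ts, size)
    return (end - 1) + allowed_lateness < watermark


# An element stamped at 125 s falls into the 1-minute window [120 s, 180 s).
ts, size = 125 * SEC, 60 * SEC
print(window_bounds(ts, size) == (120 * SEC, 180 * SEC))  # True
# With the watermark already at 200 s, zero lateness drops it...
print(is_droppable(ts, size, 0, 200 * SEC))  # True
# ...while 60 s of allowed lateness keeps it until the watermark reaches 240 s.
print(is_droppable(ts, size, 60 * SEC, 200 * SEC))  # False
```

In this model, a larger allowed_lateness only moves the cutoff later; it never removes it.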
>>> >>>
>>> >>> Here's the simplified code we have:
>>> >>> ```
>>> >>> import time
>>> >>>
>>> >>> import apache_beam as beam
>>> >>> from apache_beam import window
>>> >>> from apache_beam.io.kafka import ReadFromKafka
>>> >>> from apache_beam.utils.timestamp import Duration
>>> >>>
>>> >>>
>>> >>> def add_timestamp(event):
>>> >>>     # Re-stamp each element with the wall-clock time at which this Map runs.
>>> >>>     return window.TimestampedValue(event, time.time())
>>> >>>
>>> >>>
>>> >>> # pipeline, consumer_config, fixed_window_size (s), allowed_lateness (s)
>>> >>> # and s3_path are defined elsewhere; WriteBatchesToS3 is our own DoFn.
>>> >>> (pipeline
>>> >>>  | "Kafka Read" >> ReadFromKafka(consumer_config=consumer_config,
>>> >>>                                  topics=["test-topic"])
>>> >>>  | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>>> >>>  | "Window into Fixed Intervals" >> beam.WindowInto(
>>> >>>      window.FixedWindows(fixed_window_size),
>>> >>>      allowed_lateness=Duration(seconds=allowed_lateness))
>>> >>>  | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path)))
>>> >>> ```
>>> >>>
>>> >>> I am wondering:
>>> >>> 1. Does the add_timestamp approach correctly mark the elements to use
>>> >>> processing time for windowing? If so, why are there still late events,
>>> >>> given that we are using processing time and not event time?
>>> >>> 2. Are there any other approaches to avoid dropping late events
>>> >>> besides `allowed_lateness`? In Flink you can output late events as a
>>> >>> side output; can we do something similar in Beam? Could someone
>>> >>> provide a code example?
>>> >>>
>>> >>> Could someone help us debug this? Thanks!
>>> >>>
>>> >>> ---
>>> >>> * Flink's documentation about late events as a side output:
>>> >>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>>> >>>
>>> >>> Sincerely,
>>> >>> Lydian Lee
>>> >
>>> > --
>>> > Trevor Burke (he/him) | Software Engineer, Data Platform |
>>> > 415.794.4111
>>>
>>> --
>>> Best Regards,
>>> Pavel Solomin
>>
>> --
>> Sincerely,
>> Lydian Lee
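Pavel's workaround (1) in the thread above, a runId in the file names that is regenerated on every application (re)start, can be sketched in plain Python. This is only an illustration under assumptions: the key layout (`<prefix>/YYYY/MM/DD/HHMM/part-<runId>-<shard>.jsonl`), the `.jsonl` suffix and the `make_run_id`/`object_key` names are hypothetical, not taken from his pipeline or from Beam. In a Beam DoFn, one natural place to call `make_run_id()` would be `setup()`, which runs each time a worker creates the DoFn instance, and therefore again after every restart.

```python
import uuid
from datetime import datetime, timezone


def make_run_id():
    """Return a fresh random identifier; generate it once per (re)start."""
    return uuid.uuid4().hex[:12]


def object_key(prefix, window_start_s, run_id, shard):
    """Build a hypothetical S3 key for one window's batch.

    window_start_s is the window start in Unix seconds. Because run_id is
    new on every (re)start, files written by different runs will, with
    overwhelming probability, never share a key, even for the same window
    and shard.
    """
    start = datetime.fromtimestamp(window_start_s, tz=timezone.utc)
    return (f"{prefix.rstrip('/')}/{start:%Y/%m/%d/%H%M}/"
            f"part-{run_id}-{shard:05d}.jsonl")


print(object_key("s3://bucket/backup/", 1681840800, "abc123", 3))
# s3://bucket/backup/2023/04/18/1800/part-abc123-00003.jsonl
```

Keeping the window start in the path and the runId in the file name means every run's output for a given minute lands in the same prefix, so downstream readers can still list a window's files in one place.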