On Fri, Apr 21, 2023 at 3:37 AM Pavel Solomin <p.o.solo...@gmail.com> wrote:
>
> Thank you for the information.
>
> I'm assuming you had a unique ID in the records, and you observed some IDs 
> missing in the Beam output compared with Spark, and not just some duplicates 
> produced by Spark.
>
> If so, I would suggest creating a P1 issue at 
> https://github.com/apache/beam/issues

+1, ideally with enough information to reproduce. As far as I
understand, what you have should just work (but I'm not a Flink
expert).

> Also, did you try setting --checkpointingMode=AT_LEAST_ONCE ?
>
> Unfortunately, I can't be more helpful here, but let me share some of the 
> gotchas from my previous experience of running Beam on top of Flink for a 
> similar use case (landing data from a messaging system into files):
>
> (1) https://github.com/apache/beam/issues/26041 - I solved that by adding 
> a runId to the file names, which is regenerated on every app (re)start
>
> (2) I used processing-time watermarks and a simple window with no lateness 
> configured - combining it with (1) achieved no data loss
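
A minimal sketch of the run-ID naming idea in (1), in plain Python and independent of Beam. It assumes the run ID is generated once when the application starts, so a restarted job never writes to a file name used by the previous run. The names `RUN_ID` and `output_file_name`, the path layout, and the `.jsonl` suffix are illustrative assumptions, not taken from the linked issue or from the pipeline described above.

```python
import uuid
from datetime import datetime, timezone

# Generated once per application start; a restart yields a new value.
RUN_ID = uuid.uuid4().hex


def output_file_name(prefix, window_start, shard, run_id=RUN_ID):
    """Build a file name that is unique per (window, run, shard).

    window_start is the window start in seconds since the epoch (UTC).
    """
    ts = datetime.fromtimestamp(window_start, tz=timezone.utc)
    stamp = ts.strftime("%Y%m%dT%H%M%SZ")
    return f"{prefix}/{stamp}-{run_id}-{shard:05d}.jsonl"


if __name__ == "__main__":
    # Hypothetical bucket and window, for illustration only.
    print(output_file_name("s3://example-bucket/backup", 1681862280, 0))
```

Because the run ID is part of the name, files written after a restart cannot overwrite files from before it; deduplication of any replayed records would then have to happen downstream.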
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>
> On Thu, 20 Apr 2023 at 02:18, Lydian <lydia...@gmail.com> wrote:
>>
>> Yes, we did enable this in our pipeline.
>>
>> On Wed, Apr 19, 2023 at 5:00 PM Pavel Solomin <p.o.solo...@gmail.com> wrote:
>>>
>>> Thank you
>>>
>>> Just to confirm: how did you configure Kafka offset commits? Did you have 
>>> this flag enabled?
>>>
>>>
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
>>>
>>>
>>> On Thursday, 20 April 2023, Trevor Burke <trevor.bu...@affirm.com> wrote:
>>> > Hi Pavel,
>>> > Thanks for the reply.
>>> > No, the event losses are not consistent. While we've been running our 
>>> > pipelines in parallel (Beam vs Spark) we are seeing some days with no 
>>> > event loss and some days with some, but it's always less than 0.05%
>>> >
>>> >
>>> > On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin <p.o.solo...@gmail.com> 
>>> > wrote:
>>> >>
>>> >> Hello Lydian,
>>> >> Do you always observe data loss? Or does it maybe happen only when you 
>>> >> restart your pipeline from a Flink savepoint? If you lose data only 
>>> >> between restarts, is your issue similar to 
>>> >> https://github.com/apache/beam/issues/26041 ?
>>> >>
>>> >> Best Regards,
>>> >> Pavel Solomin
>>> >>
>>> >> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> >>
>>> >> On Tue, 18 Apr 2023 at 18:58, Lydian <lydia...@gmail.com> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> We are using Beam (Python SDK + Flink Runner) to back up our streaming 
>>> >>> data from Kafka to S3. To avoid hitting the S3 threshold, we use a 
>>> >>> 1-minute fixed window to group messages. We had a similar pipeline in 
>>> >>> Spark that we want to replace with this new pipeline. However, the 
>>> >>> Beam pipeline always seems to have events missing, which we think 
>>> >>> could be due to late events (because the number of missing events gets 
>>> >>> lower with a higher allowed_lateness).
>>> >>>
>>> >>> We've tried the following approaches to avoid late events, but neither of 
>>> >>> them works:
>>> >>> 1.  Use processing time instead of event time. Ideally, if 
>>> >>> windowing uses the processing timestamp, it shouldn't consider any 
>>> >>> event late. But this doesn't seem to work at all.
>>> >>> 2.  Configure allowed_lateness to 12 hours. Given that approach 1 does 
>>> >>> not seem to work as expected, we've also configured allowed_lateness. But 
>>> >>> the pipeline still has missing events compared to our old Spark pipeline.
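
For reference on item 1 above: a fixed window is chosen purely from the element's timestamp, so re-stamping an element changes which window it lands in. The sketch below shows that arithmetic in plain Python, with no Beam import. The function name `fixed_window_bounds` is made up for illustration, and the sketch deliberately does not model watermarks or lateness, which are decided separately by the runner.

```python
def fixed_window_bounds(timestamp, size, offset=0):
    """Return (start, end) of the fixed window containing `timestamp`.

    All values are in seconds. The window is half-open: [start, end).
    """
    start = timestamp - (timestamp - offset) % size
    return start, start + size


if __name__ == "__main__":
    import time

    # With 60-second windows, an element stamped "now" falls in the
    # window covering the current minute.
    print(fixed_window_bounds(time.time(), 60))
```

Whether an element stamped this way is then treated as late depends on where the watermark is when the element reaches the windowing step, which this sketch does not cover.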
>>> >>>
>>> >>> Here's the simplified code we have
>>> >>> ```
>>> >>> from typing import Any
>>> >>>
>>> >>> import apache_beam as beam
>>> >>> from apache_beam.io.kafka import ReadFromKafka
>>> >>>
>>> >>> def add_timestamp(event: Any) -> Any:
>>> >>>     # Re-stamp the element with the current wall-clock (processing) time.
>>> >>>     import time
>>> >>>     from apache_beam import window
>>> >>>     return window.TimestampedValue(event, time.time())
>>> >>>
>>> >>> # pipeline, consumer_config, fixed_window_size, allowed_lateness, s3_path
>>> >>> # and WriteBatchesToS3 are defined elsewhere (omitted here).
>>> >>> (pipeline
>>> >>>     # ReadFromKafka takes a list of topic names via `topics`.
>>> >>>     | "Kafka Read" >> ReadFromKafka(
>>> >>>         consumer_config=consumer_config, topics=["test-topic"])
>>> >>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>>> >>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>> >>>         beam.window.FixedWindows(fixed_window_size),
>>> >>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness))
>>> >>>     | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path))
>>> >>> )
>>> >>> ```
>>> >>>
>>> >>> I am wondering:
>>> >>> 1. Does the add_timestamp approach correctly make windowing use processing 
>>> >>> time? If so, why are there still late events, considering we are 
>>> >>> using processing time and not event time?
>>> >>> 2. Are there any other approaches to avoid dropping late events 
>>> >>> besides `allowed_lateness`? In Flink you can output late events 
>>> >>> as a side output; can we do something similar in Beam? 
>>> >>> Could someone provide a code example?
>>> >>>
>>> >>> Could someone help us debug this?  Thanks!
>>> >>>
>>> >>> ---
>>> >>> * Flink's documentation about getting late events as a side output:  
>>> >>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>>> >>>
>>> >>>
>>> >>> Sincerely,
>>> >>> Lydian Lee
>>> >
>>> >
>>> > --
>>> > Trevor Burke (he/him)   |   Software Engineer, Data Platform   |   
>>> > 415.794.4111
>>> >
>>>
>>> --
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>>
>> --
>> Sincerely,
>> Lydian Lee
>>
