Thank you for the information.

I'm assuming your records have a unique ID, and that you observed some IDs
missing from the Beam output compared with Spark, and not just some
duplicates produced by Spark.
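
As a rough sketch of that check, assuming both outputs can be loaded as plain
lists of record IDs (the function name and the sample IDs below are
hypothetical, for illustration only):

```python
from collections import Counter


def compare_ids(beam_ids, spark_ids):
    """Separate IDs missing from Beam from duplicates emitted by Spark."""
    beam_counts = Counter(beam_ids)
    spark_counts = Counter(spark_ids)
    # IDs present in the Spark output but absent from the Beam output.
    missing_in_beam = sorted(set(spark_counts) - set(beam_counts))
    # IDs Spark wrote more than once; these only inflate Spark's row count.
    spark_duplicates = sorted(i for i, n in spark_counts.items() if n > 1)
    return missing_in_beam, spark_duplicates


# Hypothetical sample data, not taken from any real pipeline.
missing, dupes = compare_ids(["a", "b", "d"], ["a", "b", "b", "c", "d"])
print("missing in Beam:", missing)
print("duplicated in Spark:", dupes)
```

If the first list is empty and only the second one is populated, the
difference in counts comes from Spark duplicates rather than Beam data loss.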

If so, I would suggest creating a P1 issue at
https://github.com/apache/beam/issues

Also, did you try setting --checkpointingMode=AT_LEAST_ONCE?

Unfortunately, I can't be more helpful here, but let me share some of the
gotchas from my previous experience running Beam on top of Flink for a
similar use case (landing data from a messaging system into files):

(1) https://github.com/apache/beam/issues/26041 - I solved that by adding a
runId to the file names, which is regenerated on every app (re)start

(2) I used processing-time watermarks and a simple window with no allowed
lateness configured - combined with (1), this achieved no data loss
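
A minimal sketch of the runId idea from (1), in plain Python. The naming
scheme, the `output_file_name` helper and the bucket path are assumptions for
illustration, not the code I actually ran. The point is only that the ID is
generated once per process start, so a restarted app presumably cannot
produce the same file names as the previous run:

```python
import uuid

# Generated once when the application starts; a restart gets a new value.
RUN_ID = uuid.uuid4().hex


def output_file_name(prefix, window_start, shard, run_id=RUN_ID):
    """Build a file name that is unique per run, window and shard."""
    return f"{prefix}/{window_start}-{shard:05d}-{run_id}.json"


# Hypothetical bucket and window label.
print(output_file_name("s3://my-bucket/backup", "2023-04-20T02-18", 3))
```

In a real pipeline the window label and shard number would come from the
sink or from the DoFn that writes the batch.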

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Thu, 20 Apr 2023 at 02:18, Lydian <lydia...@gmail.com> wrote:

> Yes, we did enable this in our pipeline.
>
> On Wed, Apr 19, 2023 at 5:00 PM Pavel Solomin <p.o.solo...@gmail.com>
> wrote:
>
>> Thank you
>>
>> Just to confirm: how did you configure Kafka offset commits? Did you have
>> this flag enabled?
>>
>>
>>
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
>>
>>
>> On Thursday, 20 April 2023, Trevor Burke <trevor.bu...@affirm.com> wrote:
>> > Hi Pavel,
>> > Thanks for the reply.
>> > No, the event losses are not consistent. While we've been running our
>> > pipelines in parallel (Beam vs Spark), we are seeing some days with no
>> > event loss and some days with some, but it's always less than 0.05%.
>> >
>> >
>> > On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin <p.o.solo...@gmail.com>
>> wrote:
>> >>
>> >> Hello Lydian,
>> >> Do you always observe data loss? Or does it perhaps happen only when you
>> >> restart your pipeline from a Flink savepoint? If you lose data only
>> >> between restarts, is your issue similar to
>> >> https://github.com/apache/beam/issues/26041 ?
>> >>
>> >> Best Regards,
>> >> Pavel Solomin
>> >>
>> >> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> >>
>> >>
>> >>
>> >>
>> >> On Tue, 18 Apr 2023 at 18:58, Lydian <lydia...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> We are using Beam (Python SDK + Flink Runner) to back up our streaming
>> >>> data from Kafka to S3. To avoid hitting the S3 threshold, we use a
>> >>> 1-minute fixed window to group messages. We have a similar pipeline in
>> >>> Spark that we want to replace with this new pipeline. However, the Beam
>> >>> pipeline always seems to have events missing, which we think could be
>> >>> due to late events (because the number of missing events gets lower
>> >>> with a higher allowed_lateness).
>> >>>
>> >>> We've tried the following approaches to avoid late events, but none of
>> >>> them work:
>> >>> 1. Use the processing timestamp instead of event time. Ideally, if
>> >>> windowing uses the processing timestamp, it shouldn't consider any
>> >>> event late. But this doesn't seem to work at all.
>> >>> 2. Configure allowed_lateness to 12 hours. Given that approach 1 doesn't
>> >>> seem to work as expected, we've also configured allowed_lateness. But
>> >>> the pipeline still has missing events compared to our old Spark
>> >>> pipeline.
>> >>>
>> >>> Here's the simplified code we have
>> >>> ```
>> >>> from typing import Any
>> >>>
>> >>> import apache_beam as beam
>> >>> from apache_beam.io.kafka import ReadFromKafka
>> >>>
>> >>>
>> >>> def add_timestamp(event: Any) -> Any:
>> >>>     import time
>> >>>     from apache_beam import window
>> >>>
>> >>>     # Stamp the element with the current wall-clock (processing) time.
>> >>>     return window.TimestampedValue(event, time.time())
>> >>>
>> >>>
>> >>> (pipeline
>> >>>     | "Kafka Read" >> ReadFromKafka(topics=["test-topic"], consumer_config=consumer_config)
>> >>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>> >>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>> >>>         beam.window.FixedWindows(fixed_window_size),
>> >>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>> >>>     )
>> >>>     | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path))
>> >>> )
>> >>> ```
>> >>>
>> >>> I am wondering:
>> >>> 1. Does the add_timestamp approach correctly mark elements to use
>> >>> processing time for windowing? If so, why are there still late events,
>> >>> considering we are using processing time and not event time?
>> >>> 2. Are there any other approaches to avoid dropping late events besides
>> >>> `allowed_lateness`? In Flink you can output late events as a side
>> >>> output; can we do something similar in Beam? Could someone provide a
>> >>> code example?
>> >>>
>> >>> Could someone help us debug this?  Thanks!
>> >>>
>> >>> ---
>> >>> * Flink's documentation about late events as a side output:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>> >>>
>> >>>
>> >>> Sincerely,
>> >>> Lydian Lee
>> >
>> >
>> > --
>> > Trevor Burke (he/him)   |   Software Engineer, Data Platform   |
>>  415.794.4111
>> >
>> >
>>
>> --
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> <https://www.linkedin.com/in/pavelsolomin>
>>
>>
>>
>>
>> --
> Sincerely,
> Lydian Lee
>
>
