Thank you

Just to confirm: how did you configure Kafka offset commits? Did you have
this flag enabled?


https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
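
In the Python SDK the same setting is exposed as the commit_offset_in_finalize
argument of ReadFromKafka. A minimal sketch of what I mean (the topic name and
consumer properties below are placeholders, not taken from your pipeline):

```
from apache_beam.io.kafka import ReadFromKafka

# Commit Kafka offsets only when Beam finalizes the checkpoint, so offsets
# are not advanced past records the pipeline has not durably processed.
read = ReadFromKafka(
    consumer_config={
        "bootstrap.servers": "localhost:9092",  # placeholder
        "group.id": "beam-backup",              # placeholder; needed for offset commits
    },
    topics=["test-topic"],
    commit_offset_in_finalize=True,
)
```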


On Thursday, 20 April 2023, Trevor Burke <trevor.bu...@affirm.com> wrote:
> Hi Pavel,
> Thanks for the reply.
> No, the event losses are not consistent. While we've been running our
> pipelines in parallel (Beam vs Spark), we are seeing some days with no
> event loss and some days with some, but it's always less than 0.05%.
>
>
> On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin <p.o.solo...@gmail.com> wrote:
>>
>> Hello Lydian,
>> Do you always observe data loss? Or does it happen only when you restart
>> your pipeline from a Flink savepoint? If you lose data only between
>> restarts, is your issue similar to
>> https://github.com/apache/beam/issues/26041 ?
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>
>>
>>
>>
>> On Tue, 18 Apr 2023 at 18:58, Lydian <lydia...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> We are using Beam (Python SDK + Flink Runner) to back up our streaming
>>> data from Kafka to S3. To avoid hitting the S3 threshold, we use a
>>> 1-minute fixed window to group messages. We have a similar pipeline in
>>> Spark that we want to replace with this new pipeline. However, the Beam
>>> pipeline always seems to be missing events, which we think could be due
>>> to late events (the number of missing events gets lower when we use a
>>> higher allowed_lateness).
>>>
>>> We've tried the following approaches to avoid late events, but neither of
>>> them works:
>>> 1. Use the processing timestamp instead of event time. Ideally, if
>>> windowing uses the processing timestamp, no event should be considered
>>> late. But this doesn't seem to work at all.
>>> 2. Configure allowed_lateness to 12 hours. Given that approach 1 doesn't
>>> seem to work as expected, we also configured allowed_lateness, but we
>>> still have missing events compared to our old Spark pipelines.
>>>
>>> Here's the simplified code we have
>>> ```
>>>
>>> from typing import Any
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.kafka import ReadFromKafka
>>> from apache_beam.utils.timestamp import Duration
>>>
>>> def add_timestamp(event: Any) -> Any:
>>>     import time
>>>     from apache_beam import window
>>>     # Stamp each element with the current processing time so that
>>>     # windowing uses arrival time instead of the original event time.
>>>     return window.TimestampedValue(event, time.time())
>>>
>>> (pipeline
>>>     | "Kafka Read" >> ReadFromKafka(
>>>         consumer_config=consumer_config, topics=["test-topic"])
>>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>>>     | "Window into Fixed Intervals"
>>>     >> beam.WindowInto(
>>>         beam.window.FixedWindows(fixed_window_size),
>>>         allowed_lateness=Duration(allowed_lateness),
>>>     )
>>>     | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path))
>>> )
>>>
>>> ```
>>>
>>> I am wondering:
>>> 1. Does the add_timestamp approach correctly mark elements to use
>>> processing time for windowing? If so, why are there still late events,
>>> considering we are using processing time and not event time?
>>> 2. Are there any other approaches to avoid dropping late events besides
>>> `allowed_lateness`? In Flink you can output late events as a side output;
>>> can we do something similar in Beam? Could someone provide a code example?
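>>> The closest analogue we can think of in Beam's Python API is late firings
>>> on the trigger rather than a true side output. This is only a rough,
>>> untested sketch, reusing the imports and names from the snippet above;
>>> we're not sure it addresses the drops:
>>>
>>> ```
>>> from apache_beam.transforms.trigger import (
>>>     AccumulationMode, AfterCount, AfterWatermark)
>>>
>>> # Re-fire the window for late elements instead of dropping them;
>>> # DISCARDING emits only the newly arrived elements in each late pane.
>>> # `events` stands for the PCollection produced by the add_timestamp step.
>>> windowed = (
>>>     events
>>>     | "Window with late firings" >> beam.WindowInto(
>>>         beam.window.FixedWindows(fixed_window_size),
>>>         trigger=AfterWatermark(late=AfterCount(1)),
>>>         accumulation_mode=AccumulationMode.DISCARDING,
>>>         allowed_lateness=Duration(allowed_lateness),
>>>     )
>>> )
>>> ```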
>>>
>>> Could someone help us debug this? Thanks!
>>>
>>> ---
>>> * Flink's documentation about getting late data as a side output:
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>>>
>>>
>>> Sincerely,
>>> Lydian Lee
>
>
> --
> Trevor Burke (he/him)   |   Software Engineer, Data Platform   |   415.794.4111
>

-- 
Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>
