Hi,

We are using Beam (Python SDK + Flink runner) to back up our streaming data from Kafka to S3. To avoid hitting S3 request thresholds, we use a 1-minute fixed window to group messages. We have a similar pipeline in Spark that we want to replace with this new one. However, the Beam pipeline always seems to be missing events, which we suspect is caused by late events, because the number of missing events goes down as we increase allowed_lateness.
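To make the lateness mechanics concrete, here is a toy model in plain Python (not Beam's actual implementation) of 1-minute fixed windows driven by a watermark. It assumes each event arrives paired with the watermark value at that moment, and treats an event as dropped once the watermark has already passed its window's end plus allowed_lateness, which only approximates Beam's window garbage-collection rule. The function names and sample values are hypothetical.

```python
from typing import Dict, List, Tuple

WINDOW_SIZE = 60  # seconds, mirroring the 1-minute fixed windows


def window_start(ts: float, size: int = WINDOW_SIZE) -> int:
    """Start of the fixed window [start, start + size) that contains ts."""
    return int(ts // size) * size


def assign(
    events: List[Tuple[float, float]],
    allowed_lateness: int,
    size: int = WINDOW_SIZE,
) -> Tuple[Dict[int, List[float]], List[float]]:
    """Toy windowing model.

    Each event is (event_timestamp, watermark_at_arrival). An event is
    dropped if the watermark has already reached its window's end plus
    allowed_lateness; otherwise it is added to its window.
    """
    kept: Dict[int, List[float]] = {}
    dropped: List[float] = []
    for ts, watermark in events:
        start = window_start(ts, size)
        expiry = start + size + allowed_lateness  # window end + lateness
        if watermark >= expiry:
            dropped.append(ts)
        else:
            kept.setdefault(start, []).append(ts)
    return kept, dropped


if __name__ == "__main__":
    sample = [(10.0, 30.0), (50.0, 130.0), (70.0, 90.0)]
    print(assign(sample, allowed_lateness=0))
    print(assign(sample, allowed_lateness=120))
```

With the sample input, the event stamped at 50 s arrives when the watermark is already at 130 s, so it is dropped with zero lateness but kept with 120 s of lateness, which mirrors the observation that fewer events go missing as allowed_lateness grows.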
We've tried the following approaches to avoid late events, but neither of them works:

1. Use processing time instead of event time. Ideally, if windowing uses the processing timestamp, no event should be considered late. But this doesn't seem to work at all.
2. Set allowed_lateness to 12 hours. Since approach 1 didn't work as expected, we also configured allowed_lateness, but we still have missing events compared to our old Spark pipeline.

Here's the simplified code we have:

```
from typing import Any

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.utils.timestamp import Duration


def add_timestamp(event: Any) -> Any:
    # Imports inside the function so they are available on the workers
    import time
    from apache_beam import window
    # Re-stamp each element with the current wall-clock (processing) time
    return window.TimestampedValue(event, time.time())


# pipeline, consumer_config, fixed_window_size, allowed_lateness,
# WriteBatchesToS3 and s3_path are defined elsewhere
(pipeline
 | "Kafka Read" >> ReadFromKafka(consumer_config=consumer_config, topics=["test-topic"])
 | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
 | "Window into Fixed Intervals" >> beam.WindowInto(
       beam.window.FixedWindows(fixed_window_size),
       allowed_lateness=Duration(allowed_lateness))
 | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path)))
```

I am wondering:

1. Does the add_timestamp approach correctly make windowing use processing time? If so, why are there still late events, given that we are using processing time and not event time?
2. Are there any other approaches to avoid dropping late events besides allowed_lateness? In Flink you can emit late events as a side output; can we do something similar in Beam? Could someone provide a code example?

Could someone help us debug this? Thanks!

---

* Flink's documentation about late events as a side output: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output

Sincerely,
Lydian Lee