Hi,

We are using Beam (Python SDK + Flink Runner) to back up our streaming data
from Kafka to S3. To avoid hitting the S3 threshold, we use a 1-minute fixed
window to group messages. We have a similar pipeline in Spark that we want
to replace with this new pipeline. However, the Beam pipeline always seems
to be missing events, which we think could be due to late events (the
number of missing events gets lower when we set a higher
allowed_lateness).

We've tried the following approaches to avoid late events, but neither of
them works:
1.  Use the processing timestamp instead of event time. Ideally, if
windowing uses the processing timestamp, it shouldn't consider any event
late. But this doesn't seem to work at all.
2.  Configure allowed_lateness to 12 hours. Given that approach 1 does not
seem to work as expected, we've also configured allowed_lateness. But
events are still missing compared to our old Spark pipeline.
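
To make our suspicion concrete, here is a minimal sketch in plain Python of
how we understand fixed windows and lateness to interact. It is only a
simplified mental model, not Beam's actual implementation: the function
names are hypothetical, timestamps are plain seconds, and the exact
boundary handling is an assumption on our side.

```python
from typing import Tuple


def fixed_window_for(timestamp: float, size: float) -> Tuple[float, float]:
    """Return the [start, end) fixed window that contains `timestamp`."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def is_dropped_as_late(
    timestamp: float, watermark: float, size: float, allowed_lateness: float
) -> bool:
    """Assumed rule: an element is dropped once the watermark has passed
    the end of its window plus the allowed lateness."""
    _, end = fixed_window_for(timestamp, size)
    return watermark >= end + allowed_lateness


# An element stamped at t=125s falls into the 1-minute window [120, 180).
window = fixed_window_for(125.0, 60.0)

# With the watermark at 200s the window has already closed, so the element
# is dropped unless allowed_lateness keeps the window open long enough.
dropped_without_lateness = is_dropped_as_late(125.0, 200.0, 60.0, 0.0)
dropped_with_lateness = is_dropped_as_late(125.0, 200.0, 60.0, 60.0)
```

If this model is roughly right, a larger allowed_lateness should reduce
the number of dropped events, which matches what we observe.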

Here is the simplified pipeline code we have:
```

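from typing import Any

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka


def add_timestamp(event: Any) -> Any:
    # Imports are kept inside the function so they resolve on the workers.
    import time
    from apache_beam import window

    # Stamp each element with the wall-clock time at which it is processed.
    return window.TimestampedValue(event, time.time())


# pipeline, consumer_config, fixed_window_size, allowed_lateness, s3_path
# and WriteBatchesToS3 are defined elsewhere (omitted in this simplified code).
(
    pipeline
    | "Kafka Read" >> ReadFromKafka(
        consumer_config=consumer_config, topics=["test-topic"])
    | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
    | "Window into Fixed Intervals" >> beam.WindowInto(
        beam.window.FixedWindows(fixed_window_size),
        allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
    )
    | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path))
)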
```

I am wondering:
1.  Does the add_timestamp approach correctly make windowing use processing
time? If so, why are there still late events, given that we are using
processing time and not event time?
2.  Are there any other approaches to avoid dropping late events besides
`allowed_lateness`? In Flink you can emit late events as a side output;
can we do something similar in Beam as well? Could someone provide a code
example?

Could someone help us debug this?  Thanks!

---
* Flink's documentation about getting late events as a side output:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output


Sincerely,
Lydian Lee
