One option may be to add allowed_lateness. Check this guide:
https://beam.apache.org/documentation/programming-guide/#managing-late-data
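
For example, here is a minimal sketch of what the windowing step of your
first pipeline could look like with allowed lateness and a late-firing
trigger. The 6-hour bound, the late trigger choice, and the keyed_elements
name are my assumptions, not something from your pipeline; tune the
lateness to your devices' worst-case buffering delay.

import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.window import FixedWindows

windowed = (
    keyed_elements  # hypothetical: your (device_id, elt) PCollection
                    # with event timestamps already attached
    | "1 second windows" >> beam.WindowInto(
        FixedWindows(1),
        # Fire at the watermark, then once more for each late element.
        trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
        # Keep window state around so late data (e.g. Device B coming
        # back online hours later) is emitted instead of dropped.
        allowed_lateness=6 * 60 * 60,  # seconds; assumed upper bound
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
    )
)

Keep in mind that the runner holds state for every open window until the
watermark passes the end of the window plus the allowed lateness, so a very
large bound trades memory for completeness.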

On Thu, 5 Dec 2024 at 09:08, Jessica Jenkins <jessica.jenk...@aclimalabs.com>
wrote:

> Hi!
>
> I’m writing a streaming Beam pipeline using the Python SDK and running on
> Dataflow. We have mobile devices that send data to a Pub/Sub topic
> approximately every 10 seconds. The message is a JSON object that contains
> multiple sensor readings per second. The mobile devices may have poor
> connectivity, in which case they buffer data and send it when online.
>
> I need to group the data for each device into 1-second windows and process
> it. I’m struggling because the default watermark implementation seems to
> use a global watermark estimation. However, different devices can have very
> different connectivity and sometimes clock skews. Device A may report data
> for times T1, T2, etc, and then Device B may report data for the same time
> series T1, T2, etc, several hours later because of poor connectivity. I
> would like the pipeline not to drop Device B’s data because Device A caused
> the global watermark to advance. On the other hand, if Device A has already
> reported some data for T100 and then some Device A data for T1 arrives
> out-of-order several minutes later, it’s fine if that data is dropped.
>
> Is there a way to get per-key watermarks? I found this related StackOverflow
> question
> <https://stackoverflow.com/questions/62081656/using-groupbykey-with-having-each-key-in-its-own-window>
> with a nice graphic.
>
> Here’s a naive implementation of the pipeline that works OK but seems to
> have ~15% data loss.
>
> with beam.Pipeline(options=pipeline_options) as pipeline:
>     (
>         pipeline
>         | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION,
>                                           with_attributes=True)
>         | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
>         | "With timestamps and device_id keys"
>         >> beam.Map(lambda elt: TimestampedValue(
>             (elt["device_id"], elt), elt["timestamp"]))
>         | "1 second windows" >> beam.WindowInto(FixedWindows(1))
>         | "GroupByKey" >> beam.GroupByKey()
>         | "Process group" >> beam.ParDo(ProcessGroup())
>     )
>
>
> I also tried this implementation using GlobalWindows and
> AfterProcessingTime with (device_id, timestamp) keys that’s <0.02% lossy.
> Note that the timestamps have second granularity. Is something like this my
> best bet? It doesn’t seem very intuitive, but it’s working pretty well.
>
> with beam.Pipeline(options=pipeline_options) as pipeline:
>     (
>         pipeline
>         | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION,
>                                           with_attributes=True)
>         | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
>         | "With timestamps and (device_id, timestamp) keys"
>         >> beam.Map(lambda elt: TimestampedValue(
>             ((elt["device_id"], elt["timestamp"]), elt), elt["timestamp"]))
>         | "10 second processing time windows" >> beam.WindowInto(
>             GlobalWindows(),
>             trigger=trigger.Repeatedly(trigger.AfterProcessingTime(10)),
>             accumulation_mode=trigger.AccumulationMode.DISCARDING,
>         )
>         | "GroupByKey" >> beam.GroupByKey()
>         | "Process group" >> beam.ParDo(ProcessGroup())
>     )
>
> Thanks in advance,
>
> Jess
>
