Hi! I’m writing a streaming Beam pipeline using the Python SDK and running on Dataflow. We have mobile devices that send data to a Pub/Sub topic approximately every 10 seconds. The message is a JSON object that contains multiple sensor readings per second. The mobile devices may have poor connectivity, in which case they buffer data and send it when online.
I need to group the data for each device into 1-second windows and process it. I’m struggling because the default watermark implementation uses a global watermark estimate, while different devices can have very different connectivity and sometimes clock skew. Device A may report data for times T1, T2, etc., and then Device B may report data for the same time series T1, T2, etc., several hours later because of poor connectivity. I would like the pipeline not to drop Device B’s data just because Device A caused the global watermark to advance. On the other hand, if Device A has already reported some data for T100 and then some Device A data for T1 arrives out of order several minutes later, it’s fine if that data is dropped. Is there a way to get per-key watermarks? I found this related StackOverflow question <https://stackoverflow.com/questions/62081656/using-groupbykey-with-having-each-key-in-its-own-window> with a nice graphic.

Here is my naive implementation, which works OK but seems to have ~15% data loss:

```python
with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION, with_attributes=True)
        | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
        | "With timestamps and device_id keys" >> beam.Map(
            lambda elt: TimestampedValue((elt["device_id"], elt), elt["timestamp"]))
        | "1 second windows" >> beam.WindowInto(FixedWindows(1))
        | "GroupByKey" >> beam.GroupByKey()
        | "Process group" >> beam.ParDo(ProcessGroup())
    )
```

I also tried the implementation below, which uses GlobalWindows and AfterProcessingTime with (device_id, timestamp) keys; it is <0.02% lossy. Note that the timestamps have second granularity. Is something like this my best bet? It doesn’t seem very intuitive, but it’s working pretty well.
```python
with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION, with_attributes=True)
        | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
        | "With timestamps and (device_id, timestamp) keys" >> beam.Map(
            lambda elt: TimestampedValue(
                ((elt["device_id"], elt["timestamp"]), elt), elt["timestamp"]))
        | "10 second processing time windows" >> beam.WindowInto(
            GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterProcessingTime(10)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING,
        )
        | "GroupByKey" >> beam.GroupByKey()
        | "Process group" >> beam.ParDo(ProcessGroup())
    )
```

Thanks in advance,
Jess