One option may be adding allowed_lateness to your window. Check this guide: https://beam.apache.org/documentation/programming-guide/#managing-late-data
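Roughly like this, adapted from your first pipeline (untested sketch: the six-hour lateness and the late trigger are placeholders, so tune the lateness to the longest outage you expect from a device):

from apache_beam.transforms import trigger
from apache_beam.utils.timestamp import Duration

with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION, with_attributes=True)
        | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
        | "With timestamps and device_id keys"
            >> beam.Map(lambda elt: TimestampedValue((elt["device_id"], elt), elt["timestamp"]))
        | "1 second windows" >> beam.WindowInto(
            FixedWindows(1),
            # Keep each 1-second window open for late data; 6 hours is a
            # placeholder for the longest device outage you expect.
            allowed_lateness=Duration(seconds=6 * 60 * 60),
            # Fire at the watermark, then once more for each late element.
            trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING,
        )
        | "GroupByKey" >> beam.GroupByKey()
        | "Process group" >> beam.ParDo(ProcessGroup())
    )

With DISCARDING accumulation each late firing contains only the late elements, so ProcessGroup will see an extra partial group per late pane; switch to ACCUMULATING if you would rather reprocess the whole window each time.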
On Thu, 5 Dec 2024 at 09:08, Jessica Jenkins <jessica.jenk...@aclimalabs.com> wrote:
> Hi!
>
> I’m writing a streaming Beam pipeline using the Python SDK and running on
> Dataflow. We have mobile devices that send data to a Pub/Sub topic
> approximately every 10 seconds. The message is a JSON object that contains
> multiple sensor readings per second. The mobile devices may have poor
> connectivity, in which case they buffer data and send it when online.
>
> I need to group the data for each device into 1-second windows and process
> it. I’m struggling because the default watermark implementation seems to
> use a global watermark estimation. However, different devices can have very
> different connectivity and sometimes clock skews. Device A may report data
> for times T1, T2, etc, and then Device B may report data for the same time
> series T1, T2, etc, several hours later because of poor connectivity. I
> would like the pipeline to not drop Device B’s data because Device A caused
> the global watermark to advance. On the other hand, if Device A has already
> reported some data for T100 and then some Device A data for T1 arrives
> out-of-order several minutes later, it’s fine if that data is dropped.
>
> Is there a way to get per-key watermarks? I found this related StackOverflow
> question
> <https://stackoverflow.com/questions/62081656/using-groupbykey-with-having-each-key-in-its-own-window>
> with a nice graphic.
>
> Here is a naive implementation of the pipeline that works OK but seems to
> have ~15% data loss:
>
> with beam.Pipeline(options=pipeline_options) as pipeline:
>     (
>         pipeline
>         | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION, with_attributes=True)
>         | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
>         | "With timestamps and device_id keys"
>             >> beam.Map(lambda elt: TimestampedValue((elt["device_id"], elt), elt["timestamp"]))
>         | "1 second windows" >> beam.WindowInto(FixedWindows(1))
>         | "GroupByKey" >> beam.GroupByKey()
>         | "Process group" >> beam.ParDo(ProcessGroup())
>     )
>
> I also tried this implementation using GlobalWindows and
> AfterProcessingTime with (device_id, timestamp) keys that’s <0.02% lossy.
> Note that the timestamps have second granularity. Is something like this my
> best bet? It doesn’t seem very intuitive, but it’s working pretty well.
>
> with beam.Pipeline(options=pipeline_options) as pipeline:
>     (
>         pipeline
>         | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION, with_attributes=True)
>         | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
>         | "With timestamps and (device_id, timestamp) keys"
>             >> beam.Map(lambda elt: TimestampedValue(((elt["device_id"], elt["timestamp"]), elt), elt["timestamp"]))
>         | "10 second processing time windows" >> beam.WindowInto(
>             GlobalWindows(),
>             trigger=trigger.Repeatedly(trigger.AfterProcessingTime(10)),
>             accumulation_mode=trigger.AccumulationMode.DISCARDING,
>         )
>         | "GroupByKey" >> beam.GroupByKey()
>         | "Process group" >> beam.ParDo(ProcessGroup())
>     )
>
> Thanks in advance,
>
> Jess