Hi,

In your case I would also suggest considering not using the event time from the payload, but the Pub/Sub publish time of the message instead. That would ensure no messages are dropped, but it could make the downstream logic harder to implement: instead of a fixed window you would probably need state and timers, since the logic would have to account for unordered input (depending on what you do in 'ProcessGroup').
An alternative is what Kim suggested: adding allowed lateness to the fixed windows, plus a late trigger to re-emit output when late data arrives.

Best,
Wiśniowski Piotr

On Thu, 5 Dec 2024 at 03:57, Jaehyeon Kim <dott...@gmail.com> wrote:

> An option may be adding allowed_lateness? Check this guide -
> https://beam.apache.org/documentation/programming-guide/#managing-late-data
>
> On Thu, 5 Dec 2024 at 09:08, Jessica Jenkins <jessica.jenk...@aclimalabs.com> wrote:
>
>> Hi!
>>
>> I'm writing a streaming Beam pipeline using the Python SDK and running on
>> Dataflow. We have mobile devices that send data to a Pub/Sub topic
>> approximately every 10 seconds. The message is a JSON object that contains
>> multiple sensor readings per second. The mobile devices may have poor
>> connectivity, in which case they buffer data and send it when online.
>>
>> I need to group the data for each device into 1-second windows and
>> process it. I'm struggling because the default watermark implementation
>> seems to use a global watermark estimation. However, different devices can
>> have very different connectivity and sometimes clock skews. Device A may
>> report data for times T1, T2, etc., and then Device B may report data for
>> the same time series T1, T2, etc., several hours later because of poor
>> connectivity. I would like the pipeline to not drop Device B's data because
>> Device A caused the global watermark to advance. On the other hand, if
>> Device A has already reported some data for T100 and then some Device A
>> data for T1 arrives out-of-order several minutes later, it's fine if that
>> data is dropped.
>>
>> Is there a way to get per-key watermarks? I found this related StackOverflow
>> question
>> <https://stackoverflow.com/questions/62081656/using-groupbykey-with-having-each-key-in-its-own-window>
>> with a nice graphic.
>>
>> Here is a naive implementation of the pipeline that works OK but seems to
>> have ~15% data loss.
>>
>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>     (
>>         pipeline
>>         | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION, with_attributes=True)
>>         | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
>>         | "With timestamps and device_id keys"
>>             >> beam.Map(lambda elt: TimestampedValue((elt["device_id"], elt), elt["timestamp"]))
>>         | "1 second windows" >> beam.WindowInto(FixedWindows(1))
>>         | "GroupByKey" >> beam.GroupByKey()
>>         | "Process group" >> beam.ParDo(ProcessGroup())
>>     )
>>
>> I also tried this implementation using GlobalWindows and
>> AfterProcessingTime with (device_id, timestamp) keys that's <0.02% lossy.
>> Note that the timestamps have second granularity. Is something like this my
>> best bet? It doesn't seem very intuitive, but it's working pretty well.
>>
>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>     (
>>         pipeline
>>         | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION, with_attributes=True)
>>         | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
>>         | "With timestamps and (device_id, timestamp) keys"
>>             >> beam.Map(lambda elt: TimestampedValue(((elt["device_id"], elt["timestamp"]), elt), elt["timestamp"]))
>>         | "10 second processing time windows" >> beam.WindowInto(
>>             GlobalWindows(),
>>             trigger=trigger.Repeatedly(trigger.AfterProcessingTime(10)),
>>             accumulation_mode=trigger.AccumulationMode.DISCARDING,
>>         )
>>         | "GroupByKey" >> beam.GroupByKey()
>>         | "Process group" >> beam.ParDo(ProcessGroup())
>>     )
>>
>> Thanks in advance,
>>
>> Jess