Hi!

I’m writing a streaming Beam pipeline using the Python SDK and running on
Dataflow. We have mobile devices that send data to a Pub/Sub topic
approximately every 10 seconds. Each message is a JSON object containing
multiple sensor readings per second. The devices may have poor
connectivity, in which case they buffer data and send it once they’re back
online.

I need to group the data for each device into 1-second windows and process
it. I’m struggling because the default watermark implementation seems to
use a global watermark estimate. However, different devices can have very
different connectivity and sometimes clock skew. Device A may report data
for times T1, T2, etc., and then Device B may report data for the same time
series T1, T2, etc., several hours later because of poor connectivity. I
would like the pipeline not to drop Device B’s data just because Device A
caused the global watermark to advance. On the other hand, if Device A has
already reported some data for T100 and then some Device A data for T1
arrives out of order several minutes later, it’s fine for that data to be
dropped.
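
To make the semantics concrete, here’s the kind of thing I have in mind (a
rough, untested sketch; DropStaleReadings and ALLOWED_LAG are names I made
up): a stateful DoFn that remembers each device’s furthest-seen event time
and drops only readings that fall too far behind that device’s own
progress, i.e. a per-key watermark of sorts:

import apache_beam as beam
from apache_beam.coders import FloatCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class DropStaleReadings(beam.DoFn):
    """Hypothetical per-device 'watermark': track the max event time seen
    per device_id and drop readings more than ALLOWED_LAG seconds behind
    it. Input must already be keyed as (device_id, reading)."""

    ALLOWED_LAG = 300  # seconds; a guess at how much per-device disorder to allow

    MAX_TS = ReadModifyWriteStateSpec("max_ts", FloatCoder())

    def process(self, element, max_ts=beam.DoFn.StateParam(MAX_TS)):
        _, reading = element
        ts = float(reading["timestamp"])
        seen = max_ts.read() or 0.0
        if ts < seen - self.ALLOWED_LAG:
            return  # stale relative to this device only: drop it
        if ts > seen:
            max_ts.write(ts)
        yield element

Because the state is per key, Device B’s delayed backlog would sail
through even hours later, while an out-of-order Device A element that is
older than Device A’s own high-water mark gets dropped.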

Is there a way to get per-key watermarks? I found this related StackOverflow
question
<https://stackoverflow.com/questions/62081656/using-groupbykey-with-having-each-key-in-its-own-window>
with a nice graphic.

Here’s my naive implementation. It works OK, but it seems to lose ~15% of
the data:

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.transforms.window import FixedWindows, TimestampedValue

with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION,
                                          with_attributes=True)
        | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
        # Key by device_id and set the event timestamp from the payload.
        | "With timestamps and device_id keys"
        >> beam.Map(lambda elt: TimestampedValue((elt["device_id"], elt),
                                                 elt["timestamp"]))
        | "1 second windows" >> beam.WindowInto(FixedWindows(1))
        | "GroupByKey" >> beam.GroupByKey()
        | "Process group" >> beam.ParDo(ProcessGroup())
    )
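
My guess is that the 15% loss is late data being dropped once the global
watermark passes each 1-second window, since the default allowed lateness
is zero. A variant I haven’t properly tested yet (the 6-hour figure is a
guess and should match how long devices realistically buffer, and it
assumes my SDK version supports allowed_lateness on WindowInto) is to
extend the allowed lateness and re-fire on late arrivals:

        | "1 second windows" >> beam.WindowInto(
            FixedWindows(1),
            allowed_lateness=6 * 60 * 60,  # guess: devices buffer up to ~6h
            # fire at the watermark as usual, then once per late element
            trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING,
        )

With DISCARDING accumulation each late firing carries only the late
elements, so ProcessGroup would see the same window fire more than once
and has to tolerate partial groups.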


I also tried this implementation using GlobalWindows and
AfterProcessingTime with (device_id, timestamp) keys; it’s <0.02% lossy.
Note that the timestamps have second granularity. Is something like this my
best bet? It doesn’t seem very intuitive, but it’s working pretty well.

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.transforms import trigger
from apache_beam.transforms.window import GlobalWindows, TimestampedValue

with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read PubSub" >> ReadFromPubSub(subscription=SUBSCRIPTION,
                                          with_attributes=True)
        | "Parse message and flatten" >> beam.ParDo(ParseAndFlatten())
        # Key by (device_id, timestamp) so each key is one device-second.
        | "With timestamps and (device_id, timestamp) keys"
        >> beam.Map(lambda elt: TimestampedValue(
            ((elt["device_id"], elt["timestamp"]), elt), elt["timestamp"]))
        # Global window; flush whatever has arrived every ~10s of processing time.
        | "10 second processing time windows" >> beam.WindowInto(
            GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterProcessingTime(10)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING,
        )
        | "GroupByKey" >> beam.GroupByKey()
        | "Process group" >> beam.ParDo(ProcessGroup())
    )
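
One caveat I’ve noticed with this version: as I understand it,
AfterProcessingTime(10) fires 10 seconds after the first element arrives
in a pane, so if a device-second’s readings straddle two firings,
GroupByKey emits two partial groups for the same (device_id, timestamp)
key. I suspect that’s where the remaining <0.02% comes from, which means
ProcessGroup may need to tolerate (or re-merge) partial groups.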

Thanks in advance,

Jess
