In our Beam pipeline we write the results to MySQL in separate, parallel
stages. Once all stages have finished writing successfully, we want to set
a "final" flag on all DB tables, so that any client using this data always
sees the latest data, but only once it has been completely processed.
I added the following to our MySQL write stage:

    def finish_bundle(self):
        self._process(1)
        yield beam.utils.windowed_value.WindowedValue(
            value='Finished writing pcollection',
            timestamp=beam.utils.timestamp.MIN_TIMESTAMP,
            windows=[beam.transforms.window.GlobalWindow()],
        )

This way, each stage yields this one value when it is done.
I collect all of these results in a set and then do:

    _ = (final_flags_set
         | "FlattenFinalFlags" >> beam.Flatten()
         | "CountFinalFlags" >> beam.combiners.Count.Globally()
         | "SetFinalFlag" >> beam.ParDo(
             self.SetFinalDoFn(env, self.TableName()), insertion_timestamp))

The idea is that, because of the Count.Globally(), the pipeline has to wait
until all stages are done before the SetFinalFlag stage starts.
This works, but it adds several hours to our data pipeline (running on GCP
Dataflow). Without this step, the pipeline finishes in 3-4 hours. With it,
the pipeline takes 6-7 hours.
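
To make the intent concrete, here is a rough plain-Python analogy of the
barrier I am after. It is not Beam code and says nothing about how Dataflow
actually schedules the stages. write_stage and run_with_barrier are made-up
names for illustration, and the thread pool merely stands in for our
parallel write stages.

```python
from concurrent.futures import ThreadPoolExecutor


def write_stage(table_name):
    """Hypothetical stand-in for one MySQL write stage; returns its marker."""
    # A real stage would write rows here; this sketch only reports completion.
    return f"Finished writing {table_name}"


def run_with_barrier(table_names):
    """Run all write stages in parallel and set the flag only afterwards."""
    with ThreadPoolExecutor() as pool:
        # Building the list blocks until every stage has returned, so it
        # acts as the barrier (the role I expect Count.Globally() to play).
        markers = list(pool.map(write_stage, table_names))
    finished = len(markers)                    # analogue of the global count
    final_flag = finished == len(table_names)  # analogue of SetFinalFlag
    return finished, final_flag


print(run_with_barrier(["table_a", "table_b", "table_c"]))  # (3, True)
```

In other words, the final step should see a single count, and only after
every write stage has reported in.
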
a) Is there a better way to do this?
b) Any idea what is going on here, and how I could analyse/debug it?
Thanks
Mark