Dear Beam users,

I am trying to write a pipeline that uses a BigQuery table as a side input, and I want Beam to refresh the content held in memory periodically. For this, I am using the recommended PeriodicImpulse followed by a read from BigQuery. I have defined the following class for this:

    import apache_beam as beam
    from apache_beam.transforms.periodicsequence import PeriodicImpulse


    class BigQuerySideinput(beam.PTransform):
        def __init__(self, query, refresh_interval):
            super(BigQuerySideinput, self).__init__()
            self.query = query
            self.refresh_interval = refresh_interval

        def expand(self, pcoll):
            return (
                pcoll
                # Emit one element every refresh_interval seconds.
                | 'periodic_impulse' >> PeriodicImpulse(
                    fire_interval=self.refresh_interval,
                    apply_windowing=True)
                # Replace each impulse with the query string.
                | 'inject_query' >> beam.Map(lambda _: self.query)
                # BigQueryRead is my own DoFn (not shown here).
                | 'run_query' >> beam.ParDo(BigQueryRead()))

Here BigQueryRead() is a DoFn that takes the query and reads the data.

When I try to run a pipeline with this step in it, I get the following error:

    RuntimeError: Transform node AppliedPTransform(EmployeeLookup/Employee:Collect/periodic_impulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.

Could anyone help with an idea of what might cause this issue and how to fix it? If there is a better way to get the same behavior, I am open to suggestions as well.

Thanks in advance. (If more details are needed, I would be happy to provide them.)

Best regards,
Matyas Manninger
