Hello Beam community! I’m having trouble coming up with the best pattern to *eagerly* poll. By eagerly, I mean that elements should be consumed and yielded as soon as possible. I’ve tried a handful of experiments, and my latest attempt, which uses the timer API, seems quite promising but behaves in a way I find rather unintuitive. My solution was to create a sort of recursive timer callback (a timer whose callback re-sets the same timer), a pattern I found one example of in the Beam test code: <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>.
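As a point of reference, the recursive timer pattern can be modelled without Beam at all. The sketch below is a pure-Python simulation. It assumes a plain list per key stands in for the bag state and a single firing time per key stands in for the timer. `run_eager_poll` and its parameters are hypothetical names, not Beam APIs. Each callback flushes the buffer, emits a random subset, puts the rest back, and re-arms the timer until the buffer is empty, so every element should come out exactly once.

```python
import random


def run_eager_poll(items, emit_probability=0.1, delay=10.0, seed=0):
    """Toy simulation of a recursive (self-re-arming) per-key timer.

    Not Beam code: `buffers` stands in for per-key bag state and `timers`
    for a per-key timer that holds a single firing time.
    Returns (emitted_items, final_clock).
    """
    if not 0.0 < emit_probability <= 1.0:
        raise ValueError("emit_probability must be in (0, 1]")
    rng = random.Random(seed)
    buffers = {}  # key -> list of pending items (like BagState)
    timers = {}   # key -> firing time; re-setting overwrites (like timer.set)
    clock = 0.0
    emitted = []

    # process(): buffer each element's work items and arm the key's timer.
    for key, count in items:
        buffers.setdefault(key, []).extend(range(count))
        timers[key] = clock + delay

    # Fire timers in time order until none are left.
    while timers:
        key = min(timers, key=timers.get)
        clock = timers.pop(key)
        # on_timer(): read and clear the buffer, emit some items, requeue the rest.
        cache, buffers[key] = buffers[key], []
        for item in cache:
            if rng.random() < emit_probability:
                emitted.append(item)
            else:
                buffers[key].append(item)
        # The "recursive" step: re-arm only while work remains.
        if buffers[key]:
            timers[key] = clock + delay
    return emitted, clock


emitted, finished_at = run_eager_poll([("key", 10)])
print(sorted(emitted), finished_at)
```

With one key and a single worker, this mirrors the result the repro is expected to produce: each of the 10 items is emitted exactly once, so `Count.Globally()` should report 10.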
I have a few questions:

1) The code below runs fine with a single worker, but with multiple workers there are duplicate values. It seems that the callback and a snapshot of the state are handed to multiple workers, and the number of duplicates grows with the number of workers. Is this due to the values being passed to timer.set?

2) I’m using TimeDomain.WATERMARK here because it simply doesn't work with REAL_TIME. The docs <https://beam.apache.org/documentation/programming-guide/#state-and-timers> seem to suggest REAL_TIME would be the way to do this; however, there seems to be no guarantee that a REAL_TIME callback will ever run. In this sample, setting the timer to REAL_TIME means the callback simply never fires. Interestingly, if you call timer.set with any value less than the current time.time(), the callback does run, but it seems to fire immediately regardless of the value (and in this sample it actually raises an AssertionError <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>).

I’m happy to hear any suggestions!
-Sam

```python
import random
import threading

import apache_beam as beam
import apache_beam.coders as coders
import apache_beam.transforms.combiners as combiners
import apache_beam.transforms.userstate as userstate
import apache_beam.utils.timestamp as timestamp
from apache_beam.options.pipeline_options import PipelineOptions


class Log(beam.PTransform):
    """Prints each element, prefixed with this transform's label."""
    lock = threading.Lock()

    @classmethod
    def _log(cls, element, label):
        with cls.lock:
            # This just colors the print in terminal
            print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
        return element

    def expand(self, pcoll):
        return pcoll | beam.Map(self._log, self.label)


class EagerProcess(beam.DoFn):
    # Per-key buffer of items that have not been emitted yet.
    BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
    # Per-key timer that drives the polling loop.
    POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)

    def process(
        self,
        element,
        buffer=beam.DoFn.StateParam(BUFFER_STATE),
        timer=beam.DoFn.TimerParam(POLL_TIMER),
    ):
        _, item = element
        # Buffer the work items, then arm the timer.
        for i in range(item):
            buffer.add(i)
        timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))

    @userstate.on_timer(POLL_TIMER)
    def flush(
        self,
        buffer=beam.DoFn.StateParam(BUFFER_STATE),
        timer=beam.DoFn.TimerParam(POLL_TIMER),
    ):
        # Materialize the bag before clearing it.
        cache = list(buffer.read())
        buffer.clear()
        requeue = False
        for item in cache:
            # Emit each item with 10% probability; otherwise put it back.
            if random.random() < 0.1:
                yield item
            else:
                buffer.add(item)
                requeue = True
        # Recursive step: re-arm the timer while anything is still pending.
        if requeue:
            timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))


def main():
    options = PipelineOptions.from_dictionary({
        'direct_num_workers': 3,
        'direct_running_mode': 'multi_threading',
    })
    pipe = beam.Pipeline(options=options)
    (
        pipe
        | beam.Create([10])
        | 'Init' >> Log()
        | beam.Reify.Timestamp()
        # State and timers require a keyed PCollection.
        | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
        | beam.ParDo(EagerProcess())
        | 'Complete' >> Log()
        | combiners.Count.Globally()
        | 'Count' >> Log()
    )
    result = pipe.run()
    result.wait_until_finish()


if __name__ == '__main__':
    main()
```
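The repro sets a WATERMARK-domain timer to `Timestamp.now() + 10 s`. In the event-time domain, that value is just a timestamp compared against the watermark, not a wall-clock deadline. The toy model below assumes, as is typical for bounded input such as `beam.Create`, that the watermark stays at its minimum while input is pending and then jumps to the end of time once all input has been consumed. `drain_event_time_timers` is a hypothetical helper, not a Beam API, and this is not a description of the direct runner's internals.

```python
import math

# Assumption for this toy model: before a bounded input is exhausted the
# watermark sits at its minimum; once it is fully consumed it jumps to +inf.
WATERMARK_BEFORE_END = -math.inf
WATERMARK_AT_END = math.inf


def drain_event_time_timers(timer_ts, watermark, on_fire):
    """Fire every event-time timer whose timestamp is <= watermark.

    timer_ts: dict mapping key -> timer timestamp (one timer per key).
    on_fire(key, ts) returns a new timestamp to re-arm the timer, or None.
    Returns the number of firings. Hypothetical helper, not a Beam API.
    """
    firings = 0
    while True:
        due = [k for k, ts in timer_ts.items() if ts <= watermark]
        if not due:
            return firings
        for key in due:
            ts = timer_ts.pop(key)
            firings += 1
            new_ts = on_fire(key, ts)
            if new_ts is not None:
                timer_ts[key] = new_ts  # re-armed, like timer.set in flush()


# Wall-clock seconds, like Timestamp.now(); the exact value doesn't matter.
now = 1_700_000_000.0
timers = {"key": now + 10}
remaining = {"key": 3}  # pretend the buffer drains after three callbacks


def on_fire(key, ts):
    remaining[key] -= 1
    return ts + 10 if remaining[key] > 0 else None


print(drain_event_time_timers(timers, WATERMARK_BEFORE_END, on_fire))  # 0
print(drain_event_time_timers(timers, WATERMARK_AT_END, on_fire))      # 3
```

Under that assumption, every pending timer, including each re-armed one, is due as soon as the watermark jumps. The 10-second offset therefore never turns into actual waiting.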