Hello Beam community!

I’m having trouble coming up with the best pattern to *eagerly* poll. By
eagerly, I mean that elements should be consumed and yielded as soon as
possible. I’ve tried a handful of experiments, and my latest attempt, using
the timer API, seems quite promising but behaves in a way that I find rather
unintuitive. My solution was to create a sort of recursive timer callback
(one that re-sets its own timer), based on an example
<https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
in the Beam test code.
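
To make the pattern concrete, here is a toy, single-key, plain-Python model of a self-re-arming timer as I understand it. No Beam is involved: `run_recursive_timer` is a hypothetical helper, `buffer` stands in for the BagState, and `next_fire` stands in for the timer. A Beam timer holds at most one pending firing per key (and window), so setting it again replaces the old firing, which a single variable models.

```python
import random


def run_recursive_timer(items, flush_probability=0.1, delay=10.0, seed=0):
    """Toy single-key model of a self-re-arming ("recursive") timer.

    Not Beam code: `buffer` stands in for the BagState and `next_fire` for
    the timer. Re-setting the timer replaces the pending firing, so one
    variable is enough to hold it.
    """
    rng = random.Random(seed)
    buffer = list(items)
    output = []
    fire_times = []

    def flush(now):
        # Read and clear the buffer, emit a random subset, re-buffer the rest.
        cache = list(buffer)
        buffer.clear()
        for item in cache:
            if rng.random() < flush_probability:
                output.append(item)
            else:
                buffer.append(item)
        # Re-arm the timer from inside its own callback while work remains.
        return now + delay if buffer else None

    next_fire = delay  # the initial timer.set(...) made in process()
    while next_fire is not None:
        fire_times.append(next_fire)
        next_fire = flush(next_fire)
    return output, fire_times


output, fire_times = run_recursive_timer(range(10))
print(sorted(output))
print(len(fire_times), "timer firings")
```

In this single-threaded model every buffered value comes out exactly once, however many firings it takes.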

I have a few questions:

1) The code below runs fine with a single worker, but with multiple workers
there are duplicate values. It seems that the timer callback and a snapshot
of the state are handed to multiple workers, and the number of duplicates
grows with the number of workers. Is this caused by the values I’m passing
to timer.set?

2) I’m using TimeDomain.WATERMARK here because the timer simply doesn’t
work with REAL_TIME. The docs
<https://beam.apache.org/documentation/programming-guide/#state-and-timers>
seem to suggest that REAL_TIME would be the way to do this, but there seems
to be no guarantee that a REAL_TIME callback will ever run: in this sample,
setting the timer to REAL_TIME means the callback never fires.
Interestingly, if you call timer.set with any value less than the current
time.time(), the callback does run, but it seems to fire immediately
regardless of the value (and in this sample it actually raises an
AssertionError
<https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>).
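
My unverified mental model of why the WATERMARK variant still drains the buffer: once the bounded input is exhausted, the watermark advances to the end of time, so a timer set to Timestamp.now() + 10s is already behind it and (this part is an assumption) fires again right away. A toy model of that assumption, with hypothetical names (`ToyWatermarkTimers`, `flush`) and no Beam involved:

```python
import math


class ToyWatermarkTimers:
    """Hypothetical single-key event-time timer (not Beam code).

    Assumptions: the timer fires once the watermark reaches its timestamp,
    and setting it again replaces any pending firing.
    """

    def __init__(self):
        self.watermark = -math.inf
        self.pending = None  # (fire_time, callback) or None

    def set(self, fire_time, callback):
        self.pending = (fire_time, callback)

    def advance_watermark(self, new_watermark):
        """Move the watermark forward and fire the timer if it is now due."""
        self.watermark = max(self.watermark, new_watermark)
        fired = 0
        # A callback may re-set the timer; keep going while it is still due.
        while self.pending is not None and self.pending[0] <= self.watermark:
            fire_time, callback = self.pending
            self.pending = None
            callback(fire_time)
            fired += 1
        return fired


wall_clock = 1_700_000_000.0  # stand-in for Timestamp.now(), in seconds
timers = ToyWatermarkTimers()
buffer = list(range(10))
emitted = []


def flush(_fire_time):
    # Emit one item per firing, then re-arm at "now + 10s" if work remains.
    emitted.append(buffer.pop(0))
    if buffer:
        timers.set(wall_clock + 10, flush)


timers.set(wall_clock + 10, flush)
print(timers.advance_watermark(wall_clock))  # 0: timer not due yet
print(timers.advance_watermark(math.inf))    # 10: end of bounded input
print(emitted)                               # all ten items, in order
```

If this model is right, the wall-clock value passed to timer.set barely matters for a WATERMARK timer in a bounded pipeline, which would also explain why the callback seems to run "immediately".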

I’d be happy to hear any suggestions!
-Sam

import random
import threading

import apache_beam as beam
import apache_beam.coders as coders
import apache_beam.transforms.combiners as combiners
import apache_beam.transforms.userstate as userstate
import apache_beam.utils.timestamp as timestamp
from apache_beam.options.pipeline_options import PipelineOptions


class Log(beam.PTransform):

    lock = threading.Lock()

    @classmethod
    def _log(cls, element, label):
        with cls.lock:
            # This just colors the print in terminal
            print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
        return element

    def expand(self, pcoll):
        return pcoll | beam.Map(self._log, self.label)


class EagerProcess(beam.DoFn):

    BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
    POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)

    def process(
            self,
            element,
            buffer=beam.DoFn.StateParam(BUFFER_STATE),
            timer=beam.DoFn.TimerParam(POLL_TIMER),
    ):
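        _, item = element

        # Buffer the values 0..item-1 under this element's key.
        for i in range(item):
            buffer.add(i)

        # Arm the poll timer at wall-clock now + 10s; with
        # TimeDomain.WATERMARK this value is treated as an event time.
        timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))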

    @userstate.on_timer(POLL_TIMER)
    def flush(
            self,
            buffer=beam.DoFn.StateParam(BUFFER_STATE),
            timer=beam.DoFn.TimerParam(POLL_TIMER),
    ):
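        # Materialize the bag before clearing it, then emit each item with
        # ~10% probability and re-buffer the rest.
        cache = list(buffer.read())
        buffer.clear()

        requeue = False
        for item in cache:
            if random.random() < 0.1:
                yield item
            else:
                buffer.add(item)
                requeue = True

        # Re-arm this same timer while anything is left in the buffer.
        if requeue:
            timer.set(
                timestamp.Timestamp.now() + timestamp.Duration(seconds=10))


def main():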
    options = PipelineOptions.from_dictionary({
        'direct_num_workers': 3,
        'direct_running_mode': 'multi_threading',
    })

    pipe = beam.Pipeline(options=options)
    (
        pipe
        | beam.Create([10])
        | 'Init' >> Log()
        | beam.Reify.Timestamp()
        | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
        | beam.ParDo(EagerProcess())
        | 'Complete' >> Log()
        | combiners.Count.Globally()
        | 'Count' >> Log()
    )
    result = pipe.run()
    result.wait_until_finish()


if __name__ == '__main__':
    main()
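
To put numbers on the duplicates from question 1, the values printed at the 'Complete' step of a run could be collected and checked with a small stdlib helper. `duplication_factor` and `replay_on_workers` are hypothetical names, and the second one only models my guess that every worker replays the whole state snapshot; it is not documented runner behavior.

```python
from collections import Counter


def duplication_factor(values):
    """Return (total, distinct, highest number of copies of one value)."""
    counts = Counter(values)
    return sum(counts.values()), len(counts), max(counts.values(), default=0)


def replay_on_workers(snapshot, num_workers):
    """Hypothetical model of question 1: every worker fires the same timer
    on its own copy of the same buffered state and emits all of it."""
    return [item for _ in range(num_workers) for item in snapshot]


for workers in (1, 2, 3):
    print(workers, duplication_factor(replay_on_workers(range(10), workers)))
```

For this pipeline a correct run should report (10, 10, 1), matching a final Count of 10, while (30, 10, 3) under direct_num_workers=3 would fit the replay guess.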
