I did a few experiments and am still a little confused about what’s
happening with either of the DirectRunner multi-worker running modes
(multi_processing or multi_threading).

I’m running the following code:

import argparse
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.core import RestrictionProvider
class InfRestrictionProvider(RestrictionProvider):
    """
    An infinite restriction provider
    """

    def initial_restriction(self, element):
        from apache_beam.io.restriction_trackers import OffsetRange
        return OffsetRange(0, float('inf'))

    def create_tracker(self, restriction):
        from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return 1
class _EventReadSDF(beam.DoFn):
    """
    An SDF for subscribing to custom events.
    """

    restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())

    def __init__(self):
        super(_EventReadSDF, self).__init__()
        # number of events received
        self._counter = 0

    @beam.DoFn.unbounded_per_element()
    def process(self, _, restriction_tracker=restriction_tracker):
        import random
        import time

        print('Attempting to claim {!r}'.format(self._counter))
        if not restriction_tracker.try_claim(self._counter):
            print('Failed to claim {!r}'.format(self._counter))
            return

        try:
            n = random.randrange(0, 5)
            time.sleep(n)
            yield n
        finally:
            self._counter += 1
            restriction_tracker.defer_remainder()
def main(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipe = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (
        pipe
        | beam.Impulse()
        | beam.ParDo(_EventReadSDF())
        | beam.Map(print)
    )

    result = pipe.run()
    result.wait_until_finish()
if __name__ == '__main__':
    main(sys.argv)

If I run this pipeline with more than one worker (e.g.
--direct_num_workers=2), the SDF only produces one element. This seems to
be the case with direct_running_mode set to either multi_processing or
multi_threading. I tested this on Python 2 with beam-2.24.0 and on Python 3
with beam-2.29.0.

Interestingly, if I run this with --direct_num_workers=1 it generates
elements forever, as intended. It would be great to understand what’s going
on a bit further. Is this the right technique for the RestrictionProvider?
Does anyone have sample code for doing something similar?
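
For reference, here’s a minimal sketch of the two option sets I’m comparing
(the flags are the DirectRunner options named above; nothing here beyond
what’s already described):

from apache_beam.options.pipeline_options import PipelineOptions

# Generates elements forever, as intended:
single_worker = PipelineOptions(['--direct_num_workers=1'])

# Only ever produces a single element:
multi_worker = PipelineOptions([
    '--direct_num_workers=2',
    '--direct_running_mode=multi_processing',  # same result with 'multi_threading'
])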

Thanks,
-Sam

On Wed, Mar 9, 2022 at 8:22 PM Ankur Goenka <[email protected]> wrote:

> This might be an issue with the multi_processing implementation of the
> DirectRunner.
>
> I experimented with the following code and got the following error. You can
> try using --direct_running_mode='multi_threading'.
>
> ----------------------
>  File
> "/usr/local/google/home/goenka/d/work/tmp/t1/venv/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1425, in process
>     initial_restriction = self.restriction_provider.initial_restriction(
>   File "/usr/local/google/home/goenka/d/work/tmp/t1/test.py", line 15, in
> initial_restriction
>     return OffsetRange(0, float('inf'))
> NameError: name 'OffsetRange' is not defined
>
>
> --------------------------
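>
> A possible workaround (just a sketch; it assumes the NameError happens
> because the module-level import is not visible once the DoFn gets
> unpickled in a worker process under multi_processing) is to do the imports
> inside the provider methods:
>
> class InfRestrictionProvider(RestrictionProvider):
>     def initial_restriction(self, element):
>         # Local import so the name resolves inside the worker process
>         # (assumption about what triggers the NameError above).
>         from apache_beam.io.restriction_trackers import OffsetRange
>         return OffsetRange(0, float('inf'))
>
>     def create_tracker(self, restriction):
>         from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
>         return OffsetRestrictionTracker(restriction)
>
>     def restriction_size(self, element, restriction):
>         return 1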
>
>
> import apache_beam as beam
> import sys
> import argparse
> from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.transforms.core import RestrictionProvider
>
>
> class InfRestrictionProvider(RestrictionProvider):
>     """
>     An infinite restriction provider
>     """
>
>     def initial_restriction(self, element):
>         return OffsetRange(0, float('inf'))
>
>     def create_tracker(self, restriction):
>         return OffsetRestrictionTracker(restriction)
>
>     def restriction_size(self, element, restriction):
>         return 1
>
> @beam.typehints.with_output_types(int)
> class _EventReadSDF(beam.DoFn):
>     """
>     An SDF for subscribing to custom events.
>     """
>
>     restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())
>
>     def __init__(self):
>         # type: () -> None
>         self._counter = 0
>
>     def process(self, _, restriction_tracker=restriction_tracker):
>         # type: (Any, beam.DoFn.RestrictionParam) -> Iterator[Event]
>
>         if not restriction_tracker.try_claim(self._counter):
>             return
>
>         i = 0
>         for _ in range(100):
>           try:
>               # Blocks until the next event is received.
>               i+=1
>               yield i
>           finally:
>               self._counter += 1
>               restriction_tracker.defer_remainder()
>
>
> def run(argv=None):
>   parser = argparse.ArgumentParser()
>   known_args, pipeline_args = parser.parse_known_args(argv)
>
>   pipeline_options = PipelineOptions(pipeline_args)
>
>   with beam.Pipeline(options=pipeline_options) as pipe:
>     (
>       pipe
>       | beam.Impulse()
>       | beam.ParDo(_EventReadSDF())
>       | beam.Map(print)
>     )
>
> if __name__ == '__main__':
>   run(sys.argv)
>
> On Wed, Mar 9, 2022 at 4:15 PM Sam Bourne <[email protected]> wrote:
>
>> Thanks Ankur for the feedback.
>>
>> Does anyone have any examples they could share writing an SDF for an
>> unbounded source (ideally in Python)?
>>
>> The SDF I wrote looks something like this. It works fine using the
>> DirectRunner with the PipelineOption --direct_running_mode='in_memory',
>> but doesn’t seem to operate properly using
>> --direct_running_mode='multi_processing'. I’d like to run this in the
>> DataflowRunner, but I wanted to make sure the SDF was operating
>> correctly first.
>>
>> Any tips would be greatly appreciated!
>>
>> import apache_beam as beam
>> from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
>> from apache_beam.transforms.core import RestrictionProvider
>> from myeventlibrary import Event, Subscriber
>> class InfRestrictionProvider(RestrictionProvider):
>>     """
>>     An infinite restriction provider
>>     """
>>
>>     def initial_restriction(self, element):
>>         return OffsetRange(0, float('inf'))
>>
>>     def create_tracker(self, restriction):
>>         return OffsetRestrictionTracker(restriction)
>>
>>     def restriction_size(self, element, restriction):
>>         return 1
>> @beam.typehints.with_output_types(Event)
>> class _EventReadSDF(beam.DoFn):
>>     """
>>     An SDF for subscribing to custom events.
>>     """
>>
>>     restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())
>>
>>     def __init__(self):
>>         # type: () -> None
>>         super(_EventReadSDF, self).__init__()
>>         # number of events received
>>         self._counter = 0
>>
>>     def process(self, _, restriction_tracker=restriction_tracker):
>>         # type: (Any, beam.DoFn.RestrictionParam) -> Iterator[Event]
>>
>>         if not restriction_tracker.try_claim(self._counter):
>>             return
>>
>>         subscriber = Subscriber()
>>         try:
>>             # Blocks until the next event is received.
>>             yield subscriber.get()
>>         finally:
>>             self._counter += 1
>>             restriction_tracker.defer_remainder()
>>
>>
>> On Wed, Mar 9, 2022 at 11:47 AM Ankur Goenka <[email protected]> wrote:
>>
>>> Hi Sam,
>>>
>>> An SDF can reject split requests, so an SDF can be made to run as a
>>> single instance.
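>>>
>>> One way to do that (a minimal sketch; NoSplitTracker is a hypothetical
>>> name, not something in the SDK) is a restriction tracker that declines
>>> every split request:
>>>
>>> from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
>>>
>>> class NoSplitTracker(OffsetRestrictionTracker):
>>>     def try_split(self, fraction_of_remainder):
>>>         # Returning None declines the split, so the restriction keeps
>>>         # running in a single instance.
>>>         return None
>>>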
>>> DoFn.unbounded_per_element lets the Beam model know that this is an
>>> unbounded source. Beam also tries to infer it:
>>> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L695
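>>>
>>> On the Python side, a minimal sketch of what that looks like (it just
>>> pairs the decorator with the RestrictionParam from your code; the body
>>> of the SDF is elided):
>>>
>>> class _EventReadSDF(beam.DoFn):
>>>     restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())
>>>
>>>     @beam.DoFn.unbounded_per_element()
>>>     def process(self, element, restriction_tracker=restriction_tracker):
>>>         ...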
>>>
>>> As this will eventually run on a cluster, I would recommend going the SDF
>>> route so that watermarks and checkpoints can happen appropriately, not
>>> just for this transform but for downstream transforms as well.
>>>
>>> On Wed, Mar 9, 2022 at 10:54 AM Sam Bourne <[email protected]> wrote:
>>>
>>>> Hello! I’m looking for some help writing a custom transform that reads
>>>> from an unbounded source. A basic version of this would look something like
>>>> this:
>>>>
>>>> import apache_beam as beam
>>>> import myeventlibrary
>>>> class _ReadEventsFn(beam.DoFn):
>>>>   def process(self, unused_element):
>>>>     subscriber = myeventlibrary.Subscriber()
>>>>     while True:
>>>>       # blocks until an event is received
>>>>       event = subscriber.get()
>>>>       yield event
>>>>
>>>> with beam.Pipeline() as pipe:
>>>>   (
>>>>     pipe
>>>>     | beam.Impulse()
>>>>     | beam.ParDo(_ReadEventsFn())
>>>>     | beam.Map(print)
>>>>   )
>>>>
>>>> I have a few questions:
>>>>
>>>>    - When executing this in the DirectRunner with the default number
>>>>    of workers (1), is it correct to assume the process would be constantly
>>>>    blocking and rarely processing other parts of the pipeline?
>>>>    - I noticed there is a decorator, DoFn.unbounded_per_element, which
>>>>    seems useful, but it’s a bit unclear what it does. I also noticed in the
>>>>    Java docs that it is an error to apply this decorator if the DoFn is not
>>>>    an SDF.
>>>>    - I took a stab at writing this as an SDF. The incoming events are
>>>>    not really something that can be read in parallel, and they trickle in
>>>>    slowly. In this scenario, is there anything to gain by using an SDF?
>>>>
>>> An SDF can reject the split request, so an SDF can be made to run as a
>>> single instance.
>>>
>>>>
>>>>
>>>> Thanks!
>>>> -Sam
>>>>
>>>
