I did a few experiments and am still a little confused about what’s
happening when using either of the DirectRunner multi-worker running modes
(multi_processing or multi_threading). I’m running the following code:
import argparse
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.core import RestrictionProvider


class InfRestrictionProvider(RestrictionProvider):
    """
    An infinite restriction provider
    """

    def initial_restriction(self, element):
        from apache_beam.io.restriction_trackers import OffsetRange
        return OffsetRange(0, float('inf'))

    def create_tracker(self, restriction):
        from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return 1


class _EventReadSDF(beam.DoFn):
    """
    An SDF for subscribing to custom events.
    """

    restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())

    def __init__(self):
        super(_EventReadSDF, self).__init__()
        # number of events received
        self._counter = 0

    @beam.DoFn.unbounded_per_element()
    def process(self, _, restriction_tracker=restriction_tracker):
        import random
        import time

        print('Attempting to claim {!r}'.format(self._counter))
        if not restriction_tracker.try_claim(self._counter):
            print('Failed to claim {!r}'.format(self._counter))
            return

        try:
            # Simulate waiting for the next event.
            n = random.randrange(0, 5)
            time.sleep(n)
            yield n
        finally:
            self._counter += 1
            restriction_tracker.defer_remainder()


def main(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipe = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (
        pipe
        | beam.Impulse()
        | beam.ParDo(_EventReadSDF())
        | beam.Map(print)
    )

    result = pipe.run()
    result.wait_until_finish()


if __name__ == '__main__':
    main(sys.argv)
If I run this pipeline with more than one worker (e.g.
--direct_num_workers=2), then the SDF only produces one element. This seems
to be the case using either the direct_running_mode of multi_processing or
multi_threading. I tested this in python2 running beam-2.24.0 and python3
running beam-2.29.0.
Interestingly, if I run this with --direct_num_workers=1 it will generate
elements forever as intended. It would be great to understand what’s going
on a bit further. Is this the right technique for the RestrictionProvider?
Is there any other sample code that anyone has for doing something similar?
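
One guess at the difference, not verified against Beam itself: self._counter lives on the DoFn instance, while the deferred remainder is a new restriction that may be resumed by a different instance when there is more than one worker. The toy model below is plain Python, not Beam code. ToyOffsetTracker, CounterReader and read_from_restriction are made-up names, and the toy treats any out-of-range position as a failed claim, which simplifies what the real OffsetRestrictionTracker does.

```python
class ToyOffsetTracker:
    """Toy stand-in for an offset tracker over [start, stop); not Beam's class."""

    def __init__(self, start, stop=float('inf')):
        self.start = start
        self.stop = stop
        self.last_claimed = None

    def try_claim(self, position):
        # Simplification: any position outside [start, stop) is a failed claim.
        if position < self.start or position >= self.stop:
            return False
        self.last_claimed = position
        return True

    def defer_remainder(self):
        # Checkpoint: keep what was claimed, hand back the rest as a residual.
        if self.last_claimed is None:
            residual_start = self.start
        else:
            residual_start = self.last_claimed + 1
        residual = (residual_start, self.stop)
        self.stop = residual_start
        return residual


class CounterReader:
    """Claims from per-instance state, like self._counter in the SDF above."""

    def __init__(self):
        self.counter = 0

    def process(self, tracker):
        if not tracker.try_claim(self.counter):
            return None
        self.counter += 1
        return tracker.defer_remainder()


def read_from_restriction(tracker):
    """Claims the start of whatever restriction it was handed."""
    if not tracker.try_claim(tracker.start):
        return None
    return tracker.defer_remainder()


# The same instance resumes the residual: its counter has kept up.
same = CounterReader()
residual = same.process(ToyOffsetTracker(0))
same_instance_resumes = same.process(ToyOffsetTracker(*residual)) is not None

# A fresh instance (as another worker might create) resumes the residual:
# its counter is 0 again, but the residual starts at 1, so the claim fails.
first = CounterReader()
residual = first.process(ToyOffsetTracker(0))
fresh = CounterReader()
fresh_instance_resumes = fresh.process(ToyOffsetTracker(*residual)) is not None

# Deriving the position from the restriction does not depend on the instance.
residual = read_from_restriction(ToyOffsetTracker(0))
stateless_resumes = read_from_restriction(ToyOffsetTracker(*residual)) is not None

print(same_instance_resumes, fresh_instance_resumes, stateless_resumes)
```

If this is what is happening, claiming restriction_tracker.current_restriction().start instead of self._counter would be the Beam-side counterpart of read_from_restriction, as far as I can tell from the SDF examples in the Beam programming guide.
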
Thanks,
-Sam
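
On the NameError in the quoted message below: the function-local imports in initial_restriction and create_tracker above are there to avoid it. A simplified model of what I assume happens in multi_processing mode is that a function defined in the main script is re-created in the worker without the script's module-level names. The sketch below is plain Python rather than Beam code; rebuild_without_main_globals is a hypothetical helper and SimpleNamespace is only a stand-in for OffsetRange.

```python
import builtins
import types
from types import SimpleNamespace as OffsetRange  # stand-in for Beam's OffsetRange


def initial_restriction_global():
    # Relies on the module-level OffsetRange name imported above.
    return OffsetRange(start=0, stop=float('inf'))


def initial_restriction_local():
    # Imports inside the body, so the name is resolved when the function runs.
    from types import SimpleNamespace as OffsetRange
    return OffsetRange(start=0, stop=float('inf'))


def rebuild_without_main_globals(func):
    """Hypothetical helper: re-create func with builtins as its only globals."""
    return types.FunctionType(func.__code__, {'__builtins__': builtins})


def outcome(func):
    """Call func and report 'ok' or the NameError it raised."""
    try:
        func()
        return 'ok'
    except NameError as exc:
        return 'NameError: {}'.format(exc)


in_main = outcome(initial_restriction_global)
global_in_worker = outcome(rebuild_without_main_globals(initial_restriction_global))
local_in_worker = outcome(rebuild_without_main_globals(initial_restriction_local))
print(in_main, '|', global_in_worker, '|', local_in_worker)
```

Beam's --save_main_session pipeline option is the other commonly suggested workaround for this class of error and may also be worth trying.
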
On Wed, Mar 9, 2022 at 8:22 PM Ankur Goenka <[email protected]> wrote:
> This might be an issue with multi_processing implementation of
> DirectRunner.
>
> I experimented with the following code and got following error. You can
> try using --direct_running_mode='multi_threading'
>
> ----------------------
> File
> "/usr/local/google/home/goenka/d/work/tmp/t1/venv/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1425, in process
> initial_restriction = self.restriction_provider.initial_restriction(
> File "/usr/local/google/home/goenka/d/work/tmp/t1/test.py", line 15, in
> initial_restriction
> return OffsetRange(0, float('inf'))
> NameError: name 'OffsetRange' is not defined
>
>
> --------------------------
>
>
> import apache_beam as beam
> import sys
> import argparse
> from apache_beam.io.restriction_trackers import (
>     OffsetRange, OffsetRestrictionTracker)
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.transforms.core import RestrictionProvider
>
>
> class InfRestrictionProvider(RestrictionProvider):
>     """
>     An infinite restriction provider
>     """
>
>     def initial_restriction(self, element):
>         return OffsetRange(0, float('inf'))
>
>     def create_tracker(self, restriction):
>         return OffsetRestrictionTracker(restriction)
>
>     def restriction_size(self, element, restriction):
>         return 1
>
>
> @beam.typehints.with_output_types(int)
> class _EventReadSDF(beam.DoFn):
>     """
>     An SDF for subscribing to custom events.
>     """
>
>     restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())
>
>     def __init__(self):
>         # type: () -> None
>         self._counter = 0
>
>     def process(self, _, restriction_tracker=restriction_tracker):
>         # type: (Any, beam.DoFn.RestrictionParam) -> Iterator[Event]
>
>         if not restriction_tracker.try_claim(self._counter):
>             return
>
>         i = 0
>         for _ in range(100):
>             try:
>                 # Blocks until the next event is received.
>                 i += 1
>                 yield i
>             finally:
>                 self._counter += 1
>                 restriction_tracker.defer_remainder()
>
>
> def run(argv=None):
>     parser = argparse.ArgumentParser()
>     known_args, pipeline_args = parser.parse_known_args(argv)
>
>     pipeline_options = PipelineOptions(pipeline_args)
>
>     with beam.Pipeline(options=pipeline_options) as pipe:
>         (
>             pipe
>             | beam.Impulse()
>             | beam.ParDo(_EventReadSDF())
>             | beam.Map(print)
>         )
>
>
> if __name__ == '__main__':
>     run(sys.argv)
>
> On Wed, Mar 9, 2022 at 4:15 PM Sam Bourne <[email protected]> wrote:
>
>> Thanks Ankur for the feedback.
>>
>> Does anyone have any examples they could share of writing an SDF for an
>> unbounded source (ideally in Python)?
>>
>> The SDF I wrote looks something like this. It works fine using the
>> DirectRunner with the PipelineOption --direct_running_mode='in_memory',
>> but doesn’t seem to operate properly using
>> --direct_running_mode='multi_processing'. I’d like to run this on the
>> DataflowRunner, but I wanted to make sure the SDF was operating
>> correctly first.
>>
>> Any tips would be greatly appreciated!
>>
>> import apache_beam as beam
>> from apache_beam.io.restriction_trackers import (
>>     OffsetRange, OffsetRestrictionTracker)
>> from apache_beam.transforms.core import RestrictionProvider
>>
>> from myeventlibrary import Event, Subscriber
>>
>>
>> class InfRestrictionProvider(RestrictionProvider):
>>     """
>>     An infinite restriction provider
>>     """
>>
>>     def initial_restriction(self, element):
>>         return OffsetRange(0, float('inf'))
>>
>>     def create_tracker(self, restriction):
>>         return OffsetRestrictionTracker(restriction)
>>
>>     def restriction_size(self, element, restriction):
>>         return 1
>>
>>
>> @beam.typehints.with_output_types(Event)
>> class _EventReadSDF(beam.DoFn):
>>     """
>>     An SDF for subscribing to custom events.
>>     """
>>
>>     restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())
>>
>>     def __init__(self):
>>         # type: () -> None
>>         super(_EventReadSDF, self).__init__()
>>         # number of events received
>>         self._counter = 0
>>
>>     def process(self, _, restriction_tracker=restriction_tracker):
>>         # type: (Any, beam.DoFn.RestrictionParam) -> Iterator[Event]
>>
>>         if not restriction_tracker.try_claim(self._counter):
>>             return
>>
>>         subscriber = Subscriber()
>>         try:
>>             # Blocks until the next event is received.
>>             yield subscriber.get()
>>         finally:
>>             self._counter += 1
>>             restriction_tracker.defer_remainder()
>>
>>
>> On Wed, Mar 9, 2022 at 11:47 AM Ankur Goenka <[email protected]> wrote:
>>
>>> Hi Sam,
>>>
>>> An SDF can reject split requests, so an SDF can be made to run as a
>>> single instance.
>>> DoFn.unbounded_per_element lets the Beam model know that this is an
>>> unbounded source. Beam also tries to infer this:
>>> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L695
>>>
>>> As this will eventually run on a cluster, I would recommend going the
>>> SDF route so that watermarks and checkpoints can happen appropriately,
>>> not just for this transform but also for downstream transforms.
>>>
>>> On Wed, Mar 9, 2022 at 10:54 AM Sam Bourne <[email protected]> wrote:
>>>
>>>> Hello! I’m looking for some help writing a custom transform that reads
>>>> from an unbounded source. A basic version of this would look something like
>>>> this:
>>>>
>>>> import apache_beam as beam
>>>> import myeventlibrary
>>>>
>>>>
>>>> class _ReadEventsFn(beam.DoFn):
>>>>     def process(self, unused_element):
>>>>         subscriber = myeventlibrary.Subscriber()
>>>>         while True:
>>>>             # blocks until an event is received
>>>>             event = subscriber.get()
>>>>             yield event
>>>>
>>>>
>>>> with beam.Pipeline() as pipe:
>>>>     (
>>>>         pipe
>>>>         | beam.Impulse()
>>>>         | beam.ParDo(_ReadEventsFn())
>>>>         | beam.Map(print)
>>>>     )
>>>>
>>>> I have a few questions:
>>>>
>>>> - When executing this in the DirectRunner with the default number
>>>> of workers (1), is it correct to assume the process would be constantly
>>>> blocking and rarely processing other parts of the pipeline?
>>>> - I noticed there is a decorator DoFn.unbounded_per_element which
>>>> seems useful, but it’s a bit unclear what it does. I also noticed in the
>>>> Java docs that it is an error to apply this decorator if the DoFn is not
>>>> an SDF.
>>>> - I took a stab at writing this as an SDF. The incoming events are
>>>> not really something that can be read in parallel and they trickle in
>>>> slowly. In this scenario is there anything to gain by using an SDF?
>>>>
>>> SDF can reject the split request so an SDF can be made to run as a
>>> single instance.
>>>
>>>>
>>>>
>>>> Thanks!
>>>> -Sam
>>>>
>>>