Thanks Ankur for the feedback.
Does anyone have any examples they could share of writing an SDF for an
unbounded source (ideally in Python)?
The SDF I wrote looks something like this. It works fine in the
DirectRunner with the pipeline option --direct_running_mode='in_memory',
but it doesn't seem to operate properly with
--direct_running_mode='multi_processing'. I'd like to run this in the
DataflowRunner, but I wanted to make sure the SDF was operating correctly
first.
Any tips would be greatly appreciated!
import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

from myeventlibrary import Event, Subscriber


class InfRestrictionProvider(RestrictionProvider):
    """An infinite restriction provider."""

    def initial_restriction(self, element):
        return OffsetRange(0, float('inf'))

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return 1
@beam.typehints.with_output_types(Event)
class _EventReadSDF(beam.DoFn):
    """An SDF for subscribing to custom events."""

    restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())

    def __init__(self):
        # type: () -> None
        super(_EventReadSDF, self).__init__()
        # Number of events received so far.
        self._counter = 0

    def process(self, _, restriction_tracker=restriction_tracker):
        # type: (Any, beam.DoFn.RestrictionParam) -> Iterator[Event]
        if not restriction_tracker.try_claim(self._counter):
            return
        subscriber = Subscriber()
        try:
            # Blocks until the next event is received.
            yield subscriber.get()
        finally:
            self._counter += 1
            restriction_tracker.defer_remainder()
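
For reference, here is roughly how I am wiring it up; a minimal sketch
using the same Impulse + ParDo pattern as in my original message below:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# DirectRunner execution mode; swapping 'in_memory' for
# 'multi_processing' is where the SDF stops behaving for me.
options = PipelineOptions(direct_running_mode='in_memory')

with beam.Pipeline(options=options) as pipe:
    (
        pipe
        | beam.Impulse()
        | beam.ParDo(_EventReadSDF())
        | beam.Map(print)
    )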
On Wed, Mar 9, 2022 at 11:47 AM Ankur Goenka <[email protected]> wrote:
> Hi Sam,
>
> An SDF can reject split requests, so an SDF can be made to run as a single
> instance.
> DoFn.unbounded_per_element lets the Beam model know that this is an
> unbounded source. It also tries to infer it:
> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L695
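>
> In the Python SDK the equivalent hint is a decorator applied to process;
> a minimal sketch (the DoFn name and body here are just placeholders):
>
> import apache_beam as beam
>
> class UnboundedReadFn(beam.DoFn):
>     # Declares that a single input element may produce an unbounded
>     # amount of output/work.
>     @beam.DoFn.unbounded_per_element()
>     def process(self, element):
>         yield element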
>
> As this will eventually run on a cluster, I would recommend going via the
> SDF route so that watermarks and checkpoints can happen appropriately, not
> just for this transform but for downstream transforms as well.
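>
> For the watermark side, a Python SDF can take a watermark estimator
> parameter; a rough, untested sketch (MyRestrictionProvider is a
> placeholder, and the wall-clock watermark is just for illustration):
>
> import apache_beam as beam
> import myeventlibrary
> from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
> from apache_beam.utils.timestamp import Timestamp
>
> class _ReadEventsSDF(beam.DoFn):
>     def process(
>             self,
>             element,
>             tracker=beam.DoFn.RestrictionParam(MyRestrictionProvider()),
>             watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
>                 ManualWatermarkEstimator.default_provider())):
>         if not tracker.try_claim(0):
>             return
>         subscriber = myeventlibrary.Subscriber()
>         # Advance the output watermark so downstream transforms
>         # (windowing, triggers) can make progress.
>         watermark_estimator.set_watermark(Timestamp.now())
>         yield subscriber.get()
>         tracker.defer_remainder()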
>
> On Wed, Mar 9, 2022 at 10:54 AM Sam Bourne <[email protected]> wrote:
>
>> Hello! I’m looking for some help writing a custom transform that reads
>> from an unbounded source. A basic version would look something like
>> this:
>>
>> import apache_beam as beam
>> import myeventlibrary
>> class _ReadEventsFn(beam.DoFn):
>>     def process(self, unused_element):
>>         subscriber = myeventlibrary.Subscriber()
>>         while True:
>>             # Blocks until an event is received.
>>             event = subscriber.get()
>>             yield event
>>
>> with beam.Pipeline() as pipe:
>>     (
>>         pipe
>>         | beam.Impulse()
>>         | beam.ParDo(_ReadEventsFn())
>>         | beam.Map(print)
>>     )
>>
>> I have a few questions:
>>
>> - When executing this in the DirectRunner with the default number of
>> workers (1), is it correct to assume the process would be constantly
>> blocking and rarely processing other parts of the pipeline?
>> - I noticed there is a decorator, DoFn.unbounded_per_element, which
>> seems useful, but it's a bit unclear what it does. I also noticed in the
>> Java docs that it is an error to apply this decorator if the DoFn is not an SDF.
>> - I took a stab at writing this as an SDF. The incoming events are not
>> really something that can be read in parallel, and they trickle in slowly.
>> In this scenario, is there anything to gain by using an SDF?
>>
> An SDF can reject the split request, so it can be made to run as a single
> instance.
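>
> In the Python SDK, one way to get that is a tracker that declines splits;
> a rough, untested sketch (it declines dynamic splits while still allowing
> the checkpoint split, and your provider's create_tracker would return it
> instead of the stock tracker):
>
> from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
>
> class NoSplitOffsetTracker(OffsetRestrictionTracker):
>     def try_split(self, fraction_of_remainder):
>         # Returning None declines the runner's dynamic split request,
>         # so the restriction stays on a single worker. Fraction 0
>         # (checkpointing, used by defer_remainder) is still allowed.
>         if fraction_of_remainder > 0:
>             return None
>         return super(NoSplitOffsetTracker, self).try_split(fraction_of_remainder)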
>
>>
>>
>> Thanks!
>> -Sam
>>
>