Rather than mutating the DoFn itself, I would create a new DoFn that wraps the inner one and invokes it, e.g.

import apache_beam as beam
from apache_beam import ParDo, PTransform

class ErrorSieveParDo(beam.DoFn):
    def __init__(self, dofn):
        self._dofn = dofn

    def process(self, *args, **kwargs):
        # note that raw_result is an *iterable* or None
        raw_result = self._dofn.process(*args, **kwargs)
        ...

    # don't forget start/finish_bundle if needed

output_pcoll = input_pcoll | ParDo(ErrorSieveParDo(SomeDoFn()))

You're right that you can't use this with Map/FlatMap. Alternatively, make a transform:

class ErrorSieve(PTransform):
    def __init__(self, pardo):
        self.fn = pardo.fn

    def expand(self, input_pcoll):
        return input_pcoll | ParDo(ErrorSieveParDo(self.fn))

# ErrorSieve reads pardo.fn, so pass it a ParDo rather than a bare DoFn
output_pcoll = input_pcoll | ErrorSieve(ParDo(SomeDoFn()))

Even better, however, might be to do something like

class ErrorSieve(PTransform):
    def __init__(self, pardo):
        self.fn = pardo.fn

    def expand(self, pcoll_input):
        # run the original DoFn unchanged, then post-process its outputs
        return pcoll_input | ParDo(self.fn) | ParDo(AnotherDoFn())

where AnotherDoFn filters/modifies the outputs (assuming it can be done elementwise).

On Fri, Jul 21, 2017 at 2:46 PM, Dmitry Demeshchuk <dmi...@postmates.com> wrote:

> Hi Sourabh,
>
> Great call, thanks. I was thinking about a slightly different interface,
> but this is exactly the direction I wanted to go. So, my approach, I guess,
> would be something like that:
>
> class ErrorSieve(PTransform):
>     def __init__(self, pardo):
>         self.fn = pardo.fn
>     def expand(self):
>         inner_process = self.fn.process
>         def process(self, *args, **kwargs):
>             raw_result = inner_process(*args, **kwargs)
>             result = ...
>             yield result
>         setattr(fn, 'process', process)
>         return ParDo(self.fn)
>
>
>
> I'll share a working snippet once it's done.
>
> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <sourabhba...@google.com>
> wrote:
>
>> Hi,
>>
>> Is it possible to create
>>
>> class ErrorSieve(PTransform):
>>     def __init__ (dofn):
>>     def expand():
>>         return ParDo(modifiedDoFn)
>>
>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>> don't expose the ParDo to the user.
>>
>> Will this work for your usecase?
>>
>> -Sourabh
>>
>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk <dmi...@postmates.com>
>> wrote:
>>
>>> Hi list,
>>>
>>> I'm trying to make a transformation function (let's call it ErrorSieve)
>>> that would take a ParDo object as input and modify its underlying DoFn
>>> object, basically adding extra logic on top of an underlying process()
>>> method.
>>>
>>> Ideally for me, the example usage would be:
>>>
>>> ```python
>>> p | ErrorSieve(beam.ParDo(MyDoFn())
>>>
>>> or
>>>
>>> p | ErrorSieve(beam.FlatMap(lambda x: x + 1))
>>> ```
>>>
>>> However, this would require me to butcher the internals of ParDo
>>> mechanisms, especially since ParDo's make_fn() method gets called during
>>> its transformation. My other thinking was to make it a fair and square DoFn:
>>>
>>> ```python
>>> p | beam.ParDo(ErrorSieve(MyDoFn())
>>> ```
>>>
>>> The only problem with this is that I can't use it with transforms like
>>> FlatMap, which is a bit unfortunate.
>>>
>>> Do you think it's worth investigating how to implement the first
>>> approach, or should I just instead settle with the second approach, using
>>> only custom DoFns?
>>>
>>> Thank you.
>>>
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>
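
To make the wrapping idea from the top of this reply concrete, here is a minimal sketch in plain Python. It deliberately does not import Beam so that it runs anywhere: ParseIntDoFn and ReturnsNoneDoFn are hypothetical stand-ins for real DoFns, ErrorSieveDoFn is an illustrative name, and the "catch the exception and emit a tagged tuple" policy is an assumption, since the thread never spells out what the sieve should do with errors.

```python
class ParseIntDoFn:
    """Hypothetical stand-in for a beam.DoFn; only process() matters here."""

    def process(self, element):
        yield int(element)  # raises ValueError on bad input


class ReturnsNoneDoFn:
    """Hypothetical DoFn whose process() returns None (emits nothing)."""

    def process(self, element):
        return None


class ErrorSieveDoFn:
    """Wraps another DoFn-like object instead of mutating it."""

    def __init__(self, dofn):
        self._dofn = dofn

    def process(self, *args, **kwargs):
        try:
            raw_result = self._dofn.process(*args, **kwargs)
            # raw_result is an iterable or None. A generator only raises
            # once it is consumed, so materialize it inside the try block.
            results = list(raw_result) if raw_result is not None else []
        except Exception as exc:
            # Assumed policy for this sketch: emit the error as a tagged tuple.
            yield ("error", repr(exc))
            return
        for result in results:
            yield ("ok", result)


sieve = ErrorSieveDoFn(ParseIntDoFn())
good = list(sieve.process("42"))
bad = list(sieve.process("oops"))
empty = list(ErrorSieveDoFn(ReturnsNoneDoFn()).process("x"))
```

In an actual pipeline the wrapper would subclass beam.DoFn and be applied with ParDo as shown earlier, and it would also need to forward start_bundle/finish_bundle to the inner DoFn if that DoFn relies on them.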