Rather than mutating the DoFn itself, I would create a new ParDo that wraps
and invokes the inner one, e.g.

class ErrorSieveParDo(beam.DoFn):
  def __init__(self, dofn):
    self._dofn = dofn
  def process(self, *args, **kwargs):
      # note that raw_result is an *iterable* or None
      raw_result = self._dofn.process(*args, **kwargs)
      ...
  # don't forget start/finish_bundle if needed

output_pcoll = input_pcoll | ParDo(ErrorSieveParDo(SomeDoFn()))

It's correct that you can't use this in Map/FlatMap. Alternatively, make a
transform

class ErrorSieve(PTransform):
  def __init__(self, pardo):
    self.fn = pardo.fn
  def expand(self, input_pcoll):
    return input_pcoll | ParDo(ErrorSieveParDo(self.fn))

output_pcoll = input_pcoll | ErrorSieve(SomeDoFn())

Even better, however, might be to do something like

class ErrorSieve(PTransform):
  def __init__(self, pardo):
    self.fn = pardo.fn
  def expand(self, pcoll_input):
    return raw_results_pcoll | ParDo(self.fn) | ParDo(AnotherDoFn)

Where AnotherDoFn filters/modifies the outputs (assuming it can be done
elementwise).


On Fri, Jul 21, 2017 at 2:46 PM, Dmitry Demeshchuk <dmi...@postmates.com>
wrote:

> Hi Sourabh,
>
> Great call, thanks. I was thinking about a slightly different interface,
> but this is exactly the direction I wanted to go. So, my approach, I guess,
> would be something like that:
>
> class ErrorSieve(PTransform):
>   def __init__(self, pardo):
>     self.fn = pardo.fn
>   def expand(self):
>     inner_process = self.fn.process
>     def process(self, *args, **kwargs):
>       raw_result = inner_process(*args, **kwargs)
>       result = ...
>       yield result
>     setattr(fn, 'process', process)
>     return ParDo(self.fn)
>
> ​
>
> I'll share a working snippet once it's done.
>
> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <sourabhba...@google.com>
> wrote:
>
>> Hi,
>>
>> Is it possible to create
>>
>> class ErrorSieve(PTransform):
>>    def __init__ (dofn):
>>    def expand():
>>          return ParDo(modifiedDoFn)
>>
>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>> don't expose the ParDo to the user.
>>
>> Will this work for your usecase?
>>
>> -Sourabh
>>
>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk <dmi...@postmates.com>
>> wrote:
>>
>>> Hi list,
>>>
>>> I'm trying to make a transformation function (let's call it ErrorSieve)
>>> that would take a ParDo object as input and modify its underlying DoFn
>>> object, basically adding extra logic on top of an underlying process()
>>> method.
>>>
>>> Ideally for me, the example usage would be:
>>>
>>> ```python
>>> p | ErrorSieve(beam.ParDo(MyDoFn())
>>>
>>> or
>>>
>>> p | ErrorSieve(beam.FlatMap(lambda x: x + 1))
>>> ```
>>>
>>> However, this would require me to butcher the internals of ParDo
>>> mechanisms, especially since ParDo's make_fn() method gets called during
>>> its transformation. My other thinking was to make it a fair and square DoFn:
>>>
>>> ```python
>>> p | beam.ParDo(ErrorSieve(MyDoFn())
>>> ```
>>>
>>> The only problem with this is that I can't use it with transforms like
>>> FlatMap, which is a bit unfortunate.
>>>
>>> Do you think it's worth investigating how to implement the first
>>> approach, or should I just instead settle with the second approach, using
>>> only custom DoFns?
>>>
>>> Thank you.
>>>
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>

Reply via email to