Thank you for your feedback!
I tried to make a simpler version of my problem here:
https://pastebin.com/sa1P2EEc (I added some print statements as I don't
really know how debugger-friendly Beam is)

I'm running on Ubuntu 20.04, Python 3.8.10 with Beam 2.37.0, like this
(direct runner): python -m my_file

I get the following error when I run the command: AttributeError:
'apache_beam.runners.common.MethodWrapper' object has no attribute
'watermark_estimator_provider' [while running 'ParDo(Crawl)/pair']
If I delete the runners/*.so and runners/*.pxd files in the Python lib
directory, the script starts properly, but I get the following output:

> tag:seed_process
> tag:initial_restriction
> tag:create_tracker
> tag:init
> tag:check_done 28
> tag:process
>

which shows that check_done is called on my tracker (with 28 parts still
to process) before the process method of my ParDo.
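
For reference, the contract described in the documentation (check_done must
raise if unclaimed work remains) can be sketched in plain Python. This is only
an illustration and does not use Beam: SimpleOffsetTracker and process_all are
hypothetical names, and a real RestrictionTracker has more methods than the
two shown here.

```python
class SimpleOffsetTracker:
    """Toy stand-in for a restriction tracker over the range [start, stop).

    Hypothetical class for illustration only; not part of Apache Beam.
    """

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
        self.last_claimed = None  # nothing has been claimed yet

    def try_claim(self, position):
        """Claim `position`; return False once it falls outside the range."""
        if position >= self.stop:
            return False
        self.last_claimed = position
        return True

    def check_done(self):
        """Raise ValueError if any position in the range is still unclaimed."""
        if self.last_claimed is None:
            next_unclaimed = self.start
        else:
            next_unclaimed = self.last_claimed + 1
        if next_unclaimed < self.stop:
            remaining = self.stop - next_unclaimed
            raise ValueError(f"{remaining} unclaimed part(s) remaining")


def process_all(tracker):
    """Claim every position in order, mimicking the body of a process method."""
    claimed = []
    position = tracker.start
    while tracker.try_claim(position):
        claimed.append(position)
        position += 1
    return claimed


tracker = SimpleOffsetTracker(0, 28)
try:
    tracker.check_done()  # called before any claim, as in the output above
except ValueError as err:
    print("check_done before process:", err)

process_all(tracker)
tracker.check_done()  # no error once every part has been claimed
```

Under this reading, a check_done call that arrives before process has claimed
anything is expected to raise, which is why the ordering in the output above
looks surprising.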


On Fri, Mar 25, 2022 at 11:04 PM Pablo Estrada <[email protected]> wrote:

> Hi Julien!
> Are you able to share some of your code? Whenever I write any code, I try
> to write simple unit tests to verify my progress. If you have observed unit
> tests where the setup/teardown are not called, then that's a bug and we
> need to fix it.
> In general - how are you testing these changes? Locally? On a distributed
> runner?
>
> FWIW, the general architecture that you're showing looks fine. There
> should not be a problem with a pipeline that does what you show.
> Best
> -P.
>
> On Fri, Mar 25, 2022 at 9:58 AM Ahmet Altay <[email protected]> wrote:
>
>> /cc @Pablo Estrada <[email protected]> - might be able to help.
>>
>> On Fri, Mar 25, 2022 at 3:09 AM Julien Chaty-Capelle <
>> [email protected]> wrote:
>>
>>> Hello there,
>>> I'm currently working on moving parts of my company's architecture
>>> toward something more cloud native. I ended up using Beam as it's quite
>>> straightforward to use with our current stack.
>>>
>>> I have some pipelines working at this time but I need to move some of
>>> them to SDF to increase parallelism.
>>> I tried to implement an SDF with a custom RestrictionTracker according
>>> to the doc here https://beam.apache.org/documentation/programming-guide/
>>> and code here
>>> https://github.com/apache/beam/blob/8e217ea0d1f383ef5033ef507b14d01edf9c67e6/sdks/python/apache_beam/io/iobase.py
>>> but a few things were surprising and some help would be welcome.
>>>
>>>
>>>    1. First, I had to delete the compiled Cython common.pxd file as it
>>>    was preventing Beam from finding my restriction param (reinstalling
>>>    apache-beam through pip didn't solve the problem)
>>>    2. It looks like with SDF the setup and teardown methods of my DoFn
>>>    are never called
>>>    3. check_done is called on my restriction tracker before the process
>>>    method is called on my DoFn. This is weird as the documentation states
>>>    that check_done should raise an error if there is any unclaimed work
>>>    (no work is claimed if process hasn't been called yet)
>>>
>>>
>>> For the record, I am using a GroupIntoBatches later in the pipeline,
>>> which looks like this:
>>>
>>>    - beam.Create (very small initial set)
>>>    - beam.ParDo (regular DoFn)
>>>    - beam.ParDo (SDF)
>>>    - beam.ParDo (regular DoFn)
>>>    - beam.GroupIntoBatches
>>>    - beam.ParDo (regular DoFn)
>>>
>>>
>>> Any help would be greatly appreciated
>>>
>>>
>>> --
>>> Julien CHATY-CAPELLE
>>>
>>> Full stack developer/Integrator
>>> Deepomatic
>>>
>>