Thank you for your feedback! I tried to make a simpler version of my problem here: https://pastebin.com/sa1P2EEc (I added some print statements, as I don't really know how debugger-friendly Beam is).
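Roughly, the pattern I'm trying to follow is the one from the programming guide. The code below is only a generic sketch with placeholder names (it is not the pastebin code), and it uses the built-in OffsetRange/OffsetRestrictionTracker just to keep the example short, whereas my real code has a custom RestrictionTracker subclass:

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider


class PartsRestrictionProvider(RestrictionProvider):
    def initial_restriction(self, element):
        # e.g. 28 parts to process for this element
        return OffsetRange(0, element['num_parts'])

    def create_tracker(self, restriction):
        # my real code returns a custom RestrictionTracker subclass here
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return restriction.size()


class Crawl(beam.DoFn):
    def setup(self):
        print('tag:setup')      # one of the methods that never seems to run

    def process(
            self,
            element,
            tracker=beam.DoFn.RestrictionParam(PartsRestrictionProvider())):
        print('tag:process')
        restriction = tracker.current_restriction()
        for part in range(restriction.start, restriction.stop):
            if not tracker.try_claim(part):
                return
            yield (element['id'], part)

    def teardown(self):
        print('tag:teardown')   # also never seems to run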
I'm running on Ubuntu 20.04, Python 3.8.10 with Beam 2.37.0, like this (direct runner): python -m my_file

I'm facing the following error when I run the command:

AttributeError: 'apache_beam.runners.common.MethodWrapper' object has no attribute 'watermark_estimator_provider' [while running 'ParDo(Crawl)/pair']

If I delete the runners/*.so and runners/*.pxd files in the Python lib directory, the script starts properly, but I get the following output:

> tag:seed_process
> tag:initial_restriction
> tag:create_tracker
> tag:init
> tag:check_done 28
> tag:process

which basically shows that check_done is called on my tracker (with 28 parts to process) before the process method of my ParDo.

On Fri, Mar 25, 2022 at 11:04 PM Pablo Estrada <[email protected]> wrote:

> Hi Julien!
> Are you able to share some of your code? Whenever I write any code, I try
> to write simple unit tests to verify my progress. If you have observed unit
> tests where the setup/teardown are not called, then that's a bug and we
> need to fix it.
> In general - how are you testing these changes? Locally? On a distributed
> runner?
>
> FWIW, the general architecture that you're showing looks fine. There
> should not be a problem with a pipeline that does what you show.
> Best
> -P.
>
> On Fri, Mar 25, 2022 at 9:58 AM Ahmet Altay <[email protected]> wrote:
>
>> /cc @Pablo Estrada <[email protected]> - might be able to help.
>>
>> On Fri, Mar 25, 2022 at 3:09 AM Julien Chaty-Capelle <
>> [email protected]> wrote:
>>
>>> Hello there,
>>> I'm currently working on moving parts of my company's architecture
>>> toward something more cloud native. I ended up using Beam, as it's quite
>>> straightforward to use with our current stack.
>>>
>>> I have some pipelines working at this time, but I need to move some of
>>> them to SDF to increase parallelism.
>>> I tried to implement an SDF with a custom RestrictionTracker according
>>> to the doc here https://beam.apache.org/documentation/programming-guide/
>>> and the code here
>>> https://github.com/apache/beam/blob/8e217ea0d1f383ef5033ef507b14d01edf9c67e6/sdks/python/apache_beam/io/iobase.py
>>> but a few things were surprising and some help would be welcome.
>>>
>>> 1. First, I had to delete the compiled Cython common.pxd file, as it
>>> was preventing Beam from finding my restriction param (reinstalling
>>> apache-beam through pip didn't solve the problem).
>>> 2. It looks like with an SDF the setup and teardown methods of my DoFn
>>> are never called.
>>> 3. check_done is called on my restriction tracker before the process
>>> method is called on my DoFn. This is weird, as the documentation states
>>> that check_done should raise an error if there is unclaimed work (no
>>> work is claimed if process hasn't been called yet).
>>>
>>> For the record, I am using a GroupIntoBatches later in the pipeline,
>>> which looks like this:
>>>
>>> - beam.Create (very small initial set)
>>> - beam.ParDo (regular DoFn)
>>> - beam.ParDo (SDF)
>>> - beam.ParDo (regular DoFn)
>>> - beam.GroupIntoBatches
>>> - beam.ParDo (regular DoFn)
>>>
>>> Any help would be greatly appreciated
>>>
>>>
>>> --
>>> Julien CHATY-CAPELLE
>>>
>>> Full stack developer/Integrator
>>> Deepomatic
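PS: written out as code, the pipeline shape listed in my original message is roughly the following. This is only a sketch with trivial stand-in DoFns (the real ones are application specific); it assumes the Crawl SDF sketched earlier in this message and runs on the direct runner by default:

import apache_beam as beam


class Prepare(beam.DoFn):   # stand-in for the first regular DoFn
    def process(self, element):
        yield {'id': element, 'num_parts': 28}


class Enrich(beam.DoFn):    # stand-in for the regular DoFn after the SDF
    def process(self, element):
        yield element       # keeps the (key, value) shape for GroupIntoBatches


class Store(beam.DoFn):     # stand-in for the final regular DoFn
    def process(self, batch):
        print('batch:', batch)


with beam.Pipeline() as p:  # direct runner by default
    (
        p
        | 'seed' >> beam.Create(['seed-1'])      # very small initial set
        | 'prepare' >> beam.ParDo(Prepare())
        | 'crawl' >> beam.ParDo(Crawl())         # the SDF sketched earlier
        | 'enrich' >> beam.ParDo(Enrich())
        | 'batch' >> beam.GroupIntoBatches(10)   # expects (key, value) input
        | 'store' >> beam.ParDo(Store())
    )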
