Thank you for the comment. I finally got this working. I would like to share my experience for people who are beginners with the portable runner. Below is what I did to call functions and classes from an external package.
1. As Kyle said, I needed 'save_main_session' so that the sys path persists after pickling.

2. I needed to push all related files to the worker nodes with the "extra_package" option to resolve the dependency.
   https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

3. I needed to write the import statements in fully qualified form, otherwise I got the error below in the task manager. It looks like the external package is installed into "/usr/local/lib/python3.5/site-packages", so imports have to use the PKG.MODULENAME format to resolve.

================================================================================================
import utils
...
  File "/usr/local/lib/python3.5/site-packages/modules/beam_pardo.py", line 18, in <module>
    import utils
ImportError: No module named 'utils'
================================================================================================

Below is my import syntax for the external package in main.py. The other files follow the same style.

================================================================================================
#
# Local application/library specific imports
#
import pkg_aif.utils as ut
from pkg_aif.beam_pardo import VerifyFn
from pkg_aif.beam_pardo import FlattenTagFilesFn
from pkg_aif.beam_states import StatefulBufferingFn
from pkg_aif.pipeline_wrapper import pipelineWrapper
from pkg_aif.frames import Frames
from pkg_aif.tag_counts import TagCounts
from pkg_aif.tags import Tags
from pkg_aif.es_credentials import EsCredentials
from pkg_aif.s3_credentials import S3Credentials
================================================================================================

Below is related information. The full PipelineOptions:

================================================================================================
options = PipelineOptions([
    "--runner=PortableRunner",
    "--environment_config={0}".format(DOCKER_REGISTRY),
    "--environment_type=DOCKER",
    "--experiments=beam_fn_api",
    "--parallelism={0}".format(PARALLELISM),
    "--job_endpoint=localhost:8099",
    "--extra_package=PATH_TO_SDIST"
])
options.view_as(SetupOptions).save_main_session = True

return beam.Pipeline(options=options)
================================================================================================

My setup.py is below.

================================================================================================
import setuptools

REQUIRED_PACKAGES = [
    'apache-beam==2.15.0',
    'elasticsearch>=7.0.0,<8.0.0',
    'urllib3',
    'boto3'
]

setuptools.setup(
    author='Yu Watanabe',
    author_email='AUTHOR_EMAIL',
    url='URL',
    name='quality_validation',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)
================================================================================================
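For anyone reproducing this: "--extra_package" expects the source distribution built from the setup.py above. Running "python setup.py sdist" in the directory containing setup.py writes dist/quality_validation-0.1.tar.gz, and that tarball is what PATH_TO_SDIST should point at. Below is also a minimal sketch of the corrected, fully qualified import style inside the package itself; the file name and VerifyFn are from my tree, but the class body is only an illustration, not my actual code.

================================================================================================
# pkg_aif/beam_pardo.py -- sketch; only the import style is the point here.
import apache_beam as beam

# Fully qualified import. The bare "import utils" fails on the worker
# because the package is installed under site-packages, so the module is
# only reachable as "pkg_aif.utils".
import pkg_aif.utils as ut


class VerifyFn(beam.DoFn):
    def process(self, elem):
        # ... actual verification logic trimmed ...
        yield elem
================================================================================================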
Directory path to setup.py:

================================================================================================
admin@ip-172-31-9-89:~/quality-validation-distribute/bin$ ls -l
total 20
drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 dist
-rw-r--r-- 1 admin admin    0 Sep  5 21:21 __init__.py
-rw-r--r-- 1 admin admin 3782 Oct  3 11:02 main.py
drwxr-xr-x 3 admin admin 4096 Oct  3 15:41 pkg_aif
drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 quality_validation.egg-info
-rw-r--r-- 1 admin admin  517 Oct  1 15:21 setup.py
================================================================================================

Thanks,
Yu Watanabe
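P.S. Since the imports no longer need to live inside process(), the hasattr() check that lazily created the boto3 client in my original DoFn (quoted below) can also go away; the client can be created once per bundle in start_bundle() instead. A minimal sketch, assuming the same constructor arguments as in the quoted code, with the processing logic trimmed:

================================================================================================
# pkg_aif/beam_pardo.py -- sketch; only the client setup is the point here.
import apache_beam as beam
import boto3


class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        beam.DoFn.__init__(self)
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys
        self.s3Client = None   # non-picklable; create it on the worker

    def start_bundle(self):
        # Runs on the worker before each bundle, so the client never has
        # to survive pickling.
        if self.s3Client is None:
            self.s3Client = boto3.client(
                's3',
                aws_access_key_id=self.s3Creds[0],
                aws_secret_access_key=self.s3Creds[1])

    def process(self, elem):
        # ... same pre/post frame comparison as in the quoted code ...
        yield elem
================================================================================================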
On Fri, Sep 27, 2019 at 3:23 AM Kyle Weaver <[email protected]> wrote:

> Did you try moving the imports from the process function to the top of
> main.py?
>
> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>
>
> On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe <[email protected]>
> wrote:
>
>> Hello.
>>
>> I would like to ask for help with resolving a dependency issue for an
>> imported module.
>>
>> I have the directory structure below and am trying to import the Frames
>> class from frames.py into main.py.
>> =========================================
>> quality-validation/
>>     bin/
>>         setup.py
>>         main.py
>>         modules/
>>             frames.py
>>             <TRIMMED>
>> =========================================
>>
>> However, when I run the pipeline, I get the error below in the TaskManager.
>> =========================================
>> <TRIMMED>
>>   File "apache_beam/runners/common.py", line 942, in apache_beam.runners.common._OutputProcessor.process_outputs
>>   File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>>   File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
>>   File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
>>   File "apache_beam/runners/common.py", line 799, in apache_beam.runners.common.DoFnRunner.receive
>>   File "apache_beam/runners/common.py", line 805, in apache_beam.runners.common.DoFnRunner.process
>>   File "apache_beam/runners/common.py", line 872, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>>   File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
>>     raise exc.with_traceback(traceback)
>>   File "apache_beam/runners/common.py", line 803, in apache_beam.runners.common.DoFnRunner.process
>>   File "apache_beam/runners/common.py", line 465, in apache_beam.runners.common.SimpleInvoker.invoke_process
>>   File "apache_beam/runners/common.py", line 918, in apache_beam.runners.common._OutputProcessor.process_outputs
>>   File "/home/admin/quality-validation/bin/main.py", line 44, in process
>> ImportError: No module named 'frames' [while running 'combined:flat/ParDo(FlattenTagFilesFn)']
>> =========================================
>>
>> I import modules at the global context and also at the top of the process
>> function.
>> =========================================
>> [main.py]
>> #
>> # Standard library imports
>> #
>> import logging
>> import pprint
>> import sys
>> sys.path.append("{0}/modules".format(sys.path[0]))
>> sys.path.append("{0}/modules/vendor".format(sys.path[0]))
>>
>> #
>> # Related third party imports
>> #
>> import apache_beam as beam
>>
>> #
>> # Local application/library specific imports
>> #
>> import utils
>> from pipeline_wrapper import pipelineWrapper
>> from tag_counts import TagCounts
>> from tags import Tags
>>
>> <TRIMMED>
>>
>> class FlattenTagFilesFn(beam.DoFn):
>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>         beam.DoFn.__init__(self)
>>
>>         self.s3Bucket = s3Bucket
>>         self.s3Creds = s3Creds
>>         self.maxKeys = maxKeys
>>
>>     def process(self, elem):
>>         import yaml
>>         from frames import Frames
>>
>>         if not hasattr(self, 's3Client'):
>>             import boto3
>>             self.s3Client = boto3.client('s3',
>>                                 aws_access_key_id=self.s3Creds[0],
>>                                 aws_secret_access_key=self.s3Creds[1])
>>
>>         (key, info) = elem
>>
>>         preFrm = {}
>>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>                                          Key=info['pre'][0][0])
>>         yaml1 = yaml.load(resp1['Body'])
>>
>>         for elem in yaml1['body']:
>>             preFrm[ elem['frame_tag']['frame_no'] ] = elem
>>
>>         postFrm = {}
>>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>                                          Key=info['post'][0][0])
>>         yaml2 = yaml.load(resp2['Body'])
>>
>>         for elem in yaml2['body']:
>>             postFrm[ elem['frame_tag']['frame_no'] ] = elem
>>
>>         commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))
>>
>>         for f in commonFrmNums:
>>             frames = Frames(
>>                 self.s3Bucket,
>>                 info['pre'][0][0],             # Pre S3Key
>>                 info['post'][0][0],            # Post S3Key
>>                 yaml1['head']['operator_id'],  # Pre OperatorId
>>                 yaml2['head']['operator_id'],  # Post OperatorId
>>                 preFrm[f],                     # Pre Frame Line
>>                 postFrm[f],                    # Post Frame Line
>>                 info['pre'][0][1],             # Pre Last Modified Time
>>                 info['post'][0][1])            # Post Last Modified Time
>>
>>             yield (frames)
>>
>>         tagCounts = TagCounts(
>>             self.s3Bucket,
>>             yaml1,                 # Pre Yaml
>>             yaml2,                 # Post Yaml
>>             info['pre'][0][0],     # Pre S3Key
>>             info['post'][0][0],    # Post S3Key
>>             info['pre'][0][1],     # Pre Last Modified Time
>>             info['post'][0][1] )   # Post Last Modified Time
>>
>>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
>> =========================================
>>
>> My pipeline options are below. I tried with and without "setup_file", but
>> it made no difference.
>> =========================================
>> options = PipelineOptions([
>>     "--runner=PortableRunner",
>>     "--environment_config={0}".format(self.__docker_registry),
>>     "--environment_type=DOCKER",
>>     "--experiments=beam_fn_api",
>>     "--job_endpoint=localhost:8099"
>> ])
>> options.view_as(SetupOptions).save_main_session = True
>> options.view_as(SetupOptions).setup_file = '/home/admin/quality-validation/bin/setup.py'
>> =========================================
>>
>> Is it possible to resolve a dependency in a ParDo linked to an external
>> module when using Apache Flink?
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> Weekend Freelancer who loves to challenge building data platform
>> [email protected]
>> LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>  Twitter: <https://twitter.com/yuwtennis>
>>
>

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
[email protected]
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>  Twitter: <https://twitter.com/yuwtennis>
