Thank you for the comment.

I finally got this working. I would like to share my experience for people
who are new to the portable runner.
Below is what I did to call functions and classes from an
external package.

1. As Kyle said, I needed 'save_main_session' for sys path to persist after
pickling.

2. I needed to push all related files to the worker nodes using the
"--extra_package" option to resolve the dependencies.
https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

3. I needed to write the imports in fully qualified form; otherwise I got the
error below in the task manager.
It looks like external packages are installed into
"/usr/local/lib/python3.5/site-packages", so imports need the PKG.MODULENAME
format to work.
================================================================================================
import utils

...

  File "/usr/local/lib/python3.5/site-packages/modules/beam_pardo.py", line 18, in <module>
    import utils
ImportError: No module named 'utils'
================================================================================================

Below is the import syntax I use for the external package in main.py. The
other files follow the same syntax.
================================================================================================
#
# Local application/library specific imports
#
import pkg_aif.utils as ut
from pkg_aif.beam_pardo import VerifyFn
from pkg_aif.beam_pardo import FlattenTagFilesFn
from pkg_aif.beam_states import StatefulBufferingFn
from pkg_aif.pipeline_wrapper import pipelineWrapper
from pkg_aif.frames import Frames
from pkg_aif.tag_counts import TagCounts
from pkg_aif.tags import Tags
from pkg_aif.es_credentials import EsCredentials
from pkg_aif.s3_credentials import S3Credentials
================================================================================================
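
The difference between a bare import and a package-qualified import can be
reproduced locally without Beam. The sketch below is only an illustration:
it assumes the worker ends up with the package directory (not its contents)
on the import path, and the names "demo_pkg_aif" and "aif_demo_utils" are
made up for the demo.

```python
import importlib
import os
import sys
import tempfile


def try_import(name):
    """Return True if `name` can be imported, False on ImportError."""
    try:
        importlib.import_module(name)
        return True
    except ImportError:
        return False


# Build a throwaway stand-in for site-packages that holds one package.
# Hypothetical names: demo_pkg_aif (package), aif_demo_utils (module).
site_dir = tempfile.mkdtemp()
pkg_dir = os.path.join(site_dir, "demo_pkg_aif")
os.makedirs(pkg_dir)
open(os.path.join(pkg_dir, "__init__.py"), "w").close()
with open(os.path.join(pkg_dir, "aif_demo_utils.py"), "w") as f:
    f.write("ANSWER = 42\n")

# Only the parent directory is on sys.path, as with an installed package.
sys.path.insert(0, site_dir)
importlib.invalidate_caches()

bare_ok = try_import("aif_demo_utils")                    # like "import utils"
qualified_ok = try_import("demo_pkg_aif.aif_demo_utils")  # like "import pkg_aif.utils"

print("bare import works:     ", bare_ok)
print("qualified import works:", qualified_ok)
```

The bare form only works when the module's own directory is on sys.path,
which was true on my machine because of sys.path.append but not on the
worker.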

Below is some related information.

Full options passed to PipelineOptions:
================================================================================================
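import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

        # Inside the function that builds the pipeline:
        options = PipelineOptions([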
                      "--runner=PortableRunner",
                      "--environment_config={0}".format(DOCKER_REGISTRY),
                      "--environment_type=DOCKER",
                      "--experiments=beam_fn_api",
                      "--parallelism={0}".format(PARALLELISM),
                      "--job_endpoint=localhost:8099",
                      "--extra_package=PATH_TO_SDIST"
                  ])
        options.view_as(SetupOptions).save_main_session = True

        return beam.Pipeline(options=options)
================================================================================================
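
PATH_TO_SDIST above is the tarball that "python setup.py sdist" writes into
the dist directory. As a rough sketch, the flag could be built by picking the
most recently modified tarball instead of hard-coding the path. The helper
names newest_sdist and extra_package_flag are hypothetical, not part of Beam.

```python
import glob
import os


def newest_sdist(dist_dir):
    """Return the most recently modified *.tar.gz in dist_dir, or None."""
    candidates = glob.glob(os.path.join(dist_dir, "*.tar.gz"))
    if not candidates:
        return None
    return max(candidates, key=os.path.getmtime)


def extra_package_flag(dist_dir):
    """Build the --extra_package flag for the newest sdist in dist_dir."""
    sdist = newest_sdist(dist_dir)
    if sdist is None:
        raise FileNotFoundError(
            "no sdist found in {0}; run 'python setup.py sdist' first".format(dist_dir))
    return "--extra_package={0}".format(os.path.abspath(sdist))
```

The returned string can take the place of the "--extra_package=PATH_TO_SDIST"
entry in the list passed to PipelineOptions.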

My setup.py is below.
================================================================================================
import setuptools

REQUIRED_PACKAGES = [
    'apache-beam==2.15.0',
    'elasticsearch>=7.0.0,<8.0.0',
    'urllib3',
    'boto3',
]

setuptools.setup(
   author       = 'Yu Watanabe',
   author_email = 'AUTHOR_EMAIL',
   url          = 'URL',
   name         = 'quality_validation',
   version      = '0.1',
   install_requires = REQUIRED_PACKAGES,
   packages     = setuptools.find_packages(),

)
================================================================================================

Contents of the directory that holds setup.py:
================================================================================================
admin@ip-172-31-9-89:~/quality-validation-distribute/bin$ ls -l
total 20
drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 dist
-rw-r--r-- 1 admin admin    0 Sep  5 21:21 __init__.py
-rw-r--r-- 1 admin admin 3782 Oct  3 11:02 main.py
drwxr-xr-x 3 admin admin 4096 Oct  3 15:41 pkg_aif
drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 quality_validation.egg-info
-rw-r--r-- 1 admin admin  517 Oct  1 15:21 setup.py
================================================================================================
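
In this layout, setuptools.find_packages() only picks up directories that
contain an __init__.py, so pkg_aif is packaged while dist and the egg-info
directory are not. The following is a simplified stand-in for that rule (top
level only, and not the actual setuptools implementation) that can be used to
check a directory before building the sdist. The function name
top_level_packages is made up for this example.

```python
import os


def top_level_packages(root):
    """List immediate subdirectories of root that contain __init__.py."""
    found = []
    for name in sorted(os.listdir(root)):
        path = os.path.join(root, name)
        has_init = os.path.isfile(os.path.join(path, "__init__.py"))
        if os.path.isdir(path) and has_init:
            found.append(name)
    return found
```

Calling top_level_packages(".") from the directory that holds setup.py should
list pkg_aif if the package is set up as described above.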

Thanks,
Yu Watanabe

On Fri, Sep 27, 2019 at 3:23 AM Kyle Weaver <[email protected]> wrote:

> Did you try moving the imports from the process function to the top of
> main.py?
>
> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>
>
> On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe <[email protected]>
> wrote:
>
>> Hello.
>>
>> I would like to ask for help with resolving a dependency issue for an
>> imported module.
>>
>> I have a directory structure as below and I am trying to import Frames
>> class from frames.py to main.py.
>> =========================================
>> quality-validation/
>>     bin/
>>         setup.py
>>         main.py
>>         modules/
>>             frames.py
>>             <TRIMMED>
>> =========================================
>>
>> However, when I run the pipeline, I get the error below at the TaskManager.
>> =========================================
>> <TRIMMED>
>>   File "apache_beam/runners/common.py", line 942, in apache_beam.runners.common._OutputProcessor.process_outputs
>>   File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>>   File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
>>   File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
>>   File "apache_beam/runners/common.py", line 799, in apache_beam.runners.common.DoFnRunner.receive
>>   File "apache_beam/runners/common.py", line 805, in apache_beam.runners.common.DoFnRunner.process
>>   File "apache_beam/runners/common.py", line 872, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>>   File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
>>     raise exc.with_traceback(traceback)
>>   File "apache_beam/runners/common.py", line 803, in apache_beam.runners.common.DoFnRunner.process
>>   File "apache_beam/runners/common.py", line 465, in apache_beam.runners.common.SimpleInvoker.invoke_process
>>   File "apache_beam/runners/common.py", line 918, in apache_beam.runners.common._OutputProcessor.process_outputs
>>   File "/home/admin/quality-validation/bin/main.py", line 44, in process
>> ImportError: No module named 'frames' [while running 'combined:flat/ParDo(FlattenTagFilesFn)']
>> =========================================
>>
>> I import the modules in the global context and also at the top of the
>> process function.
>> =========================================
>> [main.py]
>> #
>> # Standard library imports
>> #
>> import logging
>> import pprint
>> import sys
>> sys.path.append("{0}/modules".format(sys.path[0]))
>> sys.path.append("{0}/modules/vendor".format(sys.path[0]))
>>
>> #
>> # Related third party imports
>> #
>> import apache_beam as beam
>>
>> #
>> # Local application/library specific imports
>> #
>> import utils
>> from pipeline_wrapper import pipelineWrapper
>> from tag_counts import TagCounts
>> from tags import Tags
>>
>> <TRIMMED>
>>
>> class FlattenTagFilesFn(beam.DoFn):
>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>         beam.DoFn.__init__(self)
>>
>>         self.s3Bucket = s3Bucket
>>         self.s3Creds  = s3Creds
>>         self.maxKeys  = maxKeys
>>
>>     def process(self, elem):
>>         import yaml
>>         from frames import Frames
>>
>>         if not hasattr(self, 's3Client'):
>>             import boto3
>>             self.s3Client = boto3.client('s3',
>>                                 aws_access_key_id=self.s3Creds[0],
>>                                 aws_secret_access_key=self.s3Creds[1])
>>
>>         (key, info) = elem
>>
>>         preFrm = {}
>>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>                                          Key=info['pre'][0][0])
>>         # safe_load avoids yaml.load being called without a Loader
>>         yaml1 = yaml.safe_load(resp1['Body'])
>>
>>         for elem in yaml1['body']:
>>             preFrm[ elem['frame_tag']['frame_no'] ] = elem
>>
>>         postFrm = {}
>>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>                                          Key=info['post'][0][0])
>>         # safe_load avoids yaml.load being called without a Loader
>>         yaml2 = yaml.safe_load(resp2['Body'])
>>
>>         for elem in yaml2['body']:
>>             postFrm[ elem['frame_tag']['frame_no'] ] = elem
>>
>>         commonFrmNums = set(preFrm.keys()).intersection(postFrm.keys())
>>
>>         for f in commonFrmNums:
>>             frames = Frames(
>>                           self.s3Bucket,
>>                           info['pre'][0][0],            # Pre S3Key
>>                           info['post'][0][0],           # Post S3Key
>>                           yaml1['head']['operator_id'], # Pre OperatorId
>>                           yaml2['head']['operator_id'], # Post OperatorId
>>                           preFrm[f],                    # Pre Frame Line
>>                           postFrm[f],                   # Post Frame Line
>>                           info['pre'][0][1],            # Pre Last Modified Time
>>                           info['post'][0][1])           # Post Last Modified Time
>>
>>             yield (frames)
>>
>>         tagCounts = TagCounts(
>>                          self.s3Bucket,
>>                          yaml1,               # Pre Yaml
>>                          yaml2,               # Post Yaml
>>                          info['pre'][0][0],   # Pre S3Key
>>                          info['post'][0][0],  # Post S3Key
>>                          info['pre'][0][1],   # Pre Last Modified Time
>>                          info['post'][0][1] ) # Post Last Modified Time
>>
>>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
>> =========================================
>>
>> My pipeline options are below. I tried with and without "setup_file", but
>> it made no difference.
>> =========================================
>>         options = PipelineOptions([
>>                       "--runner=PortableRunner",
>>                       "--environment_config={0}".format(self.__docker_registry),
>>                       "--environment_type=DOCKER",
>>                       "--experiments=beam_fn_api",
>>                       "--job_endpoint=localhost:8099"
>>                   ])
>>         options.view_as(SetupOptions).save_main_session = True
>>         options.view_as(SetupOptions).setup_file = '/home/admin/quality-validation/bin/setup.py'
>> =========================================
>>
>> Is it possible to resolve a dependency on an external module inside a
>> ParDo when using Apache Flink?
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> Weekend Freelancer who loves to challenge building data platform
>> [email protected]
>> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
>> Twitter icon] <https://twitter.com/yuwtennis>
>>
>

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
[email protected]
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>
