Did you try moving the imports from the process function to the top of
main.py?
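For context on why the placement of the imports matters, here is a simplified sketch (pure Python, not Beam's exact serialization path). Beam pickles the DoFn on the launcher and unpickles it on the Flink TaskManager, and a pickle records a class only by module and qualified name, not its source or its imports. Any import executed inside process() therefore runs on the worker, whose sys.path does not include your local modules/ directory. The FlattenStandIn class below is hypothetical, for illustration only:

```python
import pickle

# Stand-in for a Beam DoFn (hypothetical class, for illustration only).
# Pickling an instance stores a *reference* to its class (module name +
# qualified name), not the class's source code or the imports it uses.
class FlattenStandIn:
    def process(self, elem):
        # An import written here would execute at call time on the
        # worker; if "frames" is not importable there, it raises
        # ImportError, as in the traceback below.
        return elem

blob = pickle.dumps(FlattenStandIn())

# Unpickling re-imports the defining module by name. This only succeeds
# if that module (and everything it imports at top level) is resolvable
# in the unpickling process.
obj = pickle.loads(blob)
print(type(obj).__name__)  # -> FlattenStandIn
```

Either way, the dependency has to be importable on the worker, not just on the launcher: top-level imports plus save_main_session=True can cover modules available in the worker environment, while setup_file stages your own package to the workers.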

Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]


On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe <[email protected]> wrote:

> Hello.
>
> I would like to ask for help with resolving a dependency issue for an
> imported module.
>
> I have the directory structure below, and I am trying to import the
> Frames class from frames.py into main.py.
> =========================================
> quality-validation/
>     bin/
>         setup.py
>         main.py
>         modules/
>             frames.py
>             <TRIMMED>
> =========================================
>
> However, when I run the pipeline, I get the error below on the TaskManager.
> =========================================
> <TRIMMED>
>   File "apache_beam/runners/common.py", line 942, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 799, in apache_beam.runners.common.DoFnRunner.receive
>   File "apache_beam/runners/common.py", line 805, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 872, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 803, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 465, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 918, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/home/admin/quality-validation/bin/main.py", line 44, in process
> ImportError: No module named 'frames' [while running 'combined:flat/ParDo(FlattenTagFilesFn)']
> =========================================
>
> I import modules in the global context and also at the top of the process
> function.
> =========================================
> [main.py]
> #
> # Standard library imports
> #
> import logging
> import pprint
> import sys
> sys.path.append("{0}/modules".format(sys.path[0]))
> sys.path.append("{0}/modules/vendor".format(sys.path[0]))
>
> #
> # Related third party imports
> #
> import apache_beam as beam
>
> #
> # Local application/library specific imports
> #
> import utils
> from pipeline_wrapper import pipelineWrapper
> from tag_counts import TagCounts
> from tags import Tags
>
> <TRIMMED>
>
> class FlattenTagFilesFn(beam.DoFn):
>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>         beam.DoFn.__init__(self)
>
>         self.s3Bucket = s3Bucket
>         self.s3Creds  = s3Creds
>         self.maxKeys  = maxKeys
>
>     def process(self, elem):
>         import yaml
>         from frames import Frames
>
>         if not hasattr(self, 's3Client'):
>             import boto3
>             self.s3Client = boto3.client('s3',
>                                 aws_access_key_id=self.s3Creds[0],
>                                 aws_secret_access_key=self.s3Creds[1])
>
>         (key, info) = elem
>
>         preFrm = {}
>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['pre'][0][0])
>         yaml1 = yaml.load(resp1['Body'])
>
>         for elem in yaml1['body']:
>             preFrm[ elem['frame_tag']['frame_no'] ] = elem
>
>         postFrm = {}
>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['post'][0][0])
>         yaml2 = yaml.load(resp2['Body'])
>
>         for elem in yaml2['body']:
>             postFrm[ elem['frame_tag']['frame_no'] ] = elem
>
>         commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))
>
>         for f in commonFrmNums:
>             frames = Frames(
>                           self.s3Bucket,
>                           info['pre'][0][0],            # Pre S3Key
>                           info['post'][0][0],           # Post S3Key
>                           yaml1['head']['operator_id'], # Pre OperatorId
>                           yaml2['head']['operator_id'], # Post OperatorId
>                           preFrm[f],                    # Pre Frame Line
>                           postFrm[f],                   # Post Frame Line
>                           info['pre'][0][1],            # Pre Last Modified Time
>                           info['post'][0][1])           # Post Last Modified Time
>
>             yield (frames)
>
>         tagCounts = TagCounts(
>                          self.s3Bucket,
>                          yaml1,               # Pre Yaml
>                          yaml2,               # Post Yaml
>                          info['pre'][0][0],   # Pre S3Key
>                          info['post'][0][0],  # Post S3Key
>                          info['pre'][0][1],   # Pre Last Modified Time
>                          info['post'][0][1] ) # Post Last Modified Time
>
>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
> =========================================
>
> My pipeline options are below. I tried with and without "setup_file", but
> it made no difference.
> =========================================
>         options = PipelineOptions([
>                       "--runner=PortableRunner",
>                       "--environment_config={0}".format(self.__docker_registry),
>                       "--environment_type=DOCKER",
>                       "--experiments=beam_fn_api",
>                       "--job_endpoint=localhost:8099"
>                   ])
>         options.view_as(SetupOptions).save_main_session = True
>         options.view_as(SetupOptions).setup_file = '/home/admin/quality-validation/bin/setup.py'
> =========================================
>
> Is it possible to resolve a dependency on an external module inside a
> ParDo when using Apache Flink?
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> [email protected]
> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
> Twitter: https://twitter.com/yuwtennis
>
