Did you try moving the imports out of the `process` function and up to the top of main.py, so they are resolved when the module is first loaded rather than at execution time on the worker?
Kyle Weaver | Software Engineer | github.com/ibzib | [email protected] On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe <[email protected]> wrote: > Hello. > > I would like to ask for help with resolving dependency issue for imported > module. > > I have a directory structure as below and I am trying to import Frames > class from frames.py to main.py. > ========================================= > quality-validation/ > bin/setup.py > main.py > modules/ > frames.py > <TRIMMED> > ========================================= > > However, when I run pipeline, I get below error at TaskManager. > ========================================= > <TRIMMED> > File "apache_beam/runners/common.py", line 942, in > apache_beam.runners.common._OutputProcessor.process_outputs > File "apache_beam/runners/worker/operations.py", line 143, in > apache_beam.runners.worker.operations.SingletonConsumerSet.receive > File "apache_beam/runners/worker/operations.py", line 593, in > apache_beam.runners.worker.operations.DoOperation.process > File "apache_beam/runners/worker/operations.py", line 594, in > apache_beam.runners.worker.operations.DoOperation.process > File "apache_beam/runners/common.py", line 799, in > apache_beam.runners.common.DoFnRunner.receive > File "apache_beam/runners/common.py", line 805, in > apache_beam.runners.common.DoFnRunner.process > File "apache_beam/runners/common.py", line 872, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py", > line 419, in raise_with_traceback > raise exc.with_traceback(traceback) > File "apache_beam/runners/common.py", line 803, in > apache_beam.runners.common.DoFnRunner.process > File "apache_beam/runners/common.py", line 465, in > apache_beam.runners.common.SimpleInvoker.invoke_process > File "apache_beam/runners/common.py", line 918, in > apache_beam.runners.common._OutputProcessor.process_outputs > File "/home/admin/quality-validation/bin/main.py", line 44, in 
process > ImportError: No module named 'frames' [while running > 'combined:flat/ParDo(FlattenTagFilesFn)'] > ========================================= > > I import modules at global context and also at top of the process > function . > ========================================= > [main.py] > # > # Standard library imports > # > import logging > import pprint > import sys > sys.path.append("{0}/modules".format(sys.path[0])) > sys.path.append("{0}/modules/vendor".format(sys.path[0])) > > # > # Related third party imports > # > import apache_beam as beam > > # > # Local application/library specific imports > # > import utils > from pipeline_wrapper import pipelineWrapper > from tag_counts import TagCounts > from tags import Tags > > <TRIMMED> > > class FlattenTagFilesFn(beam.DoFn): > def __init__(self, s3Bucket, s3Creds, maxKeys=1000): > beam.DoFn.__init__(self) > > self.s3Bucket = s3Bucket > self.s3Creds = s3Creds > self.maxKeys = maxKeys > > def process(self, elem): > import yaml > from frames import Frames > > if not hasattr(self, 's3Client'): > import boto3 > self.s3Client = boto3.client('s3', > aws_access_key_id=self.s3Creds[0], > aws_secret_access_key=self.s3Creds[1]) > > (key, info) = elem > > preFrm = {} > resp1 = self.s3Client.get_object(Bucket=self.s3Bucket, > Key=info['pre'][0][0]) > yaml1 = yaml.load(resp1['Body']) > > for elem in yaml1['body']: > preFrm[ elem['frame_tag']['frame_no'] ] = elem > > postFrm = {} > resp2 = self.s3Client.get_object(Bucket=self.s3Bucket, > Key=info['post'][0][0]) > yaml2 = yaml.load(resp2['Body']) > > for elem in yaml2['body']: > postFrm[ elem['frame_tag']['frame_no'] ] = elem > > commonFrmNums = > set(list(preFrm.keys())).intersection(list(postFrm.keys())) > > for f in commonFrmNums: > frames = Frames( > self.s3Bucket, > info['pre'][0][0], # Pre S3Key > info['post'][0][0], # Post S3Key > yaml1['head']['operator_id'], # Pre OperatorId > yaml2['head']['operator_id'], # Post OperatorId > preFrm[f], # Pre Frame Line > postFrm[f], # 
Post Frame Line > info['pre'][0][1], # Pre Last > Modified Time > info['post'][0][1]) # Post Last > Modified Time > > yield (frames) > > tagCounts = TagCounts( > self.s3Bucket, > yaml1, # Pre Yaml > yaml2, # Post Yaml > info['pre'][0][0], # Pre S3Key > info['post'][0][0], # Post S3Key > info['pre'][0][1], # Pre Last Modified Time > info['post'][0][1] ) # Post Last Modified Time > > yield beam.pvalue.TaggedOutput('counts', tagCounts) > ========================================= > > My pipeline options are below. I tried with and without "setup_file" but > made no difference. > ========================================= > options = PipelineOptions([ > "--runner=PortableRunner", > > "--environment_config={0}".format(self.__docker_registry), > "--environment_type=DOCKER", > "--experiments=beam_fn_api", > "--job_endpoint=localhost:8099" > ]) > options.view_as(SetupOptions).save_main_session = True > options.view_as(SetupOptions).setup_file = > '/home/admin/quality-validation/bin/setup.py' > ========================================= > > Is it possible to solve dependency in ParDo linked to external module when > using Apache Flink? > > Thanks, > Yu Watanabe > > -- > Yu Watanabe > Weekend Freelancer who loves to challenge building data platform > [email protected] > [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1> [image: > Twitter icon] <https://twitter.com/yuwtennis> >
