super() has some issues while pickling in Python 3. Please refer to https://github.com/uqfoundation/dill/issues/300 for more details.
Removing the reference to super() in your DoFn should help.
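For example, a minimal sketch of the workaround, reusing the class from the code quoted below (the explicit two-argument form mentioned in the comment is only a suggested alternative, not something verified in this thread):

=====================================================
import apache_beam as beam

class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        # No super().__init__() call here: the zero-argument super() is what
        # dill fails to pickle. If the base constructor must be called, the
        # explicit super(FlattenTagFilesFn, self).__init__() form is a
        # commonly used alternative.
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys
=====================================================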
On Wed, Sep 25, 2019 at 5:13 PM Yu Watanabe <[email protected]> wrote:

> Thank you for the reply.
>
> "save_main_session" did not work; however, the situation has changed.
>
> 1. get_all_options() output. "save_main_session" is set to True.
>
> =================================================================================
> 2019-09-26 09:04:11,586 DEBUG Pipeline Options:
> {'wait_until_finish_duration': None, 'update': False, 'min_cpu_platform': None,
>  'dataflow_endpoint': 'https://dataflow.googleapis.com',
>  'environment_config': 'asia.gcr.io/creationline001/beam/python3:latest',
>  'machine_type': None, 'enable_streaming_engine': False, 'sdk_location': 'default',
>  'profile_memory': False, 'max_num_workers': None,
>  'type_check_strictness': 'DEFAULT_TO_ANY', 'streaming': False,
>  'setup_file': None, 'network': None, 'on_success_matcher': None,
>  'requirements_cache': None, 'service_account_email': None,
>  'environment_type': 'DOCKER', 'disk_type': None, 'labels': None,
>  'profile_location': None, 'direct_runner_use_stacked_bundle': True,
>  'use_public_ips': None, ***** 'save_main_session': True, *******
>  'direct_num_workers': 1, 'num_workers': None,
>  'worker_harness_container_image': None, 'template_location': None,
>  'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False,
>  'transform_name_mapping': None, 'profile_sample_rate': 1.0,
>  'runner': 'PortableRunner', 'project': None, 'dataflow_kms_key': None,
>  'job_endpoint': 'localhost:8099', 'extra_packages': None,
>  'environment_cache_millis': 0, 'dry_run': False,
>  'autoscaling_algorithm': None, 'staging_location': None, 'job_name': None,
>  'no_auth': False, 'runtime_type_check': False, 'direct_runner_bundle_repeat': 0,
>  'subnetwork': None, 'pipeline_type_check': True, 'hdfs_user': None,
>  'dataflow_job_file': None, 'temp_location': None, 'sdk_worker_parallelism': 0,
>  'zone': None, 'experiments': ['beam_fn_api'], 'hdfs_host': None,
>  'disk_size_gb': None, 'dataflow_worker_jar': None, 'requirements_file': None,
>  'beam_plugins': None, 'pubsubRootUrl': None, 'region': None}
> =================================================================================
>
> 2. The error in the Task Manager log did not change.
>
> ==================================================================================
>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>     return StockUnpickler.find_class(self, module, name)
> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
> 'apache_beam.runners.worker.sdk_worker_main' from
> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
> ==================================================================================
>
> 3. However, if I comment out "super().__init__()" in my code, the error changes.
>
> ==================================================================================
>   File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
>     dofn_data = pickler.loads(serialized_fn)
>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
>     return load(file, ignore)
>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
>     obj = pik.load()
>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>     return StockUnpickler.find_class(self, module, name)
> ImportError: No module named 's3_credentials'
> ==================================================================================
>
> 4. My whole class is below.
>
> ==================================================================================
> class FlattenTagFilesFn(beam.DoFn):
>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>         self.s3Bucket = s3Bucket
>         self.s3Creds = s3Creds
>         self.maxKeys = maxKeys
>
>         super().__init__()
>
>     def process(self, elem):
>
>         if not hasattr(self, 's3Client'):
>             import boto3
>             self.s3Client = boto3.client('s3',
>                 aws_access_key_id=self.s3Creds.awsAccessKeyId,
>                 aws_secret_access_key=self.s3Creds.awsSecretAccessKey)
>
>         (key, info) = elem
>
>         preFrm = {}
>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['pre'][0][0])
>         yaml1 = yaml.load(resp1['Body'])
>
>         for elem in yaml1['body']:
>             preFrm[ elem['frame_tag']['frame_no'] ] = elem
>
>         postFrm = {}
>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['post'][0][0])
>         yaml2 = yaml.load(resp2['Body'])
>
>         for elem in yaml2['body']:
>             postFrm[ elem['frame_tag']['frame_no'] ] = elem
>
>         commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))
>
>         for f in commonFrmNums:
>             frames = Frames(
>                 self.s3Bucket,
>                 info['pre'][0][0],             # Pre S3Key
>                 info['post'][0][0],            # Post S3Key
>                 yaml1['head']['operator_id'],  # Pre OperatorId
>                 yaml2['head']['operator_id'],  # Post OperatorId
>                 preFrm[f],                     # Pre Frame Line
>                 postFrm[f],                    # Post Frame Line
>                 info['pre'][0][1],             # Pre Last Modified Time
>                 info['post'][0][1])            # Post Last Modified Time
>
>             yield (frames)
>
>         tagCounts = TagCounts(
>             self.s3Bucket,
>             yaml1,                 # Pre Yaml
>             yaml2,                 # Post Yaml
>             info['pre'][0][0],     # Pre S3Key
>             info['post'][0][0],    # Post S3Key
>             info['pre'][0][1],     # Pre Last Modified Time
>             info['post'][0][1] )   # Post Last Modified Time
>
>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
> ==================================================================================
>
> I was using super() to create a single instance of the boto client in the ParDo.
> May I ask, is there a way to call super() in the constructor of a ParDo?
>
> Thanks,
> Yu
>
>
> On Thu, Sep 26, 2019 at 7:49 AM Kyle Weaver <[email protected]> wrote:
>
>> You will need to set the save_main_session pipeline option to True.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>>
>>
>> On Wed, Sep 25, 2019 at 3:44 PM Yu Watanabe <[email protected]> wrote:
>>
>>> Hello.
>>>
>>> I would like to ask a question about ParDo.
>>>
>>> I am getting the below error inside the TaskManager when running code on Apache Flink using the Portable Runner.
>>> =====================================================
>>> ....
>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
>>>     dofn_data = pickler.loads(serialized_fn)
>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>>     return dill.loads(s)
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
>>>     return load(file, ignore)
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
>>>     obj = pik.load()
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
>>> 'apache_beam.runners.worker.sdk_worker_main' from
>>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>>> =====================================================
>>>
>>> "FlattenTagFilesFn" is defined as a ParDo and called from the pipeline as below.
>>> =====================================================
>>> frames, counts = ({'pre': pcollPre, 'post': pcollPost}
>>>     | 'combined:cogroup' >> beam.CoGroupByKey()
>>>     | 'combined:exclude' >> beam.Filter(lambda x: (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
>>>     | 'combined:flat' >> beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
>>>                              .with_outputs('counts', main='frames'))
>>> =====================================================
>>>
>>> In the same file I have defined the class as below.
>>> =====================================================
>>> class FlattenTagFilesFn(beam.DoFn):
>>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>>         self.s3Bucket = s3Bucket
>>>         self.s3Creds = s3Creds
>>>         self.maxKeys = maxKeys
>>> =====================================================
>>>
>>> This is not a problem when running the pipeline with the DirectRunner.
>>> May I ask, how should I import the class for a ParDo when running on Flink?
>>>
>>> Thanks,
>>> Yu Watanabe
>>>
>>> --
>>> Yu Watanabe
>>> Weekend Freelancer who loves to challenge building data platform
>>> [email protected]
>>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis
>>
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> [email protected]
> LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis
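On the remaining ImportError for 's3_credentials' in step 3: save_main_session only pickles names defined in the __main__ module, so a DoFn that references a class from a separate local module generally needs that module staged to, or installed in, the worker container as well. A rough sketch of how these options are usually set in code follows; the option values mirror the get_all_options() dump quoted above, while the setup_file line is only an assumption about how the module might be shipped:

=====================================================
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Values mirror the quoted get_all_options() dump; adjust as needed.
options = PipelineOptions(
    runner='PortableRunner',
    job_endpoint='localhost:8099',
    environment_type='DOCKER',
    environment_config='asia.gcr.io/creationline001/beam/python3:latest',
)
setup = options.view_as(SetupOptions)
setup.save_main_session = True
# Assumption: ship the local s3_credentials module (and anything else the
# DoFn imports) to the workers, e.g. via a setup.py, so it can be unpickled there.
setup.setup_file = './setup.py'

with beam.Pipeline(options=options) as p:
    ...  # build the pipeline here
=====================================================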
