Actually, there was a good example in the latest wordcount.py in the master repo:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py
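
The relevant pattern there is roughly the following (a from-memory sketch, so check the file itself for the exact code): the DoFn keeps only plain attributes and never calls super().__init__(), which keeps it picklable under dill on Python 3.

=====================================================
import re

import apache_beam as beam


class WordExtractingDoFn(beam.DoFn):
    """Parses each line of input text into words.

    Sketch of the wordcount.py style: no super().__init__()
    call, so dill can pickle the DoFn cleanly.
    """

    def process(self, element):
        # Returning an iterable of words is equivalent to
        # yielding each word individually.
        return re.findall(r"[\w\']+", element, re.UNICODE)
=====================================================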
On Thu, Sep 26, 2019 at 12:00 PM Yu Watanabe <[email protected]> wrote:

> Thank you for the help.
>
> I have chosen to remove the super().__init__() call.
>
> Thanks,
> Yu
>
> On Thu, Sep 26, 2019 at 9:18 AM Ankur Goenka <[email protected]> wrote:
>
>> super() has some issues while pickling in Python 3. Please refer to
>> https://github.com/uqfoundation/dill/issues/300 for more details.
>>
>> Removing the reference to super in your DoFn should help.
>>
>> On Wed, Sep 25, 2019 at 5:13 PM Yu Watanabe <[email protected]> wrote:
>>
>>> Thank you for the reply.
>>>
>>> "save_main_session" did not work; however, the situation has changed.
>>>
>>> 1. get_all_options() output, with "save_main_session" set to True:
>>>
>>> =================================================================================
>>> 2019-09-26 09:04:11,586 DEBUG Pipeline Options:
>>> {'wait_until_finish_duration': None, 'update': False, 'min_cpu_platform': None,
>>>  'dataflow_endpoint': 'https://dataflow.googleapis.com',
>>>  'environment_config': 'asia.gcr.io/creationline001/beam/python3:latest',
>>>  'machine_type': None, 'enable_streaming_engine': False, 'sdk_location': 'default',
>>>  'profile_memory': False, 'max_num_workers': None,
>>>  'type_check_strictness': 'DEFAULT_TO_ANY', 'streaming': False,
>>>  'setup_file': None, 'network': None, 'on_success_matcher': None,
>>>  'requirements_cache': None, 'service_account_email': None,
>>>  'environment_type': 'DOCKER', 'disk_type': None, 'labels': None,
>>>  'profile_location': None, 'direct_runner_use_stacked_bundle': True,
>>>  'use_public_ips': None, ***** 'save_main_session': True, *****
>>>  'direct_num_workers': 1, 'num_workers': None,
>>>  'worker_harness_container_image': None, 'template_location': None,
>>>  'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False,
>>>  'transform_name_mapping': None, 'profile_sample_rate': 1.0,
>>>  'runner': 'PortableRunner', 'project': None, 'dataflow_kms_key': None,
>>>  'job_endpoint': 'localhost:8099', 'extra_packages': None,
>>>  'environment_cache_millis': 0, 'dry_run': False,
>>>  'autoscaling_algorithm': None, 'staging_location': None, 'job_name': None,
>>>  'no_auth': False, 'runtime_type_check': False,
>>>  'direct_runner_bundle_repeat': 0, 'subnetwork': None,
>>>  'pipeline_type_check': True, 'hdfs_user': None, 'dataflow_job_file': None,
>>>  'temp_location': None, 'sdk_worker_parallelism': 0, 'zone': None,
>>>  'experiments': ['beam_fn_api'], 'hdfs_host': None, 'disk_size_gb': None,
>>>  'dataflow_worker_jar': None, 'requirements_file': None, 'beam_plugins': None,
>>>  'pubsubRootUrl': None, 'region': None}
>>> =================================================================================
>>>
>>> 2. The error in the Task Manager log did not change:
>>>
>>> ==================================================================================
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
>>> 'apache_beam.runners.worker.sdk_worker_main' from
>>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>>> ==================================================================================
>>>
>>> 3. However, if I comment out "super().__init__()" in my code, the error changes:
>>>
>>> ==================================================================================
>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
>>>     dofn_data = pickler.loads(serialized_fn)
>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>>     return dill.loads(s)
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
>>>     return load(file, ignore)
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
>>>     obj = pik.load()
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> ImportError: No module named 's3_credentials'
>>> ==================================================================================
>>>
>>> 4. My whole class is below:
>>>
>>> ==================================================================================
>>> class FlattenTagFilesFn(beam.DoFn):
>>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>>         self.s3Bucket = s3Bucket
>>>         self.s3Creds = s3Creds
>>>         self.maxKeys = maxKeys
>>>
>>>         super().__init__()
>>>
>>>     def process(self, elem):
>>>         # Create the boto3 client lazily so it is built on the worker.
>>>         if not hasattr(self, 's3Client'):
>>>             import boto3
>>>             self.s3Client = boto3.client(
>>>                 's3',
>>>                 aws_access_key_id=self.s3Creds.awsAccessKeyId,
>>>                 aws_secret_access_key=self.s3Creds.awsSecretAccessKey)
>>>
>>>         (key, info) = elem
>>>
>>>         preFrm = {}
>>>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['pre'][0][0])
>>>         yaml1 = yaml.load(resp1['Body'])
>>>
>>>         for elem in yaml1['body']:
>>>             preFrm[elem['frame_tag']['frame_no']] = elem
>>>
>>>         postFrm = {}
>>>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['post'][0][0])
>>>         yaml2 = yaml.load(resp2['Body'])
>>>
>>>         for elem in yaml2['body']:
>>>             postFrm[elem['frame_tag']['frame_no']] = elem
>>>
>>>         commonFrmNums = set(preFrm.keys()).intersection(postFrm.keys())
>>>
>>>         for f in commonFrmNums:
>>>             frames = Frames(
>>>                 self.s3Bucket,
>>>                 info['pre'][0][0],             # Pre S3Key
>>>                 info['post'][0][0],            # Post S3Key
>>>                 yaml1['head']['operator_id'],  # Pre OperatorId
>>>                 yaml2['head']['operator_id'],  # Post OperatorId
>>>                 preFrm[f],                     # Pre Frame Line
>>>                 postFrm[f],                    # Post Frame Line
>>>                 info['pre'][0][1],             # Pre Last Modified Time
>>>                 info['post'][0][1])            # Post Last Modified Time
>>>
>>>             yield frames
>>>
>>>         tagCounts = TagCounts(
>>>             self.s3Bucket,
>>>             yaml1,               # Pre Yaml
>>>             yaml2,               # Post Yaml
>>>             info['pre'][0][0],   # Pre S3Key
>>>             info['post'][0][0],  # Post S3Key
>>>             info['pre'][0][1],   # Pre Last Modified Time
>>>             info['post'][0][1])  # Post Last Modified Time
>>>
>>>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
>>> ==================================================================================
>>>
>>> I was using super() to keep a single boto3 client instance in the ParDo.
>>> May I ask, is there a way to call super() in the constructor of a ParDo?
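>>>
>>> Alternatively, would lazy initialization in start_bundle() remove the
>>> need for super() entirely? A rough, untested sketch of what I mean
>>> (attribute names match my class above):
>>>
>>> ==================================================================================
>>> class FlattenTagFilesFn(beam.DoFn):
>>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>>         # Plain attribute assignments only; no super().__init__() call.
>>>         self.s3Bucket = s3Bucket
>>>         self.s3Creds = s3Creds
>>>         self.maxKeys = maxKeys
>>>         self.s3Client = None
>>>
>>>     def start_bundle(self):
>>>         # Runs on the worker after unpickling, so the boto3 client is
>>>         # never pickled and is created at most once per bundle.
>>>         if self.s3Client is None:
>>>             import boto3
>>>             self.s3Client = boto3.client(
>>>                 's3',
>>>                 aws_access_key_id=self.s3Creds.awsAccessKeyId,
>>>                 aws_secret_access_key=self.s3Creds.awsSecretAccessKey)
>>> ==================================================================================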
>>>
>>> Thanks,
>>> Yu
>>>
>>> On Thu, Sep 26, 2019 at 7:49 AM Kyle Weaver <[email protected]> wrote:
>>>
>>>> You will need to set the save_main_session pipeline option to True.
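>>>>
>>>> For example, roughly (the PortableRunner flags shown are just illustrative):
>>>>
>>>> =====================================================
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import (
>>>>     PipelineOptions, SetupOptions)
>>>>
>>>> options = PipelineOptions(['--runner=PortableRunner',
>>>>                            '--job_endpoint=localhost:8099'])
>>>> # Pickle the __main__ session so module-level definitions such as
>>>> # FlattenTagFilesFn are available on the SDK workers.
>>>> options.view_as(SetupOptions).save_main_session = True
>>>>
>>>> with beam.Pipeline(options=options) as p:
>>>>     ...  # build the pipeline here
>>>> =====================================================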
>>>>
>>>> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>>>>
>>>> On Wed, Sep 25, 2019 at 3:44 PM Yu Watanabe <[email protected]> wrote:
>>>>
>>>>> Hello.
>>>>>
>>>>> I would like to ask a question about ParDo.
>>>>>
>>>>> I am getting the below error inside the TaskManager when running code on
>>>>> Apache Flink using the Portable Runner.
>>>>>
>>>>> =====================================================
>>>>> ....
>>>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>>>>     return dill.loads(s)
>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
>>>>>     return load(file, ignore)
>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
>>>>>     obj = pik.load()
>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
>>>>> 'apache_beam.runners.worker.sdk_worker_main' from
>>>>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>>>>> =====================================================
>>>>>
>>>>> "FlattenTagFilesFn" is defined as a DoFn and called from the pipeline as below.
>>>>>
>>>>> =====================================================
>>>>> frames, counts = ({'pre': pcollPre, 'post': pcollPost}
>>>>>                   | 'combined:cogroup' >> beam.CoGroupByKey()
>>>>>                   | 'combined:exclude' >> beam.Filter(
>>>>>                         lambda x: (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
>>>>>                   | 'combined:flat' >> beam.ParDo(
>>>>>                         FlattenTagFilesFn(s3Bucket, s3Creds))
>>>>>                         .with_outputs('counts', main='frames'))
>>>>> =====================================================
>>>>>
>>>>> In the same file I have defined the class as below.
>>>>>
>>>>> =====================================================
>>>>> class FlattenTagFilesFn(beam.DoFn):
>>>>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>>>>         self.s3Bucket = s3Bucket
>>>>>         self.s3Creds = s3Creds
>>>>>         self.maxKeys = maxKeys
>>>>> =====================================================
>>>>>
>>>>> This is not a problem when running the pipeline using the DirectRunner.
>>>>> May I ask, how should I import the class for a ParDo when running on Flink?
>>>>>
>>>>> Thanks,
>>>>> Yu Watanabe
>>>>>
>>>>> --
>>>>> Yu Watanabe
>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>> [email protected]
>>>>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
>>>>> Twitter: https://twitter.com/yuwtennis
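
By the way, both errors above come down to the same thing: the SDK worker cannot import the module that defines FlattenTagFilesFn (and, after removing super(), s3_credentials). Besides save_main_session, a more robust fix is to move those definitions out of the main module into a small package and ship it to the workers with the --setup_file pipeline option. A rough sketch, where the my_pipeline package name and layout are hypothetical:

=====================================================
# Project layout:
#   setup.py
#   my_pipeline/__init__.py
#   my_pipeline/dofns.py           <- FlattenTagFilesFn lives here
#   my_pipeline/s3_credentials.py  <- the module the worker could not import

# setup.py
import setuptools

setuptools.setup(
    name='my_pipeline',
    version='0.0.1',
    packages=setuptools.find_packages(),
)
=====================================================

The launcher then imports the DoFn with "from my_pipeline.dofns import FlattenTagFilesFn" and runs with --setup_file=./setup.py so the package is installed in each worker environment.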
