Actually there was a good example in the latest wordcount.py in master repo.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py


On Thu, Sep 26, 2019 at 12:00 PM Yu Watanabe <[email protected]> wrote:

> Thank you for the help.
>
> I have chosen to remove the super().__init__() .
>
> Thanks,
> Yu
>
> On Thu, Sep 26, 2019 at 9:18 AM Ankur Goenka <[email protected]> wrote:
>
>> super has some issues wile pickling in python3. Please refer
>> https://github.com/uqfoundation/dill/issues/300 for more details.
>>
>> Removing reference to super in your dofn should help.
>>
>> On Wed, Sep 25, 2019 at 5:13 PM Yu Watanabe <[email protected]>
>> wrote:
>>
>>> Thank you for the reply.
>>>
>>> " save_main_session" did not work, however, situation had changed.
>>>
>>> 1. get_all_options() output. "save_main_session" set to True.
>>>
>>> =================================================================================
>>> 2019-09-26 09:04:11,586 DEBUG Pipeline Options:
>>> {'wait_until_finish_duration': None, 'update': False, 'min_cpu_platform':
>>> None, 'dataflow_endpoint': 'https://dataflow.googleapis.com',
>>> 'environment_config': 'asia.gcr.io/creationline001/beam/python3:latest',
>>> 'machine_type': None, 'enable_streaming_engine': False, 'sdk_location':
>>> 'default', 'profile_memory': False, 'max_num_workers': None,
>>> 'type_check_strictness': 'DEFAULT_TO_ANY', 'streaming': False,
>>> 'setup_file': None, 'network': None, 'on_success_matcher': None,
>>> 'requirements_cache': None, 'service_account_email': None,
>>> 'environment_type': 'DOCKER', 'disk_type': None, 'labels': None,
>>> 'profile_location': None, 'direct_runner_use_stacked_bundle': True,
>>> 'use_public_ips': None, ***** 'save_main_session': True, *******
>>> 'direct_num_workers': 1, 'num_workers': None,
>>> 'worker_harness_container_image': None, 'template_location': None,
>>> 'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False,
>>> 'transform_name_mapping': None, 'profile_sample_rate': 1.0, 'runner':
>>> 'PortableRunner', 'project': None, 'dataflow_kms_key': None,
>>> 'job_endpoint': 'localhost:8099', 'extra_packages': None,
>>> 'environment_cache_millis': 0, 'dry_run': False, 'autoscaling_algorithm':
>>> None, 'staging_location': None, 'job_name': None, 'no_auth': False,
>>> 'runtime_type_check': False, 'direct_runner_bundle_repeat': 0,
>>> 'subnetwork': None, 'pipeline_type_check': True, 'hdfs_user': None,
>>> 'dataflow_job_file': None, 'temp_location': None, 'sdk_worker_parallelism':
>>> 0, 'zone': None, 'experiments': ['beam_fn_api'], 'hdfs_host': None,
>>> 'disk_size_gb': None, 'dataflow_worker_jar': None, 'requirements_file':
>>> None, 'beam_plugins': None, 'pubsubRootUrl': None, 'region': None}
>>>
>>>  
>>> =================================================================================
>>>
>>> 2. Error in Task Manager log did not change.
>>>
>>> ==================================================================================
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474,
>>> in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
>>> 'apache_beam.runners.worker.sdk_worker_main' from
>>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>>>
>>> ==================================================================================
>>>
>>> 3. However, if I comment out "super().__init__()" in my code , error
>>> changes.
>>>
>>> ==================================================================================
>>>   File
>>> "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1078, in _create_pardo_operation
>>>     dofn_data = pickler.loads(serialized_fn)
>>>   File
>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>>> line 265, in loads
>>>     return dill.loads(s)
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317,
>>> in loads
>>>     return load(file, ignore)
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305,
>>> in load
>>>     obj = pik.load()
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474,
>>> in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> ImportError: No module named 's3_credentials'
>>> ==================================================================================
>>>
>>>
>>> 4. My whole class is below.
>>>
>>> ==================================================================================
>>> class FlattenTagFilesFn(beam.DoFn):
>>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>>         self.s3Bucket = s3Bucket
>>>         self.s3Creds  = s3Creds
>>>         self.maxKeys  = maxKeys
>>>
>>>         super().__init__()
>>>
>>>     def process(self, elem):
>>>
>>>         if not hasattr(self, 's3Client'):
>>>             import boto3
>>>             self.s3Client = boto3.client('s3',
>>>
>>> aws_access_key_id=self.s3Creds.awsAccessKeyId,
>>>
>>> aws_secret_access_key=self.s3Creds.awsSecretAccessKey)
>>>
>>>         (key, info) = elem
>>>
>>>         preFrm = {}
>>>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>> Key=info['pre'][0][0])
>>>         yaml1 = yaml.load(resp1['Body'])
>>>
>>>         for elem in yaml1['body']:
>>>             preFrm[ elem['frame_tag']['frame_no'] ] = elem
>>>
>>>         postFrm = {}
>>>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>> Key=info['post'][0][0])
>>>         yaml2 = yaml.load(resp2['Body'])
>>>
>>>         for elem in yaml2['body']:
>>>             postFrm[ elem['frame_tag']['frame_no'] ] = elem
>>>
>>>         commonFrmNums =
>>> set(list(preFrm.keys())).intersection(list(postFrm.keys()))
>>>
>>>         for f in commonFrmNums:
>>>             frames = Frames(
>>>                           self.s3Bucket,
>>>                           info['pre'][0][0],            # Pre S3Key
>>>                           info['post'][0][0],           # Post S3Key
>>>                           yaml1['head']['operator_id'], # Pre OperatorId
>>>                           yaml2['head']['operator_id'], # Post OperatorId
>>>                           preFrm[f],                    # Pre Frame Line
>>>                           postFrm[f],                   # Post Frame Line
>>>                           info['pre'][0][1],            # Pre Last
>>> Modified Time
>>>                           info['post'][0][1])           # Post Last
>>> Modified Time
>>>
>>>             yield (frames)
>>>
>>>         tagCounts = TagCounts(
>>>                          self.s3Bucket,
>>>                          yaml1,               # Pre Yaml
>>>                          yaml2,               # Post Yaml
>>>                          info['pre'][0][0],   # Pre S3Key
>>>                          info['post'][0][0],  # Post S3Key
>>>                          info['pre'][0][1],   # Pre Last Modified Time
>>>                          info['post'][0][1] ) # Post Last Modified Time
>>>
>>>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
>>>
>>> ==================================================================================
>>>
>>> I was using super() to define single instance of boto instance in ParDo.
>>> May I ask, is there a way to call super() in the constructor of ParDo ?
>>>
>>> Thanks,
>>> Yu
>>>
>>>
>>> On Thu, Sep 26, 2019 at 7:49 AM Kyle Weaver <[email protected]> wrote:
>>>
>>>> You will need to set the save_main_session pipeline option to True.
>>>>
>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>> [email protected]
>>>>
>>>>
>>>> On Wed, Sep 25, 2019 at 3:44 PM Yu Watanabe <[email protected]>
>>>> wrote:
>>>>
>>>>> Hello.
>>>>>
>>>>> I would like to ask question for ParDo .
>>>>>
>>>>> I am getting below error inside TaskManager when running code on
>>>>> Apache Flink using Portable Runner.
>>>>> =====================================================
>>>>> ....
>>>>>   File
>>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 1078, in _create_pardo_operation
>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>   File
>>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>>>>> line 265, in loads
>>>>>     return dill.loads(s)
>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line
>>>>> 317, in loads
>>>>>     return load(file, ignore)
>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line
>>>>> 305, in load
>>>>>     obj = pik.load()
>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line
>>>>> 474, in find_class
>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
>>>>> 'apache_beam.runners.worker.sdk_worker_main' from
>>>>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>>>>> =====================================================
>>>>>
>>>>> " FlattenTagFilesFn" is defined as ParDo and called from Pipeline as
>>>>> below.
>>>>> =====================================================
>>>>>     frames, counts = ({'pre': pcollPre, 'post': pcollPost}
>>>>>                       | 'combined:cogroup' >> beam.CoGroupByKey()
>>>>>                       | 'combined:exclude' >> beam.Filter(lambda x:
>>>>> (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
>>>>>                       | 'combined:flat'    >>
>>>>> beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
>>>>>                                               .with_outputs('counts',
>>>>> main='frames'))
>>>>> =====================================================
>>>>>
>>>>> In the same file I have defined the class as below.
>>>>> =====================================================
>>>>> class FlattenTagFilesFn(beam.DoFn):
>>>>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>>>>         self.s3Bucket = s3Bucket
>>>>>         self.s3Creds  = s3Creds
>>>>>         self.maxKeys  = maxKeys
>>>>> =====================================================
>>>>>
>>>>> This is not a problem when running  pipeline using DirectRunner.
>>>>> May I ask , how should I import class for ParDo when running on Flink ?
>>>>>
>>>>> Thanks,
>>>>> Yu Watanabe
>>>>>
>>>>> --
>>>>> Yu Watanabe
>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>> [email protected]
>>>>> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
>>>>> Twitter icon] <https://twitter.com/yuwtennis>
>>>>>
>>>>
>>>
>>> --
>>> Yu Watanabe
>>> Weekend Freelancer who loves to challenge building data platform
>>> [email protected]
>>> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
>>> Twitter icon] <https://twitter.com/yuwtennis>
>>>
>>
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> [email protected]
> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
> Twitter icon] <https://twitter.com/yuwtennis>
>


-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
[email protected]
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>

Reply via email to