Hello.
I would like to ask a question about ParDo.
I am getting the error below inside the TaskManager when running code on
Apache Flink using the Portable Runner.
=====================================================
....
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
=====================================================
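The last frames show dill handing the lookup to the standard unpickler's find_class, which resolves a class by its module name and class name. As a rough illustration only (standard-library pickle, not Beam's dill-based pickler), the sketch below registers a hypothetical module `launcher_main` as a stand-in for the launching script, pickles an instance of a class shaped like FlattenTagFilesFn, and unpickles it once in the same process and once after swapping in a module of the same name that lacks the class.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the launching script's module (normally __main__).
launcher = types.ModuleType("launcher_main")
sys.modules["launcher_main"] = launcher

# Define the class inside that module so its __module__ is "launcher_main".
exec(
    "class FlattenTagFilesFn:\n"
    "    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):\n"
    "        self.s3Bucket = s3Bucket\n"
    "        self.s3Creds = s3Creds\n"
    "        self.maxKeys = maxKeys\n",
    launcher.__dict__,
)

fn = launcher.FlattenTagFilesFn("my-bucket", {"key": "secret"})
# The class itself is recorded as ("launcher_main", "FlattenTagFilesFn").
payload = pickle.dumps(fn)

# Same process (roughly what happens with DirectRunner): the lookup succeeds.
restored = pickle.loads(payload)
print(restored.s3Bucket)  # my-bucket

# Simulate a worker whose module of that name does not define the class.
sys.modules["launcher_main"] = types.ModuleType("launcher_main")
try:
    pickle.loads(payload)
except AttributeError as exc:
    print("worker-side failure:", exc)
```

The second call fails with an AttributeError naming FlattenTagFilesFn, the same kind of error as in the TaskManager log. Whether Beam hits it for exactly this reason depends on how its pickler serialized the DoFn, which this sketch does not model.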
"FlattenTagFilesFn" is defined as a DoFn and applied with ParDo in the pipeline as below.
=====================================================
frames, counts = ({'pre': pcollPre, 'post': pcollPost}
                  | 'combined:cogroup' >> beam.CoGroupByKey()
                  | 'combined:exclude' >> beam.Filter(
                      lambda x: len(x[1]['pre']) > 0 and len(x[1]['post']) > 0)
                  | 'combined:flat' >> beam.ParDo(
                      FlattenTagFilesFn(s3Bucket, s3Creds)).with_outputs(
                          'counts', main='frames'))
=====================================================
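The 'combined:exclude' step keeps only keys that received at least one element from both pcollPre and pcollPost. A plain-Python sketch of that predicate, applied to hypothetical CoGroupByKey-shaped records (the keys and values are invented for illustration):

```python
# Hypothetical CoGroupByKey output: (key, {'pre': [...], 'post': [...]})
cogrouped = [
    ("a.json", {"pre": ["p1"], "post": ["q1"]}),
    ("b.json", {"pre": ["p2"], "post": []}),
    ("c.json", {"pre": [], "post": ["q3"]}),
]


def has_both(x):
    # Same test as the 'combined:exclude' lambda.
    return len(x[1]["pre"]) > 0 and len(x[1]["post"]) > 0


kept = [x for x in cogrouped if has_both(x)]
print([k for k, _ in kept])  # ['a.json']
```

Only keys present on both sides reach FlattenTagFilesFn.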
In the same file I have defined the class as below.
=====================================================
class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys
=====================================================
This is not a problem when running the pipeline with DirectRunner.
How should I import the class used in ParDo when running on Flink?
Thanks,
Yu Watanabe