Hello.

I would like to ask a question about ParDo.

I am getting the error below inside the TaskManager when running code on
Apache Flink using the Portable Runner.
=====================================================
....
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
=====================================================

"FlattenTagFilesFn" is defined as a DoFn and applied with ParDo in the pipeline as below.
=====================================================
    frames, counts = ({'pre': pcollPre, 'post': pcollPost}
                      | 'combined:cogroup' >> beam.CoGroupByKey()
                      | 'combined:exclude' >> beam.Filter(
                          lambda x: (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
                      | 'combined:flat' >> beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
                                               .with_outputs('counts', main='frames'))
=====================================================

In the same file I have defined the class as below.
=====================================================
class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        self.s3Bucket = s3Bucket
        self.s3Creds  = s3Creds
        self.maxKeys  = maxKeys
=====================================================

This is not a problem when running the pipeline using DirectRunner.
May I ask how I should import the class for ParDo when running on Flink?
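
For reference, here is a minimal sketch of the failure mode I suspect. It uses only the standard library pickle, not Beam or dill, and it assumes the DoFn is serialized by reference (module name plus class name), so a process whose module of that name lacks the class cannot load it. The module name "driver_main" and the simplified class are hypothetical stand-ins, not my real code.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the module that the pipeline script runs as.
driver = types.ModuleType("driver_main")
sys.modules["driver_main"] = driver


class FlattenTagFilesFn:
    """Simplified stand-in for the DoFn; not a real beam.DoFn."""

    def __init__(self, s3Bucket):
        self.s3Bucket = s3Bucket


# Register the class as if it had been defined at the top of driver_main.
FlattenTagFilesFn.__module__ = "driver_main"
FlattenTagFilesFn.__qualname__ = "FlattenTagFilesFn"
driver.FlattenTagFilesFn = FlattenTagFilesFn

# pickle stores only the reference "driver_main.FlattenTagFilesFn"
# plus the instance state, not the class body itself.
payload = pickle.dumps(FlattenTagFilesFn("bucket"))

# Simulate the worker: a module with the same name exists,
# but it is a different module that never defined the class.
sys.modules["driver_main"] = types.ModuleType("driver_main")

try:
    pickle.loads(payload)
    error = None
except AttributeError as exc:
    error = str(exc)

print(error)
```

If this assumption holds, it would also explain why DirectRunner works: there the class is looked up in the same process that defined it.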

Thanks,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
LinkedIn: https://www.linkedin.com/in/yuwatanabe1
Twitter: https://twitter.com/yuwtennis
