Dear Community, I'm trying to use the following script to write to Bigtable using Beam/Dataflow, but I get the error below. Has anyone managed to run this script?
Thanks for your support ! *Python*: 3.7 *Code* 1) Create instance gcloud beta bigtable instances create test-instance --cluster test-cluster --display-name test-instance --cluster-zone us-central1-a --cluster-num-nodes 3 2) Main logic https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigtableio.py 3) Homemade pipeline pipeline_options = PipelineOptions( save_main_session=True, streaming=True, runner='DataflowRunner', project=PROJECT, region=REGION, temp_location=TEMP_LOCATION, staging_location=STAGING_LOCATION ) def run (): with beam.Pipeline(options=pipeline_options) as p: input_subscription=f"projects/{PROJECT}/subscriptions/{SUBSCRIPTION}" _ = (p | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=input_subscription).with_output_types(bytes) | 'Conversion UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8')) | WriteToBigTable(PROJECT,INSTANCE,TABLE)) *Error* run() Traceback (most recent call last): File "<ipython-input-49-ec9775ede022>", line 1, in <module> run() File "/<mycode>/bigtableio.py", line 300, in run TABLE)) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__ self.result = self.run() File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/pipeline.py", line 532, in run self._options).run(False) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/pipeline.py", line 558, in run pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle')) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 313, in dump_session return dill.dump_session(file_path) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py", line 351, in dump_session pickler.dump(main) File 
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py", line 445, in dump StockPickler.dump(self, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 437, in dump self.save(obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 173, in save_module return old_save_module(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py", line 1295, in save_module state=_main_dict) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 662, in save_reduce save(state) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 221, in new_save_module_dict return old_save_module_dict(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py", line 912, in save_module_dict StockPickler.save_dict(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 885, in _batch_setitems save(v) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 549, in save self.save_reduce(obj=obj, *rv) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 662, in save_reduce save(state) File 
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 221, in new_save_module_dict return old_save_module_dict(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py", line 912, in save_module_dict StockPickler.save_dict(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 885, in _batch_setitems save(v) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 549, in save self.save_reduce(obj=obj, *rv) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 662, in save_reduce save(state) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 221, in new_save_module_dict return old_save_module_dict(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py", line 912, in save_module_dict StockPickler.save_dict(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 885, in _batch_setitems save(v) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 549, in save 
self.save_reduce(obj=obj, *rv) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 662, in save_reduce save(state) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 221, in new_save_module_dict return old_save_module_dict(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py", line 912, in save_module_dict StockPickler.save_dict(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 885, in _batch_setitems save(v) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 549, in save self.save_reduce(obj=obj, *rv) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 662, in save_reduce save(state) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 221, in new_save_module_dict return old_save_module_dict(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py", line 912, in save_module_dict StockPickler.save_dict(pickler, obj) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", 
line 885, in _batch_setitems save(v) File "/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py", line 524, in save rv = reduce(self.proto) File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__ TypeError: no default __reduce__ due to non-trivial __cinit__ -- Best regards, Pierre
