Hi Brice,

I believe the issue you're seeing is that writing to GCS is not yet supported in streaming pipelines.
Regards,
Udi

On Fri, Aug 3, 2018 at 6:28 AM Brice Giesbrecht <[email protected]> wrote:
> Hello beam gurus. I have modified the streaming wordcount example from 2.5
> (https://github.com/apache/beam/blob/v2.5.0/sdks/python/apache_beam/examples/streaming_wordcount.py)
> to save the results to Google Cloud Storage instead of another pubsub topic
> using this line:
>
>     output | beam.io.WriteToText(known_args.dest)
>
> I configured this arg and am passing it in, but when I run this and send
> data to the input topic, I get this error:
>
>     INFO:root:2018-08-03T02:15:54.238Z: JOB_MESSAGE_DEBUG: Executing input step topology_init_attach_disk_input_step
>     INFO:root:2018-08-03T02:16:11.180Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
>     INFO:root:2018-08-03T02:16:55.466Z: JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -60: Traceback (most recent call last):
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 127, in _execute
>         response = task()
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in <lambda>
>         self._execute(lambda: worker.do_instruction(work), work)
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
>         request.instruction_id)
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 227, in process_bundle
>         self.data_channel_factory)
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in __init__
>         self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 269, in create_execution_tree
>         descriptor.transforms, key=topological_height, reverse=True)])
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
>         result = cache[args] = func(*args)
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 252, in get_operation
>         in descriptor.transforms[transform_id].outputs.items()
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 251, in <dictcomp>
>         for tag, pcoll_id
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
>         result = cache[args] = func(*args)
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 255, in get_operation
>         transform_id, transform_consumers)
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 359, in create_operation
>         return creator(self, transform_id, transform_proto, payload, consumers)
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 481, in create
>         factory, transform_id, transform_proto, consumers, serialized_fn)
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 529, in _create_pardo_operation
>         dofn_data = pickler.loads(serialized_fn)
>       File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 222, in loads
>         c = base64.b64decode(encoded)
>       File "/usr/lib/python2.7/base64.py", line 78, in b64decode
>         raise TypeError(msg)
>     TypeError: Incorrect padding
>
> There have been no changes made to the data, only the destination.
> Is there an issue with streaming to GCS, or have I missed some
> configuration?
>
> Kindly,
> -Brice
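For context on the last frame of that trace: "Incorrect padding" is what Python's `base64` module raises whenever the encoded input's length is not a multiple of 4 characters, and here the worker hit it while base64-decoding the pickled DoFn in `pickler.loads`. A minimal stdlib-only sketch (not Beam-specific; `safe_b64decode` is just an illustrative helper, not a Beam API) reproduces that failure mode:

```python
import base64
import binascii


def safe_b64decode(encoded):
    """Decode base64, surfacing padding problems explicitly.

    Valid base64 is always a multiple of 4 characters; a truncated
    payload triggers an "Incorrect padding" error (a TypeError on
    Python 2, binascii.Error on Python 3) -- the same failure the
    SDK harness hits when the serialized DoFn payload is malformed.
    """
    try:
        return base64.b64decode(encoded)
    except (TypeError, binascii.Error) as err:
        raise ValueError("malformed base64 payload: %s" % err)


# A well-formed payload decodes cleanly...
assert safe_b64decode("aGVsbG8=") == b"hello"

# ...but dropping one character breaks the 4-character alignment.
try:
    safe_b64decode("aGVsbG8")
except ValueError as err:
    print(err)  # reports the padding error, e.g. "Incorrect padding"
```

So the TypeError is a symptom of the serialized-function payload arriving truncated or mangled, not of anything in the input data itself.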
