Aha, that sounds like it might be the reason. Thank you for the reply!

Best,
-Brice

On Fri, Aug 3, 2018 at 6:30 PM, Udi Meiri <[email protected]> wrote:

> Hi Brice,
>
> I believe the issue you're seeing is due to that writing to GCS is not yet
> supported in streaming pipelines.
>
> Regards,
> Udi
>
> On Fri, Aug 3, 2018 at 6:28 AM Brice Giesbrecht <[email protected]> wrote:
>
>> Hello beam gurus. I have modified the streaming wordcount example from 2.5
>> (https://github.com/apache/beam/blob/v2.5.0/sdks/python/apache_beam/examples/streaming_wordcount.py)
>> to save the results to Google Cloud Storage instead of another pubsub
>> topic using this line:
>>
>>     output | beam.io.WriteToText(known_args.dest)
>>
>> I configured this arg and am passing it in but when I run this and send
>> data to the input topic, I get this error:
>>
>> INFO:root:2018-08-03T02:15:54.238Z: JOB_MESSAGE_DEBUG: Executing input step topology_init_attach_disk_input_step
>> INFO:root:2018-08-03T02:16:11.180Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
>> INFO:root:2018-08-03T02:16:55.466Z: JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -60: Traceback (most recent call last):
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 127, in _execute
>>     response = task()
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in <lambda>
>>     self._execute(lambda: worker.do_instruction(work), work)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
>>     request.instruction_id)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 227, in process_bundle
>>     self.data_channel_factory)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in __init__
>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 269, in create_execution_tree
>>     descriptor.transforms, key=topological_height, reverse=True)])
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 252, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 251, in <dictcomp>
>>     for tag, pcoll_id
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 255, in get_operation
>>     transform_id, transform_consumers)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 359, in create_operation
>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 481, in create
>>     factory, transform_id, transform_proto, consumers, serialized_fn)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 529, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 222, in loads
>>     c = base64.b64decode(encoded)
>>   File "/usr/lib/python2.7/base64.py", line 78, in b64decode
>>     raise TypeError(msg)
>> TypeError: Incorrect padding
>>
>> There have been no changes made to the data, only the destination.
>> Is there an issue with streaming to GCS, or have I missed some
>> configuration?
>>
>> Kindly,
>> -Brice
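
For anyone reading the traceback above: the last frame, base64.b64decode raising "Incorrect padding", is the error Python's base64 module reports when the encoded input does not have a valid padded length. The sketch below reproduces that error with the standard library only. It is an illustration under assumptions, not a diagnosis of the job: the payload is a made-up stand-in for a serialized function, try_decode is a hypothetical helper, and on Python 3 the exception is binascii.Error rather than the TypeError seen under Python 2.7.

```python
import base64
import binascii


def try_decode(encoded):
    """Return the decoded bytes, or None if base64 decoding fails.

    Hypothetical helper for illustration; it is not part of Apache Beam.
    """
    try:
        return base64.b64decode(encoded)
    except (binascii.Error, TypeError):
        # Python 3 raises binascii.Error; Python 2.7 raised TypeError.
        return None


# Stand-in for a serialized function (assumption: any bytes will do here).
payload = base64.b64encode(b"serialized DoFn")

# Dropping one character leaves a length that is not a multiple of four.
truncated = payload[:-1]

print(try_decode(payload))    # the original bytes
print(try_decode(truncated))  # None, because decoding fails
```

This only shows what kind of input triggers the message. It does not establish that the payload in the job above was truncated; per Udi's reply, the underlying issue is that writing to GCS is not yet supported in streaming pipelines.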
