Aha, that sounds like it might be the reason. Thank you for the reply!

Best,
-Brice

On Fri, Aug 3, 2018 at 6:30 PM, Udi Meiri <[email protected]> wrote:

> Hi Brice,
>
> I believe the issue you're seeing is that writing to GCS is not yet
> supported in streaming pipelines.
>
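> As a workaround, the results can go to a sink that streaming pipelines
> do support, for example publishing back to a Pub/Sub topic as the
> unmodified example does. A minimal sketch, assuming output is a
> PCollection of strings and reusing the example's output_topic argument:
>
>   output | beam.io.WriteStringsToPubSub(known_args.output_topic)
>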
> Regards,
> Udi
>
> On Fri, Aug 3, 2018 at 6:28 AM Brice Giesbrecht <[email protected]> wrote:
>
>> Hello Beam gurus. I have modified the streaming wordcount example from
>> 2.5 (https://github.com/apache/beam/blob/v2.5.0/sdks/python/apache_beam/examples/streaming_wordcount.py)
>> to save the results to Google Cloud Storage instead of another Pub/Sub
>> topic, using this line:
>>
>> output | beam.io.WriteToText(known_args.dest)
>>
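>> For context, here is a condensed sketch of how the rest of the pipeline
>> is wired; this is not verbatim from the example (transform names are per
>> the 2.5-era API, pipeline_options and known_args come from the example's
>> argparse handling, and dest is a GCS path such as gs://<bucket>/counts):
>>
>>   import apache_beam as beam
>>   from apache_beam.transforms import window
>>
>>   # Read from Pub/Sub, window, count words, then write to GCS.
>>   with beam.Pipeline(options=pipeline_options) as p:
>>     output = (p
>>               | beam.io.ReadStringsFromPubSub(known_args.input_topic)
>>               | beam.FlatMap(lambda line: line.split())
>>               | beam.Map(lambda word: (word, 1))
>>               | beam.WindowInto(window.FixedWindows(15))
>>               | beam.CombinePerKey(sum)
>>               | beam.Map(lambda kv: '%s: %d' % kv))
>>     output | beam.io.WriteToText(known_args.dest)  # the modified sink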
>>
>> I configured this argument and am passing it in, but when I run the
>> pipeline and send data to the input topic, I get this error:
>>
>> INFO:root:2018-08-03T02:15:54.238Z: JOB_MESSAGE_DEBUG: Executing input step topology_init_attach_disk_input_step
>> INFO:root:2018-08-03T02:16:11.180Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
>> INFO:root:2018-08-03T02:16:55.466Z: JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -60: Traceback (most recent call last):
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 127, in _execute
>>     response = task()
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in <lambda>
>>     self._execute(lambda: worker.do_instruction(work), work)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
>>     request.instruction_id)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 227, in process_bundle
>>     self.data_channel_factory)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in __init__
>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 269, in create_execution_tree
>>     descriptor.transforms, key=topological_height, reverse=True)])
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 252, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 251, in <dictcomp>
>>     for tag, pcoll_id
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 255, in get_operation
>>     transform_id, transform_consumers)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 359, in create_operation
>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 481, in create
>>     factory, transform_id, transform_proto, consumers, serialized_fn)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 529, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 222, in loads
>>     c = base64.b64decode(encoded)
>>   File "/usr/lib/python2.7/base64.py", line 78, in b64decode
>>     raise TypeError(msg)
>> TypeError: Incorrect padding
>>
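>> For what it's worth, the last frames show the worker failing to
>> base64-decode the serialized DoFn payload. On Python 2.7, decoding any
>> base64 string whose length is not a multiple of 4 raises this same
>> exception:
>>
>>   import base64
>>   # "abcd" encodes to "YWJjZA=="; dropping the "==" padding
>>   # raises TypeError: Incorrect padding on Python 2.7.
>>   base64.b64decode("YWJjZA")
>>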
>> No changes have been made to the data, only to the destination.
>> Is there an issue with streaming to GCS, or have I missed some
>> configuration?
>>
>> Kindly,
>> -Brice
>>
