Hello Beam gurus. I have modified the streaming wordcount example from Beam 2.5 (
https://github.com/apache/beam/blob/v2.5.0/sdks/python/apache_beam/examples/streaming_wordcount.py)
to save the results to Google Cloud Storage instead of another Pub/Sub topic,
using this line:

output | beam.io.WriteToText(known_args.dest)
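For reference, here is roughly how I wired up the new argument (a minimal sketch of just the argument handling; the example `gs://` path below is illustrative, not my actual bucket):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--dest',
    required=True,
    help='Output path prefix for result files, e.g. gs://my-bucket/wordcounts')

# parse_known_args lets the remaining flags flow through to the Beam pipeline
# options, as the wordcount examples do.
known_args, pipeline_args = parser.parse_known_args(
    ['--dest', 'gs://my-bucket/wordcounts'])  # simulated command line

# The only pipeline change is the final write, swapping the Pub/Sub sink for:
#   output | beam.io.WriteToText(known_args.dest)
print(known_args.dest)
```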


I configured this arg and am passing it in, but when I run the pipeline and send
data to the input topic, I get this error:

INFO:root:2018-08-03T02:15:54.238Z: JOB_MESSAGE_DEBUG: Executing input step
topology_init_attach_disk_input_step
INFO:root:2018-08-03T02:16:11.180Z: JOB_MESSAGE_DETAILED: Workers have
started successfully.
INFO:root:2018-08-03T02:16:55.466Z: JOB_MESSAGE_ERROR:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error
received from SDK harness for instruction -60: Traceback (most recent call
last):
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 127, in _execute
    response = task()
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 162, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 208, in do_instruction
    request.instruction_id)
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
line 227, in process_bundle
    self.data_channel_factory)
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 227, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 269, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 204, in wrapper
    result = cache[args] = func(*args)
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 252, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 251, in <dictcomp>
    for tag, pcoll_id
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 204, in wrapper
    result = cache[args] = func(*args)
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 255, in get_operation
    transform_id, transform_consumers)
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 359, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 481, in create
    factory, transform_id, transform_proto, consumers, serialized_fn)
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
line 529, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File
"/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py",
line 222, in loads
    c = base64.b64decode(encoded)
  File "/usr/lib/python2.7/base64.py", line 78, in b64decode
    raise TypeError(msg)
TypeError: Incorrect padding

No changes have been made to the data processing itself, only the output
destination. Is there an issue with streaming to GCS, or have I missed some
configuration?

Kindly,
-Brice
