Hi Chad,

This stub is only ever replaced by the Dataflow service; other runners, Flink
included, receive it as-is. It's an artifact of the pre-portability era.
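
To make the failure mode concrete, here is a toy model in plain Python. It
does not use Beam, and none of its names are Beam's API; they are all
hypothetical. A placeholder source carries only configuration. Only a runner
that recognizes it (for ReadFromPubSub, that is the Dataflow service) swaps
it for a working source. Any other runner passes the placeholder straight to
a generic split step that calls estimate_size(), which raises the same kind
of AttributeError as the quoted traceback.

```python
# Toy model, NOT Beam's implementation: every name here is hypothetical.


class PlaceholderPubSubSource:
    """Stands in for a read that only one specific runner knows how to execute.

    It stores configuration but implements none of the methods a generic
    source needs (no estimate_size, no split).
    """

    def __init__(self, topic):
        self.topic = topic


class WorkingSource:
    """A source implementing the method the generic split step calls."""

    def __init__(self, topic):
        self.topic = topic

    def estimate_size(self):
        # Unbounded stream: the size is unknown, so this toy just reports 0.
        return 0


def replace_placeholders(sources, runner_knows_stub):
    """Swap placeholders for working sources, as only the stub's own runner does."""
    if not runner_knows_stub:
        return list(sources)
    return [
        WorkingSource(s.topic) if isinstance(s, PlaceholderPubSubSource) else s
        for s in sources
    ]


def split_source(source):
    """Generic split step: assumes every source implements estimate_size()."""
    return source.estimate_size()


def run(runner_knows_stub):
    """Run the toy pipeline; return split results or the error message."""
    sources = replace_placeholders(
        [PlaceholderPubSubSource("projects/p/topics/t")], runner_knows_stub)
    try:
        return [split_source(s) for s in sources]
    except AttributeError as exn:
        return "AttributeError: %s" % exn
```

run(True) returns [0], while run(False) returns the message
"AttributeError: 'PlaceholderPubSubSource' object has no attribute
'estimate_size'", mirroring what Flink hits with _PubSubSource.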

That said, we now have the option to replace ReadFromPubSub with an external
transform that would use Java's PubSubIO through the new cross-language
feature.

Thanks,
Max

On 12.07.19 19:32, Chad Dombrova wrote:
> Hi all,
> This error came as a bit of a surprise.
>
> Here’s a snippet of the traceback (full traceback below).
>
>   File "apache_beam/runners/common.py", line 751, in apache_beam.runners.common.DoFnRunner.process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 423, in apache_beam.runners.common.SimpleInvoker.invoke_process
>     windowed_value, self.process_method(windowed_value.value))
>   File "/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 860, in split_source
> AttributeError: '_PubSubSource' object has no attribute 'estimate_size' [while running 'PubSubInflow/Read/Split']
>
> Flink is using _PubSubSource, which, as far as I can tell, is a stub that
> should be replaced at runtime by an actual streaming source. Is this
> error a bug or a known limitation? If the latter, is there a Jira issue,
> and is there any momentum to solve it?
>
> I’m confused by this because the Apache Beam Portability Support
> Matrix [1] makes it clear that Flink supports streaming, and the
> “Built-in I/O Transforms” docs [2] list Google PubSub and BigQuery as
> the only I/O transforms that support streaming. So if streaming works
> with Flink at all, PubSub is presumably the source it works with.
>
> I'm using Beam 2.13.0 and Flink 1.8.
>
> thanks,
> chad
>
> [1] https://docs.google.com/spreadsheets/d/1KDa_FGn1ShjomGd-UUDOhuh2q73de2tPz6BqHpzqvNI/edit#gid=0
> [2] https://beam.apache.org/documentation/io/built-in/
>
> Full traceback:
>
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 5: Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 157, in _execute
>     response = task()
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 190, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 333, in do_instruction
>     request.instruction_id)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 359, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 589, in process_bundle
>     ].process_encoded(data.data)
>   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 246, in apache_beam.runners.worker.operations.Operation.output
>     def output(self, windowed_value, output_index=0):
>   File "apache_beam/runners/worker/operations.py", line 247, in apache_beam.runners.worker.operations.Operation.output
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>     self.consumer.process(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 583, in apache_beam.runners.worker.operations.DoOperation.process
>     with self.scoped_process_state:
>   File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.DoOperation.process
>     delayed_application = self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 747, in apache_beam.runners.common.DoFnRunner.receive
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 753, in apache_beam.runners.common.DoFnRunner.process
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 807, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>     raise_with_traceback(new_exn)
>   File "apache_beam/runners/common.py", line 751, in apache_beam.runners.common.DoFnRunner.process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 423, in apache_beam.runners.common.SimpleInvoker.invoke_process
>     windowed_value, self.process_method(windowed_value.value))
>   File "/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 860, in split_source
> AttributeError: '_PubSubSource' object has no attribute 'estimate_size' [while running 'PubSubInflow/Read/Split']
>