Hmm, seems like a Java (external) ParDo is being forwarded to the Python SDK
for execution somehow. +Maximilian Michels <[email protected]> might know more.
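
For context: the assertion at transforms/core.py line 1247 fires when the
Python SDK rehydrates a ParDo whose payload is not a pickled Python DoFn,
and the expanded Java KafkaIO ParDo is exactly that, so it trips during the
runner-API round trip inside Pipeline.run(). As far as I know, in 2.16 the
Python direct runner cannot execute external (Java) transforms at all, so
the cross-language Kafka write has to be submitted through a portable runner
such as Flink. A minimal sketch of what the options might look like is
below; the job endpoint and environment_type are assumptions about the
docker-compose setup, not values taken from this thread:

--------------------------------------------------------------------------
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: submit to the portable Flink job server instead of letting
# the pipeline round-trip through the Python direct runner. The endpoint
# and environment_type below are assumptions about this deployment.
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',  # assumed Flink job server address
    '--environment_type=DOCKER',      # SDK harness runs in a Docker container
])
--------------------------------------------------------------------------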

On Sun, Jan 5, 2020 at 2:57 AM Yu Watanabe <[email protected]> wrote:

> Hello.
>
> I would like to sink data into Kafka using the Kafka module for
> Python; however, I get the assertion error below when the pipeline
> is executed.
>
> --------------------------------------------------------------------------
> beam_1         | Traceback (most recent call last):
> beam_1         |   File "src/sample.py", line 50, in <module>
> beam_1         |     main()
> beam_1         |   File "src/sample.py", line 46, in main
> beam_1         |     'localhost:8097' ))
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 427, in __exit__
> beam_1         |     self.run().wait_until_finish()
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 407, in run
> beam_1         |     self._options).run(False)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 680, in from_runner_api
> beam_1         |     context.transforms.get_by_id(root_transform_id)]
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1         |     self._id_to_proto[id], self._pipeline_context)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
> beam_1         |     part = context.transforms.get_by_id(transform_id)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1         |     self._id_to_proto[id], self._pipeline_context)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
> beam_1         |     part = context.transforms.get_by_id(transform_id)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1         |     self._id_to_proto[id], self._pipeline_context)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
> beam_1         |     part = context.transforms.get_by_id(transform_id)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1         |     self._id_to_proto[id], self._pipeline_context)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
> beam_1         |     part = context.transforms.get_by_id(transform_id)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1         |     self._id_to_proto[id], self._pipeline_context)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
> beam_1         |     part = context.transforms.get_by_id(transform_id)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1         |     self._id_to_proto[id], self._pipeline_context)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
> beam_1         |     transform=ptransform.PTransform.from_runner_api(proto.spec, context),
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 612, in from_runner_api
> beam_1         |     context)
> beam_1         |   File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1247, in from_runner_api_parameter
> beam_1         |     assert pardo_payload.do_fn.spec.urn == python_urns.PICKLED_DOFN_INFO
> beam_1         | AssertionError
> --------------------------------------------------------------------------
>
> Before the above error, I see the log below in the job server.
>
> --------------------------------------------------------------------------
> [grpc-default-executor-6] INFO org.apache.beam.runners.core.construction.expansion.ExpansionService - Expanding 'root' with URN 'beam:external:java:kafka:write:v1'
> --------------------------------------------------------------------------
>
> A snippet of my pipeline code is below.
>
> --------------------------------------------------------------------------
>     with beam.Pipeline(options=options) as p:
>
>         lines = ( p | beam.Create(['Hello World.', 'Apache beam']) )
>         #
>         # Send to kafka
>         # WriteToKafka
>         # https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.external.kafka.html#apache_beam.io.external.kafka.WriteToKafka
>         #
>         # Producer config
>         # https://kafka.apache.org/documentation/#producerconfigs
>         #
>         ( lines | beam.Map(lambda x: ('payload', x))
>                 | WriteToKafka(
>                       {'acks': 'all', 'bootstrap.servers': 'localhost:9092'},
>                       'beam',
>                       'org.apache.kafka.common.serialization.ByteArraySerializer',
>                       'org.apache.kafka.common.serialization.ByteArraySerializer',
>                       'localhost:8097'))
> --------------------------------------------------------------------------
>
> My platform is built on Docker Engine, using the following combination
> of modules.
>
> 1. apache-beam: 2.16.0
> 2. flink: 1.8
> 3. python-sdk(37): compiled using release-2.16.0
> 4. jobserver: compiled using release-2.16.0
> 5. kafka: 2.3.x (using confluent 5.3.x)
>
> * docker compose
>
> https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml
>
> Is there any setting I am missing that would avoid this error?
> I would appreciate any assistance in solving it.
>
> Best Regards,
> Yu Watanabe
>
> --
> Yu Watanabe
> [email protected]
>
