Hello.
I would like to sink data into Kafka using the Kafka module for Python;
however, I am getting the assertion error below when the pipeline is
executed.
--------------------------------------------------------------------------
beam_1 | Traceback (most recent call last):
beam_1 | File "src/sample.py", line 50, in <module>
beam_1 | main()
beam_1 | File "src/sample.py", line 46, in main
beam_1 | 'localhost:8097' ))
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
427, in __exit__
beam_1 | self.run().wait_until_finish()
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
407, in run
beam_1 | self._options).run(False)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
680, in from_runner_api
beam_1 | context.transforms.get_by_id(root_transform_id)]
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1 | self._id_to_proto[id], self._pipeline_context)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
913, in from_runner_api
beam_1 | part = context.transforms.get_by_id(transform_id)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1 | self._id_to_proto[id], self._pipeline_context)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
913, in from_runner_api
beam_1 | part = context.transforms.get_by_id(transform_id)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1 | self._id_to_proto[id], self._pipeline_context)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
913, in from_runner_api
beam_1 | part = context.transforms.get_by_id(transform_id)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1 | self._id_to_proto[id], self._pipeline_context)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
913, in from_runner_api
beam_1 | part = context.transforms.get_by_id(transform_id)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1 | self._id_to_proto[id], self._pipeline_context)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
913, in from_runner_api
beam_1 | part = context.transforms.get_by_id(transform_id)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1 | self._id_to_proto[id], self._pipeline_context)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
904, in from_runner_api
beam_1 |
transform=ptransform.PTransform.from_runner_api(proto.spec, context),
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
line 612, in from_runner_api
beam_1 | context)
beam_1 | File
"/usr/local/lib/python3.7/site-packages/apache_beam/transforms/core.py",
line 1247, in from_runner_api_parameter
beam_1 | assert pardo_payload.do_fn.spec.urn ==
python_urns.PICKLED_DOFN_INFO
beam_1 | AssertionError
--------------------------------------------------------------------------
Before the error above, I see the following log in the job server.
--------------------------------------------------------------------------
[grpc-default-executor-6] INFO
org.apache.beam.runners.core.construction.expansion.ExpansionService -
Expanding 'root' with URN 'beam:external:java:kafka:write:v1'
--------------------------------------------------------------------------
A snippet of my pipeline code is below.
--------------------------------------------------------------------------
with beam.Pipeline(options=options) as p:
    lines = p | beam.Create(['Hello World.', 'Apache beam'])

    # Send to Kafka with WriteToKafka:
    # https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.external.kafka.html#apache_beam.io.external.kafka.WriteToKafka
    #
    # Producer config:
    # https://kafka.apache.org/documentation/#producerconfigs
    (lines
     | beam.Map(lambda x: ('payload', x))
     | WriteToKafka(
         producer_config={'acks': 'all',
                          'bootstrap.servers': 'localhost:9092'},
         topic='beam',
         key_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
         value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
         expansion_service='localhost:8097'))
--------------------------------------------------------------------------
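For completeness, this is roughly how the `options` object above is built.
This is only a sketch: the runner name and job-endpoint port are taken from
my docker-compose setup and are assumptions as written here, not verified
values.

```python
# Sketch of the flags behind `options` in the snippet above.
# The endpoint/port values are assumptions from my docker-compose
# setup and may need adjusting for other environments.
flags = [
    '--runner=PortableRunner',        # submit to the Beam job server
    '--job_endpoint=localhost:8099',  # job server gRPC endpoint (assumed port)
]
# options = PipelineOptions(flags)   # then: beam.Pipeline(options=options)
```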
My platform is built on Docker Engine with the following combination of
components:
1. apache-beam: 2.16.0
2. flink: 1.8
3. python-sdk(37): compiled using release-2.16.0
4. jobserver: compiled using release-2.16.0
5. kafka: 2.3.x (using confluent 5.3.x)
* Docker Compose file:
https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml
Is there a setting I am missing that would avoid this error?
I would appreciate any assistance in resolving it.
Best Regards,
Yu Watanabe
--
Yu Watanabe
[email protected]