Hello.

I would like to sink data into Kafka using the Kafka module for
Python; however, I am getting the assertion error below when the
pipeline is executed.

--------------------------------------------------------------------------
beam_1         | Traceback (most recent call last):
beam_1         |   File "src/sample.py", line 50, in <module>
beam_1         |     main()
beam_1         |   File "src/sample.py", line 46, in main
beam_1         |     'localhost:8097' ))
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
427, in __exit__
beam_1         |     self.run().wait_until_finish()
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
407, in run
beam_1         |     self._options).run(False)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
680, in from_runner_api
beam_1         |     context.transforms.get_by_id(root_transform_id)]
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1         |     self._id_to_proto[id], self._pipeline_context)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
913, in from_runner_api
beam_1         |     part = context.transforms.get_by_id(transform_id)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1         |     self._id_to_proto[id], self._pipeline_context)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
913, in from_runner_api
beam_1         |     part = context.transforms.get_by_id(transform_id)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1         |     self._id_to_proto[id], self._pipeline_context)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
913, in from_runner_api
beam_1         |     part = context.transforms.get_by_id(transform_id)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1         |     self._id_to_proto[id], self._pipeline_context)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
913, in from_runner_api
beam_1         |     part = context.transforms.get_by_id(transform_id)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1         |     self._id_to_proto[id], self._pipeline_context)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
913, in from_runner_api
beam_1         |     part = context.transforms.get_by_id(transform_id)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
line 94, in get_by_id
beam_1         |     self._id_to_proto[id], self._pipeline_context)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
904, in from_runner_api
beam_1         |
transform=ptransform.PTransform.from_runner_api(proto.spec, context),
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
line 612, in from_runner_api
beam_1         |     context)
beam_1         |   File
"/usr/local/lib/python3.7/site-packages/apache_beam/transforms/core.py",
line 1247, in from_runner_api_parameter
beam_1         |     assert pardo_payload.do_fn.spec.urn ==
python_urns.PICKLED_DOFN_INFO
beam_1         | AssertionError
--------------------------------------------------------------------------

Before the error above, I see the following log in the job server.

--------------------------------------------------------------------------
[grpc-default-executor-6] INFO
org.apache.beam.runners.core.construction.expansion.ExpansionService -
Expanding 'root' with URN 'beam:external:java:kafka:write:v1'
--------------------------------------------------------------------------

A snippet of my pipeline code is below.

--------------------------------------------------------------------------
    with beam.Pipeline(options=options) as p:

        lines = ( p | beam.Create(['Hello World.', 'Apache beam']) )
        #
        # Send to kafka
        # WriteToKafka
        # https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.external.kafka.html#apache_beam.io.external.kafka.WriteToKafka
        #
        # Producer config
        # https://kafka.apache.org/documentation/#producerconfigs
        #
        ( lines | beam.Map(lambda x: ('payload', x))
                | WriteToKafka(
                      {'acks': 'all', 'bootstrap.servers': 'localhost:9092'},
                      'beam',
                      'org.apache.kafka.common.serialization.ByteArraySerializer',
                      'org.apache.kafka.common.serialization.ByteArraySerializer',
                      'localhost:8097' ))
--------------------------------------------------------------------------
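For reference, here is the same call written out with keyword arguments.
This is only a sketch based on my reading of the 2.16.0 pydoc linked above
(the parameter names `producer_config`, `topic`, `key_serializer`,
`value_serializer`, and `expansion_service` are my assumption from that
page), so please correct me if they differ. Writing it this way makes
explicit that 'localhost:8097' is meant as the expansion service address,
not part of the producer config:

```python
# Sketch only -- assumes apache-beam 2.16.0 with the external Kafka module,
# an expansion service listening on localhost:8097, and a Kafka broker on
# localhost:9092. Parameter names follow my reading of the 2.16.0 pydoc.
import apache_beam as beam
from apache_beam.io.external.kafka import WriteToKafka

with beam.Pipeline(options=options) as p:
    lines = p | beam.Create(['Hello World.', 'Apache beam'])
    _ = (lines
         # Encode key and value to bytes here, since ByteArraySerializer
         # is configured below (my adjustment to the original snippet).
         | beam.Map(lambda x: (b'payload', x.encode('utf-8')))
         | WriteToKafka(
               producer_config={'acks': 'all',
                                'bootstrap.servers': 'localhost:9092'},
               topic='beam',
               key_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
               value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
               expansion_service='localhost:8097'))
```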

My platform is built on Docker Engine, using the following combination
of modules.

1. apache-beam: 2.16.0
2. flink: 1.8
3. python-sdk(37): compiled using release-2.16.0
4. jobserver: compiled using release-2.16.0
5. kafka: 2.3.x (using confluent 5.3.x)

* docker compose
https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml

Is there any setting I am missing that would avoid this error?
I would appreciate any assistance in solving it.

Best Regards,
Yu Watanabe

-- 
Yu Watanabe
[email protected]
