Hmm, it seems a Java (external) ParDo is somehow being forwarded to the Python SDK for execution. +Maximilian Michels <[email protected]> might know more.
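For context, here is a minimal pure-Python sketch of the check that trips in the traceback below. The constant mirrors `python_urns.PICKLED_DOFN_INFO` from the `core.py` frame; the function name and the Java-side URN used in the usage note are my own stand-ins, not Beam's actual identifiers:

```python
# Sketch of the assertion in apache_beam/transforms/core.py that raises here.
# PICKLED_DOFN_INFO mirrors python_urns.PICKLED_DOFN_INFO; rehydrate_pardo
# is a hypothetical stand-in for DoFn rehydration in from_runner_api_parameter.
PICKLED_DOFN_INFO = 'beam:dofn:pickled_python_info:v1'

def rehydrate_pardo(do_fn_urn):
    # The Python SDK can only rebuild DoFns it pickled itself, so a ParDo
    # expanded by the Java SDK (carrying any other URN) fails this assert.
    assert do_fn_urn == PICKLED_DOFN_INFO, 'not a pickled Python DoFn'
    return 'rehydrated'
```

In other words, once the Java-expanded WriteToKafka ParDo reaches the Python SDK's pipeline reconstruction path, any non-Python DoFn URN raises the bare `AssertionError` seen in the log.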
On Sun, Jan 5, 2020 at 2:57 AM Yu Watanabe <[email protected]> wrote:
> Hello.
>
> I would like to sink data into Kafka using the Kafka module for Python,
> however, I get the assertion error below when the pipeline is executed.
>
> --------------------------------------------------------------------------
> beam_1 | Traceback (most recent call last):
> beam_1 |   File "src/sample.py", line 50, in <module>
> beam_1 |     main()
> beam_1 |   File "src/sample.py", line 46, in main
> beam_1 |     'localhost:8097' ))
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 427, in __exit__
> beam_1 |     self.run().wait_until_finish()
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 407, in run
> beam_1 |     self._options).run(False)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 680, in from_runner_api
> beam_1 |     context.transforms.get_by_id(root_transform_id)]
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1 |     self._id_to_proto[id], self._pipeline_context)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
> beam_1 |     part = context.transforms.get_by_id(transform_id)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1 |     self._id_to_proto[id], self._pipeline_context)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
> beam_1 |     part = context.transforms.get_by_id(transform_id)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1 |     self._id_to_proto[id], self._pipeline_context)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
> beam_1 |     part = context.transforms.get_by_id(transform_id)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1 |     self._id_to_proto[id], self._pipeline_context)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
> beam_1 |     part = context.transforms.get_by_id(transform_id)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1 |     self._id_to_proto[id], self._pipeline_context)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
> beam_1 |     part = context.transforms.get_by_id(transform_id)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
> beam_1 |     self._id_to_proto[id], self._pipeline_context)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
> beam_1 |     transform=ptransform.PTransform.from_runner_api(proto.spec, context),
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 612, in from_runner_api
> beam_1 |     context)
> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1247, in from_runner_api_parameter
> beam_1 |     assert pardo_payload.do_fn.spec.urn == python_urns.PICKLED_DOFN_INFO
> beam_1 | AssertionError
> --------------------------------------------------------------------------
>
> Before the above error I see the log below in the jobserver.
>
> --------------------------------------------------------------------------
> [grpc-default-executor-6] INFO org.apache.beam.runners.core.construction.expansion.ExpansionService - Expanding 'root' with URN 'beam:external:java:kafka:write:v1'
> --------------------------------------------------------------------------
>
> A snippet of my pipeline code is below.
>
> --------------------------------------------------------------------------
> with beam.Pipeline(options=options) as p:
>
>     lines = ( p | beam.Create(['Hello World.', 'Apache beam']) )
>     #
>     # Send to kafka
>     # WriteToKafka
>     # https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.external.kafka.html#apache_beam.io.external.kafka.WriteToKafka
>     #
>     # Producer config
>     # https://kafka.apache.org/documentation/#producerconfigs
>     #
>     ( lines | beam.Map(lambda x: ('payload', x) )
>             | WriteToKafka(
>                   { 'acks': 'all', 'bootstrap.servers': 'localhost:9092' },
>                   'beam',
>                   'org.apache.kafka.common.serialization.ByteArraySerializer',
>                   'org.apache.kafka.common.serialization.ByteArraySerializer',
>                   'localhost:8097' ))
> --------------------------------------------------------------------------
>
> My platform is built on Docker Engine with the following combination of modules:
>
> 1. apache-beam: 2.16.0
> 2. flink: 1.8
> 3. python-sdk(37): compiled using release-2.16.0
> 4. jobserver: compiled using release-2.16.0
> 5. kafka: 2.3.x (using confluent 5.3.x)
>
> * docker compose
> https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml
>
> Is there any setting I am missing that would avoid the error?
> I would appreciate any assistance in solving it.
>
> Best Regards,
> Yu Watanabe
>
> --
> Yu Watanabe
> [email protected]
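One thing worth double-checking in the quoted snippet, as an aside (it is separate from the assertion error): with `ByteArraySerializer` configured on the Java side, the elements handed to WriteToKafka should presumably be `(bytes, bytes)` pairs rather than `str`. A hedged sketch of the mapping step, where the helper name is mine:

```python
# Hypothetical helper: encode key and value to UTF-8 bytes so that Kafka's
# ByteArraySerializer receives raw bytes rather than Python str objects.
def to_kafka_record(line):
    return (b'payload', line.encode('utf-8'))
```

In the pipeline this would replace the lambda, e.g. `lines | beam.Map(to_kafka_record) | WriteToKafka(...)`.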
