Hello. I have made some progress with Kafka output from Python, but I am still not able to get "WriteToKafka" working. I would appreciate some help.
I was able to get past the assertion error by following the example for External Transform: defining a ParDo instead of a Map before "WriteToKafka" made the pipeline use the correct URN.

https://github.com/apache/beam/blob/819a242453eac9a2b3d79df0b259c8cc844b6af1/sdks/python/apache_beam/examples/wordcount_xlang.py#L70

However, I now get a coder casting error on the jobserver.

==========================================================================
jobserver_1 | java.lang.ClassCastException: org.apache.beam.sdk.coders.ByteArrayCoder cannot be cast to org.apache.beam.sdk.coders.KvCoder
jobserver_1 |         at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:1552)
==========================================================================

As the error says, KafkaIO.java tries to cast the input coder to KvCoder.

==========================================================================
KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
==========================================================================

In my code, following the example from ExternalTransform, I pass a bytes object to the external transform.

==========================================================================
class ToKV(beam.DoFn):
    def process(self, elem):
        yield elem.encode()

......

res = (lines
       | beam.ParDo(ToKV()).with_output_types(bytes)
       | WriteToKafka(
           {'acks': 'all', 'bootstrap.servers': 'localhost:9092'},
           'beam',
           'org.apache.kafka.common.serialization.ByteArraySerializer',
           'org.apache.kafka.common.serialization.ByteArraySerializer',
           'localhost:8097'))
==========================================================================

So my question is: which coder should I use on the Python side so that the input coder can be cast to KvCoder on the Java side?
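In case it helps frame the question, below is the variant I am planning to try next. It is only an untested sketch: I am assuming, from reading KafkaIO.Write.expand(), that emitting explicit (key, value) pairs and annotating the output with typing.Tuple[bytes, bytes] will make the Python SDK attach a KV-style coder instead of ByteArrayCoder. The topic, serializers, and addresses are the same ones from my snippet above.

==========================================================================
import typing

import apache_beam as beam
from apache_beam.io.external.kafka import WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions


class ToKV(beam.DoFn):
    def process(self, elem):
        # Emit an explicit (key, value) pair of bytes instead of a bare
        # bytes object, so that the inferred output coder is KV-shaped.
        yield b'payload', elem.encode()


options = PipelineOptions()  # runner/portability flags omitted here

with beam.Pipeline(options=options) as p:
    lines = p | beam.Create(['Hello World.', 'Apache beam'])
    _ = (lines
         | beam.ParDo(ToKV()).with_output_types(typing.Tuple[bytes, bytes])
         | WriteToKafka(
             {'acks': 'all', 'bootstrap.servers': 'localhost:9092'},
             'beam',
             'org.apache.kafka.common.serialization.ByteArraySerializer',
             'org.apache.kafka.common.serialization.ByteArraySerializer',
             'localhost:8097'))
==========================================================================

I have not verified this against 2.16.0, so please correct me if the typing hint is not how the KV coder is supposed to be requested.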
Thanks,
Yu

On Thu, Jan 9, 2020 at 7:17 AM Chamikara Jayalath <[email protected]> wrote:
>
> Hmm, seems like a Java (external) ParDo is being forwarded to the Python SDK
> for execution somehow. +Maximilian Michels might know more.
>
> On Sun, Jan 5, 2020 at 2:57 AM Yu Watanabe <[email protected]> wrote:
>>
>> Hello.
>>
>> I would like to sink data into Kafka using the kafka module for Python;
>> however, I get the assertion error below when the pipeline is executed.
>>
>> --------------------------------------------------------------------------
>> beam_1 | Traceback (most recent call last):
>> beam_1 |   File "src/sample.py", line 50, in <module>
>> beam_1 |     main()
>> beam_1 |   File "src/sample.py", line 46, in main
>> beam_1 |     'localhost:8097' ))
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 427, in __exit__
>> beam_1 |     self.run().wait_until_finish()
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 407, in run
>> beam_1 |     self._options).run(False)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 680, in from_runner_api
>> beam_1 |     context.transforms.get_by_id(root_transform_id)]
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
>> beam_1 |     self._id_to_proto[id], self._pipeline_context)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
>> beam_1 |     part = context.transforms.get_by_id(transform_id)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
>> beam_1 |     self._id_to_proto[id], self._pipeline_context)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
>> beam_1 |     part = context.transforms.get_by_id(transform_id)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
>> beam_1 |     self._id_to_proto[id], self._pipeline_context)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
>> beam_1 |     part = context.transforms.get_by_id(transform_id)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
>> beam_1 |     self._id_to_proto[id], self._pipeline_context)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
>> beam_1 |     part = context.transforms.get_by_id(transform_id)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
>> beam_1 |     self._id_to_proto[id], self._pipeline_context)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
>> beam_1 |     part = context.transforms.get_by_id(transform_id)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
>> beam_1 |     self._id_to_proto[id], self._pipeline_context)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
>> beam_1 |     part = context.transforms.get_by_id(transform_id)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
>> beam_1 |     self._id_to_proto[id], self._pipeline_context)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
>> beam_1 |     transform=ptransform.PTransform.from_runner_api(proto.spec, context),
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 612, in from_runner_api
>> beam_1 |     context)
>> beam_1 |   File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1247, in from_runner_api_parameter
>> beam_1 |     assert pardo_payload.do_fn.spec.urn == python_urns.PICKLED_DOFN_INFO
>> beam_1 | AssertionError
>> --------------------------------------------------------------------------
>>
>> Before the above error, I see the log below in the jobserver.
>>
>> --------------------------------------------------------------------------
>> [grpc-default-executor-6] INFO org.apache.beam.runners.core.construction.expansion.ExpansionService - Expanding 'root' with URN 'beam:external:java:kafka:write:v1'
>> --------------------------------------------------------------------------
>>
>> A snippet of my pipeline code is below.
>>
>> --------------------------------------------------------------------------
>> with beam.Pipeline(options=options) as p:
>>
>>     lines = p | beam.Create(['Hello World.', 'Apache beam'])
>>
>>     #
>>     # Send to kafka
>>     #   WriteToKafka
>>     #     https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.external.kafka.html#apache_beam.io.external.kafka.WriteToKafka
>>     #
>>     #   Producer config
>>     #     https://kafka.apache.org/documentation/#producerconfigs
>>     #
>>     (lines
>>      | beam.Map(lambda x: ('payload', x))
>>      | WriteToKafka(
>>          {'acks': 'all', 'bootstrap.servers': 'localhost:9092'},
>>          'beam',
>>          'org.apache.kafka.common.serialization.ByteArraySerializer',
>>          'org.apache.kafka.common.serialization.ByteArraySerializer',
>>          'localhost:8097'))
>> --------------------------------------------------------------------------
>>
>> My platform is built on Docker Engine with the following combination of modules:
>>
>> 1. apache-beam: 2.16.0
>> 2. flink: 1.8
>> 3. python-sdk (3.7): compiled from release-2.16.0
>> 4. jobserver: compiled from release-2.16.0
>> 5. kafka: 2.3.x (using Confluent 5.3.x)
>>
>> * docker compose
>>   https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml
>>
>> Is there a setting I am missing that would avoid this error?
>> I would appreciate assistance in solving it.
>>
>> Best Regards,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> [email protected]

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:  twitter.com/yuwtennis
