Hello.

I have made some progress with the Kafka output in Python, but I am
still not able to get "WriteToKafka" working.
I would appreciate it if I could get some help.

I was able to get past the assertion error by following the example
for external transforms. By defining a ParDo instead of a Map before
"WriteToKafka", I was able to use the correct URN.

https://github.com/apache/beam/blob/819a242453eac9a2b3d79df0b259c8cc844b6af1/sdks/python/apache_beam/examples/wordcount_xlang.py#L70

However, I now get a coder casting error on the job server.

==========================================================================
jobserver_1    | java.lang.ClassCastException:
org.apache.beam.sdk.coders.ByteArrayCoder cannot be cast to
org.apache.beam.sdk.coders.KvCoder
                             at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:1552)
==========================================================================

In KafkaIO.java, as the error says, the input coder is cast to KvCoder.
==========================================================================
KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
==========================================================================
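
If it is useful, one way to check which coder the Python SDK infers
for a given type hint is to ask the coder registry. This is just a
minimal sketch (module paths as I understand them in apache-beam
2.16.0; the Java-side mapping is my reading of the expansion):
==========================================================================
# Minimal sketch: inspect the coder the Python SDK infers for a type
# hint. Paths assume apache-beam 2.16.0.
from apache_beam import typehints
from apache_beam.coders.typecoders import registry

# A bare bytes hint yields BytesCoder, i.e. ByteArrayCoder in Java.
print(registry.get_coder(bytes))

# A two-element tuple hint yields a TupleCoder, which I believe the
# expansion service should see as a KvCoder.
print(registry.get_coder(typehints.Tuple[bytes, bytes]))
==========================================================================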

In my code, following the example from ExternalTransform, I pass
bytes objects to the external transform.
==========================================================================
class ToKV(beam.DoFn):

    def process(self, elem):
        # process() must return an iterable of outputs, so yield the
        # encoded bytes (encode() already returns a bytes object).
        yield elem.encode()

......

res = ( lines | beam.ParDo(ToKV()).with_output_types(bytes)
              | WriteToKafka(
                    { 'acks': 'all', 'bootstrap.servers': 'localhost:9092'},
                    'beam',
                    'org.apache.kafka.common.serialization.ByteArraySerializer',
                    'org.apache.kafka.common.serialization.ByteArraySerializer',
                    'localhost:8097' ))
==========================================================================

So my question is: which coder should I use on the Python side so
that the input can be cast to KvCoder on the Java side?
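
If it helps frame the question, my current guess is that the Python
side has to emit two-element tuples of bytes and declare the tuple
output type, so that a KV coder is inferred instead of ByteArrayCoder.
A minimal sketch of what I mean (untested end to end; the key b'key'
and the shape of ToKV are just placeholders):
==========================================================================
import apache_beam as beam
from apache_beam import typehints
from apache_beam.io.external.kafka import WriteToKafka


class ToKV(beam.DoFn):
    def process(self, elem):
        # Placeholder key: KafkaIO.Write expects KV<byte[], byte[]>,
        # so emit (key, value) tuples of bytes.
        yield b'key', elem.encode()


res = ( lines | beam.ParDo(ToKV())
                    .with_output_types(typehints.Tuple[bytes, bytes])
              | WriteToKafka(
                    { 'acks': 'all', 'bootstrap.servers': 'localhost:9092'},
                    'beam',
                    'org.apache.kafka.common.serialization.ByteArraySerializer',
                    'org.apache.kafka.common.serialization.ByteArraySerializer',
                    'localhost:8097' ))
==========================================================================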

Thanks,
Yu

On Thu, Jan 9, 2020 at 7:17 AM Chamikara Jayalath <[email protected]> wrote:
>
> Hmm, it seems like a Java (external) ParDo is being forwarded to the Python
> SDK for execution somehow. +Maximilian Michels might know more.
>
> On Sun, Jan 5, 2020 at 2:57 AM Yu Watanabe <[email protected]> wrote:
>>
>> Hello.
>>
>> I would like to sink data into Kafka using the Kafka module for
>> Python, but I am getting the assertion error below when the
>> pipeline is executed.
>>
>> --------------------------------------------------------------------------
>> beam_1         | Traceback (most recent call last):
>> beam_1         |   File "src/sample.py", line 50, in <module>
>> beam_1         |     main()
>> beam_1         |   File "src/sample.py", line 46, in main
>> beam_1         |     'localhost:8097' ))
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 427, in __exit__
>> beam_1         |     self.run().wait_until_finish()
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 407, in run
>> beam_1         |     self._options).run(False)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 680, in from_runner_api
>> beam_1         |     context.transforms.get_by_id(root_transform_id)]
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 94, in get_by_id
>> beam_1         |     self._id_to_proto[id], self._pipeline_context)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 913, in from_runner_api
>> beam_1         |     part = context.transforms.get_by_id(transform_id)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 94, in get_by_id
>> beam_1         |     self._id_to_proto[id], self._pipeline_context)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 913, in from_runner_api
>> beam_1         |     part = context.transforms.get_by_id(transform_id)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 94, in get_by_id
>> beam_1         |     self._id_to_proto[id], self._pipeline_context)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 913, in from_runner_api
>> beam_1         |     part = context.transforms.get_by_id(transform_id)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 94, in get_by_id
>> beam_1         |     self._id_to_proto[id], self._pipeline_context)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 913, in from_runner_api
>> beam_1         |     part = context.transforms.get_by_id(transform_id)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 94, in get_by_id
>> beam_1         |     self._id_to_proto[id], self._pipeline_context)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 913, in from_runner_api
>> beam_1         |     part = context.transforms.get_by_id(transform_id)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 94, in get_by_id
>> beam_1         |     self._id_to_proto[id], self._pipeline_context)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 904, in from_runner_api
>> beam_1         |
>> transform=ptransform.PTransform.from_runner_api(proto.spec, context),
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>> line 612, in from_runner_api
>> beam_1         |     context)
>> beam_1         |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/core.py",
>> line 1247, in from_runner_api_parameter
>> beam_1         |     assert pardo_payload.do_fn.spec.urn ==
>> python_urns.PICKLED_DOFN_INFO
>> beam_1         | AssertionError
>> --------------------------------------------------------------------------
>>
>> Before the above error, I see the log below in the job server.
>>
>> --------------------------------------------------------------------------
>> [grpc-default-executor-6] INFO
>> org.apache.beam.runners.core.construction.expansion.ExpansionService -
>> Expanding 'root' with URN 'beam:external:java:kafka:write:v1'
>> --------------------------------------------------------------------------
>>
>> A snippet of my pipeline code is below.
>>
>> --------------------------------------------------------------------------
>>     with beam.Pipeline(options=options) as p:
>>
>>         lines = ( p | beam.Create(['Hello World.', 'Apache beam']) )
>>         #
>>         # Send to kafka
>>         # WriteToKafka
>>         # https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.external.kafka.html#apache_beam.io.external.kafka.WriteToKafka
>>         #
>>         # Producer config
>>         # https://kafka.apache.org/documentation/#producerconfigs
>>         #
>>         ( lines  | beam.Map(lambda x: ('payload', x))
>>                  | WriteToKafka(
>>                        { 'acks': 'all', 'bootstrap.servers': 'localhost:9092'},
>>                        'beam',
>>                        'org.apache.kafka.common.serialization.ByteArraySerializer',
>>                        'org.apache.kafka.common.serialization.ByteArraySerializer',
>>                        'localhost:8097' ))
>> --------------------------------------------------------------------------
>>
>> I have built my platform on Docker Engine, using the combination
>> of modules below.
>>
>> 1. apache-beam: 2.16.0
>> 2. flink: 1.8
>> 3. python-sdk(37): compiled using release-2.16.0
>> 4. jobserver: compiled using release-2.16.0
>> 5. kafka: 2.3.x (using confluent 5.3.x)
>>
>> * docker compose
>> https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml
>>
>> Is there any setting I am missing to avoid the error?
>> I would appreciate any assistance in solving it.
>>
>> Best Regards,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> [email protected]



-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis
