So I changed temp_convert to return a tuple of bytes, and now I am getting:

    raise RuntimeError(result.error)
RuntimeError: org.apache.beam.sdk.coders.CoderException: `UnknownCoderWrapper` 
was used to perform an actual decoding in the Java SDK. Potentially a Java 
transform is being followed by a cross-language transform that uses a coder that 
is not available in the Java SDK. Please make sure that Python transforms at 
the multi-language boundary use Beam portable coders.

I am using the DirectRunner.
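
One common cause of this error, offered as a guess and not a confirmed diagnosis for this pipeline: a Python step with no declared output type falls back to a Python-only coder, which the Java side reports as `UnknownCoderWrapper`. The sketch below declares the output of the step as `Tuple[bytes, bytes]` so that Beam can choose a portable key/value coder. The body of `temp_convert` is a hypothetical stand-in (the real one is not shown in the thread) and assumes each record unpacks as a (key, value) pair, which may not hold when `with_metadata=True`. The helper name `convert_step` is also an assumption.

```python
import typing

# Both key and value are declared as raw bytes so a portable coder can be used.
KafkaKV = typing.Tuple[bytes, bytes]


def temp_convert(record) -> KafkaKV:
    """Hypothetical stand-in for the temp_convert function in the thread.

    Assumes `record` unpacks as (key, value), each bytes or None.
    """
    key, value = record
    # Replace a missing key or value with empty bytes so both items are bytes.
    return (key or b""), (value or b"")


def convert_step():
    """Build the labelled Map step with an explicit output type hint."""
    # Imported lazily so the conversion logic above can be tested without Beam.
    import apache_beam as beam

    return "temp convert" >> beam.Map(temp_convert).with_output_types(KafkaKV)
```

In the pipeline, `| convert_step()` would take the place of the `"temp convert"` line, directly before the Kafka write.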

Thanks!

From: Jaehyeon Kim <dott...@gmail.com>
Sent: Thursday, October 3, 2024 3:57 PM
To: user@beam.apache.org
Subject: Re: How to write to a Kafka topic?

Hello,

It doesn't seem that the following transform outputs a tuple of bytes.

| "temp convert" >> beam.Map(temp_convert)

You should convert the output into something like ('key'.encode('utf-8'), 
'value'.encode('utf-8')).
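
As a minimal illustration of that conversion, the function below encodes a text key and value into a pair of UTF-8 byte strings. The name `encode_kv` and its two string parameters are hypothetical; the actual `temp_convert` would first have to pull the key and value out of whatever record type the read step emits.

```python
from typing import Tuple


def encode_kv(key: str, value: str) -> Tuple[bytes, bytes]:
    """Encode a text key/value pair as a (bytes, bytes) tuple."""
    return key.encode("utf-8"), value.encode("utf-8")


if __name__ == "__main__":
    print(encode_kv("key", "value"))
```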

I wrote a simple post about Kafka I/O read/write on the Flink Runner, and hope 
it helps - https://jaehyeon.me/blog/2024-04-18-beam-local-dev-3/

Cheers,
Jaehyeon



On Fri, 4 Oct 2024 at 08:21, Henry Tremblay via user 
<user@beam.apache.org> wrote:
I am creating a simple pipeline to read and then write to Kafka. Here is my 
code:

    with Pipeline(options=pipeline_options) as pipeline:
        main = (
            pipeline
            | ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': configs['bootstrap_servers'],
                    'group.id': 'my-group',
                    'isolation.level': 'read_uncommitted',
                },
                topics=configs['topics'],
                max_num_records=max_num_records,
                commit_offset_in_finalize=True,
                with_metadata=True)
            | "temp convert" >> beam.Map(temp_convert)
            | "Write to Kafka" >> WriteToKafka(
                producer_config={'bootstrap.servers': configs['bootstrap_servers']},
                topic=configs['out_topic'],
            )
        )


This is giving me

RuntimeError: java.lang.ClassCastException: class 
org.apache.beam.sdk.util.construction.UnknownCoderWrapper cannot be cast to 
class org.apache.beam.sdk.coders.KvCoder 
(org.apache.beam.sdk.util.construction.UnknownCoderWrapper and 
org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')

Can someone point me to a simple snippet for writing to Kafka in Python? I have 
looked in vain on the web.

Thanks!
