So I changed the temp_convert to return a tuple of bytes, and now am getting:
raise RuntimeError(result.error)
RuntimeError: org.apache.beam.sdk.coders.CoderException: `UnknownCoderWrapper` was used to perform an actual decoding in the Java SDK. Potentially a Java transform is being followed by a cross-language transform thatuses a coder that is not available in the Java SDK. Please make sure that Python transforms at the multi-language boundary use Beam portable coders.

I am using the DirectRunner.

Thanks!

From: Jaehyeon Kim <dott...@gmail.com>
Sent: Thursday, October 3, 2024 3:57 PM
To: user@beam.apache.org
Subject: Re: How to write to a Kafka topic?

Hello,

It doesn't seem that the following transform outputs a tuple of bytes.

    | "temp convert" >> beam.Map(temp_convert)

You should convert the output into something like ('key'.encode('utf-8'), 'value'.encode('utf-8')).

I wrote a simple post about Kafka I/O read/write on the Flink Runner, and hope it helps - https://jaehyeon.me/blog/2024-04-18-beam-local-dev-3/

Cheers,
Jaehyeon

On Fri, 4 Oct 2024 at 08:21, Henry Tremblay via user <user@beam.apache.org> wrote:

I am creating a simple pipeline to read and then write to Kafka.
Here is my code:

    with Pipeline(options=pipeline_options) as pipeline:
        main = (
            pipeline
            | ReadFromKafka(
                consumer_config={'bootstrap.servers': configs['bootstrap_servers'],
                                 'group.id': 'my-group',
                                 'isolation.level': 'read_uncommitted',
                                 },
                topics=configs['topics'],
                max_num_records=max_num_records,
                commit_offset_in_finalize=True,
                with_metadata=True)
            | "temp convert" >> beam.Map(temp_convert)
            | "Write to Kafka" >> WriteToKafka(
                producer_config={'bootstrap.servers': configs['bootstrap_servers']},
                topic=configs['out_topic'],
            )
        )

This is giving me

RuntimeError: java.lang.ClassCastException: class org.apache.beam.sdk.util.construction.UnknownCoderWrapper cannot be cast to class org.apache.beam.sdk.coders.KvCoder (org.apache.beam.sdk.util.construction.UnknownCoderWrapper and org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')

Can someone point me to a simple snippet for writing to Kafka in Python? I have looked in vain on the web.

Thanks!
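
For reference, here is a minimal sketch of what a temp_convert returning a tuple of bytes might look like, together with an explicit output type hint on the Map step. Both errors in this thread mention UnknownCoderWrapper, which as far as I understand usually means the Java side was handed a Python-only coder, so declaring the output as Tuple[bytes, bytes] is worth trying. Treat this as a guess at the cause, not a confirmed fix. The assumptions are mine and not from the thread: that each record exposes `key` and `value` attributes (otherwise it is treated as a plain (key, value) pair), that a missing key can be replaced with empty bytes, and the helper names `to_bytes` and `add_convert_step` are made up for illustration.

```python
import typing


def to_bytes(field) -> bytes:
    """Coerce a str, bytes, or None field to bytes."""
    if field is None:
        return b""  # assumption: an empty key/value is acceptable downstream
    if isinstance(field, bytes):
        return field
    return str(field).encode("utf-8")


def temp_convert(record) -> typing.Tuple[bytes, bytes]:
    """Turn one record into a (key, value) pair of bytes.

    Assumption: the record either exposes `key` and `value` attributes
    or is a plain (key, value) pair.
    """
    if hasattr(record, "key") and hasattr(record, "value"):
        key, value = record.key, record.value
    else:
        key, value = record
    return to_bytes(key), to_bytes(value)


def add_convert_step(pcoll):
    """Attach the conversion to a PCollection with an explicit type hint."""
    # Imported lazily so the helpers above can be tried without Beam installed.
    import apache_beam as beam

    return (
        pcoll
        | "temp convert" >> beam.Map(temp_convert).with_output_types(
            typing.Tuple[bytes, bytes])
    )
```

In the pipeline above, the `| "temp convert" >> beam.Map(temp_convert)` line would become `| "temp convert" >> beam.Map(temp_convert).with_output_types(typing.Tuple[bytes, bytes])`, with the WriteToKafka step unchanged.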