Can you add a type hint?

    | "temp convert" >> beam.Map(temp_convert).with_output_types(typing.Tuple[bytes, bytes])

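For reference, here is a minimal sketch of a temp_convert that would match that hint. It is only an illustration under a few assumptions: that each element read with with_metadata=True exposes `key` and `value` attributes, that a missing key can be replaced with empty bytes for your topic, and that `FakeRecord` (a hypothetical stand-in, not a Beam class) is good enough to exercise the function without Kafka. As far as I understand, the hint matters because it lets Beam choose a portable key/value bytes coder at the cross-language boundary instead of a Python-only fallback coder.

```python
import typing


class FakeRecord(typing.NamedTuple):
    """Hypothetical stand-in for a Kafka record element; not part of Beam."""
    key: typing.Optional[typing.Union[bytes, str]]
    value: typing.Union[bytes, str]


def temp_convert(record) -> typing.Tuple[bytes, bytes]:
    """Return a (key, value) pair where both elements are bytes."""
    # Assumption: a missing key is replaced with empty bytes.
    key = record.key if record.key is not None else b""
    value = record.value
    # Encode defensively in case either field arrives as str.
    if isinstance(key, str):
        key = key.encode("utf-8")
    if isinstance(value, str):
        value = value.encode("utf-8")
    return key, value


if __name__ == "__main__":
    print(temp_convert(FakeRecord(key=None, value="21.5")))
```

This has not been run against a real broker; the field names in particular should be checked against what your ReadFromKafka step actually emits.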
On Fri, 4 Oct 2024 at 09:02, Henry Tremblay via user <user@beam.apache.org> wrote:

> So I changed the temp_convert to return a tuple of bytes, and now am
> getting:
>
>     raise RuntimeError(result.error)
>     RuntimeError: org.apache.beam.sdk.coders.CoderException:
>     `UnknownCoderWrapper` was used to perform an actual decoding in the Java
>     SDK. Potentially a Java transform is being followed by a cross-language
>     transform thatuses a coder that is not available in the Java SDK. Please
>     make sure that Python transforms at the multi-language boundary use Beam
>     portable coders.
>
> I am using the DirectRunner
>
> Thanks!
>
> *From:* Jaehyeon Kim <dott...@gmail.com>
> *Sent:* Thursday, October 3, 2024 3:57 PM
> *To:* user@beam.apache.org
> *Subject:* Re: How to write to a Kafka topic?
>
> Hello,
>
> It doesn't seem that the following transform outputs a tuple of bytes.
>
>     | "temp convert" >> beam.Map(temp_convert)
>
> You should convert the output into something like
> ('key'.encode('utf-8'), 'value'.encode('utf-8')).
>
> I wrote a simple post about Kafka I/O read/write on the Flink Runner, and
> hope it helps - https://jaehyeon.me/blog/2024-04-18-beam-local-dev-3/
>
> Cheers,
> Jaehyeon
>
> On Fri, 4 Oct 2024 at 08:21, Henry Tremblay via user <user@beam.apache.org>
> wrote:
>
> I am creating a simple pipeline to read and then write to Kafka. Here is
> my code:
>
>     with Pipeline(options=pipeline_options) as pipeline:
>         main = (
>             pipeline
>             | ReadFromKafka(
>                 consumer_config={'bootstrap.servers': configs['bootstrap_servers'],
>                                  'group.id': 'my-group',
>                                  'isolation.level': 'read_uncommitted',
>                                  },
>                 topics=configs['topics'],
>                 max_num_records=max_num_records,
>                 commit_offset_in_finalize=True,
>                 with_metadata=True)
>             | "temp convert" >> beam.Map(temp_convert)
>             | "Write to Kafka" >> WriteToKafka(
>                 producer_config={'bootstrap.servers': configs['bootstrap_servers']},
>                 topic=configs['out_topic'],
>             )
>         )
>
> This is giving me
>
>     RuntimeError: java.lang.ClassCastException: class
>     org.apache.beam.sdk.util.construction.UnknownCoderWrapper cannot be cast to
>     class org.apache.beam.sdk.coders.KvCoder
>     (org.apache.beam.sdk.util.construction.UnknownCoderWrapper and
>     org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
>
> Can someone point me to a simple snippet for writing to Kafka in Python? I
> have looked in vain on the web.
>
> Thanks!