Can you add a type hint?

| "temp convert" >> beam.Map(temp_convert).with_output_types(typing.Tuple[bytes, bytes])
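
For reference, here is a minimal sketch of a temp_convert that would match that hint. It assumes the records coming out of ReadFromKafka with with_metadata=True expose `key` and `value` attributes, and that a missing key can be sent as empty bytes. The `to_bytes` helper and the `FakeRecord` stand-in are made up for illustration and are not part of Beam.

```python
import typing
from collections import namedtuple


def to_bytes(item) -> bytes:
    """Coerce a value to bytes (hypothetical helper, not a Beam API)."""
    if item is None:
        # Assumption: a missing Kafka key is written as empty bytes.
        return b""
    if isinstance(item, bytes):
        return item
    return str(item).encode("utf-8")


def temp_convert(record) -> typing.Tuple[bytes, bytes]:
    """Return a (key, value) pair of bytes for one incoming record.

    Assumption: `record` has `key` and `value` attributes.
    """
    return to_bytes(record.key), to_bytes(record.value)


# Stand-in for a Kafka record, used only to try the function locally.
FakeRecord = namedtuple("FakeRecord", ["key", "value"])
sample = temp_convert(FakeRecord(key=None, value="21.5"))
```

The function can be checked on its own before it goes into the pipeline, and the explicit with_output_types call above still applies to the Map step.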

On Fri, 4 Oct 2024 at 09:02, Henry Tremblay via user <user@beam.apache.org>
wrote:

> So I changed temp_convert to return a tuple of bytes, and now I am
> getting:
>
>
>
>     raise RuntimeError(result.error)
>
> RuntimeError: org.apache.beam.sdk.coders.CoderException:
> `UnknownCoderWrapper` was used to perform an actual decoding in the Java
> SDK. Potentially a Java transform is being followed by a cross-language
> transform thatuses a coder that is not available in the Java SDK. Please
> make sure that Python transforms at the multi-language boundary use Beam
> portable coders.
>
>
>
> I am using the DirectRunner
>
>
>
> Thanks!
>
>
>
> *From:* Jaehyeon Kim <dott...@gmail.com>
> *Sent:* Thursday, October 3, 2024 3:57 PM
> *To:* user@beam.apache.org
> *Subject:* Re: How to write to a Kafka topic?
>
>
>
>
> Hello,
>
>
>
> It doesn't seem that the following transform outputs a tuple of bytes.
>
>
>
> | "temp convert" >> beam.Map(temp_convert)
>
>
>
> You should convert the output into something like ('key'.encode('utf-8'),
> 'value'.encode('utf-8')).
>
>
>
> I wrote a simple post about reading from and writing to Kafka on the
> Flink Runner. I hope it helps: https://jaehyeon.me/blog/2024-04-18-beam-local-dev-3/
>
>
>
> Cheers,
>
> Jaehyeon
>
>
>
>
>
>
>
> On Fri, 4 Oct 2024 at 08:21, Henry Tremblay via user <user@beam.apache.org>
> wrote:
>
> I am creating a simple pipeline to read and then write to Kafka. Here is
> my code:
>
>     with Pipeline(options=pipeline_options) as pipeline:
>         main = (
>             pipeline
>             | ReadFromKafka(
>                 consumer_config={
>                     'bootstrap.servers': configs['bootstrap_servers'],
>                     'group.id': 'my-group',
>                     'isolation.level': 'read_uncommitted',
>                 },
>                 topics=configs['topics'],
>                 max_num_records=max_num_records,
>                 commit_offset_in_finalize=True,
>                 with_metadata=True)
>             | "temp convert" >> beam.Map(temp_convert)
>             | "Write to Kafka" >> WriteToKafka(
>                 producer_config={'bootstrap.servers': configs['bootstrap_servers']},
>                 topic=configs['out_topic'],
>             )
>         )
>
>
>
>
>
> This is giving me
>
>
>
> RuntimeError: java.lang.ClassCastException: class
> org.apache.beam.sdk.util.construction.UnknownCoderWrapper cannot be cast to
> class org.apache.beam.sdk.coders.KvCoder
> (org.apache.beam.sdk.util.construction.UnknownCoderWrapper and
> org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
>
>
>
> Can someone point me to a simple snippet for writing to Kafka in Python? I
> have looked in vain on the web.
>
>
>
> Thanks!
>
>
