@Alexey Romanenko, thanks so much for your 
suggestions.

Actually, I found that the code below seems to work:

KafkaIO
        .<Void, GenericRecord>write()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topicName)
        .withValueSerializer((Class) KafkaAvroSerializer.class)
        .withProducerConfigUpdates(
                ImmutableMap.of("schema.registry.url", schemaRegistryUrl))
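
A note on the raw (Class) cast above: as far as I can tell, it is needed because withValueSerializer expects a Class<? extends Serializer<V>>, while KafkaAvroSerializer is declared as a Serializer<Object>, so with V = GenericRecord the class literal does not type-check without an unchecked cast. The sketch below reproduces this with JDK-only stand-ins. Serializer, ObjectSerializer and Writer here are hypothetical simplifications for illustration, not the real Kafka or Beam types.

```java
import java.nio.charset.StandardCharsets;

class RawCastSketch {
    // Hypothetical, simplified stand-in for Kafka's Serializer<T> interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a serializer declared against Object, as KafkaAvroSerializer is.
    static class ObjectSerializer implements Serializer<Object> {
        @Override
        public byte[] serialize(String topic, Object data) {
            return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for a writer whose setter has the same parameter shape
    // as withValueSerializer: Class<? extends Serializer<V>>.
    static class Writer<V> {
        Class<? extends Serializer<V>> valueSerializer;

        Writer<V> withValueSerializer(Class<? extends Serializer<V>> cls) {
            this.valueSerializer = cls;
            return this;
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static Writer<String> configure() {
        // Passing ObjectSerializer.class directly does not compile here:
        // Class<ObjectSerializer> is not a Class<? extends Serializer<String>>.
        // The raw (Class) cast turns the error into an unchecked warning.
        return new Writer<String>().withValueSerializer((Class) ObjectSerializer.class);
    }

    public static void main(String[] args) {
        System.out.println(configure().valueSerializer.getSimpleName());
    }
}
```

The cast only silences the compiler. It is still up to the caller to make sure the serializer can actually handle the value type at runtime.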

Thanks, and I hope there will be more great improvements coming in the future, 
as you mentioned 😊

From: Alexey Romanenko <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, December 9, 2020 at 9:08 AM
To: "[email protected]" <[email protected]>
Subject: Re: Quick question about KafkaIO.Write<K,V>

AFAIR, DeserializerProvider was added to KafkaIO together with Confluent 
Schema Registry support in KafkaIO.Read, to provide a universal way to use 
different Deserializers (currently Local and ConfluentSchemaRegistry).

Regarding the Write part, I believe we can do a similar refactoring. Feel free 
to provide a patch; we can help with review, testing, and advice.

For now, here is an idea for a workaround (I didn’t test it): fetch your 
schema from Schema Registry in advance yourself with SchemaRegistryClient, use 
it to create an Avro record to write (e.g. GenericRecord), then set 
KafkaAvroSerializer as the ValueSerializer and specify "schema.registry.url" in 
the producer properties.


On 8 Dec 2020, at 20:59, Tao Li <[email protected]> wrote:

Hi Beam community,

I have a quick question about the withValueSerializer() method of the 
KafkaIO.Write<K,V> class: 
https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html

The withValueSerializer method does not support passing in a serializer 
provider. Without that, I cannot use the Kafka schema registry to fetch the 
schema for serialization.

However, the KafkaIO.Read<K,V> withKeyDeserializer method does support 
specifying a deserializer provider: 
https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withKeyDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-

Is this a gap in KafkaIO.Write<K,V>, or is it by design? Is there a workaround 
to specify the schema registry info for KafkaIO.Write<K,V>?

Thanks so much!
