Anjana,

Sorry, I totally missed that you are talking about writes. That's usually the simpler case of the two: unlike the consumer, the producer owns the data and schema, so there are few unknowns.

For that use case I suggest using the KafkaAvroSerializer; using withProducerConfigUpdates you can provide the necessary configuration (schema.registry.url, …).
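A rough, untested sketch of what that could look like (broker, topic and registry URL are placeholders mirroring the read example below; note that KafkaAvroSerializer is typed as Serializer<Object>, so an unchecked cast is needed to use it for GenericRecord values):

  PCollection<KV<Long, GenericRecord>> records = ...;

  records.apply(
      KafkaIO.<Long, GenericRecord>write()
          .withBootstrapServers("broker_1:9092,broker_2:9092")
          .withTopic("my_topic")
          .withKeySerializer(LongSerializer.class)
          // KafkaAvroSerializer implements Serializer<Object>, hence the unchecked cast
          .withValueSerializer((Class<? extends Serializer<GenericRecord>>) (Class<?>) KafkaAvroSerializer.class)
          // pass Confluent client config, including the schema registry URL, to the producer
          .withProducerConfigUpdates(
              ImmutableMap.<String, Object>of("schema.registry.url", "http://localhost:8081")));

With the default subject naming strategy the serializer will then register / look up the schema under the usual "<topic>-value" subject when producing.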
But I'm not sure I fully understand your issue. Could you share some code snippets?

Regards,
Moritz

From: Moritz Mack <[email protected]>
Reply to: "[email protected]" <[email protected]>
Date: Friday, 17. December 2021 at 12:57
To: "[email protected]" <[email protected]>
Subject: Re: How to use avro serializer in Kafka write?

Hi Anjana,

Have you checked the Javadocs of KafkaIO? It is pretty straightforward:

  PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
      .apply(KafkaIO.<Long, GenericRecord>read()
          .withBootstrapServers("broker_1:9092,broker_2:9092")
          .withTopic("my_topic")
          .withKeyDeserializer(LongDeserializer.class)
          // Use Confluent Schema Registry, specify schema registry URL and value subject
          .withValueDeserializer(
              ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))

The key point is that you'll be working with generic Avro records. There are also further details on this in the docs:

  For an Avro schema it will return a {@link PCollection} of {@link KafkaRecord}s where key and/or value will be typed as {@link org.apache.avro.generic.GenericRecord}. In this case, users don't need to specify key or/and value deserializers and coders since they will be set to {@link KafkaAvroDeserializer} and {@link AvroCoder} by default accordingly.

Regards,
Moritz

From: "Gunasekara, Anjana" <[email protected]>
Reply to: "[email protected]" <[email protected]>
Date: Friday, 17. December 2021 at 03:25
To: "[email protected]" <[email protected]>
Subject: How to use avro serializer in Kafka write?

Hi,

I'm trying to write Avro-encoded messages using Kafka write in Java, adhering to a Confluent schema registry. Currently I'm having difficulty finding a proper value serializer for that. I used my custom serializer but it isn't serialized properly. If anyone has a working example of writing Avro-encoded messages to a Confluent Kafka topic which has a schema, that would be really great! Thanks in advance.

Regards,
Anjana
