Hi Anjana,

Thanks for providing some context, I understand now.
Imho, the key point to reconsider here is the use of a POJO. When using a schema registry, it's important to be explicit about the schema you are using. Don't rely on any intermediary to infer that schema for you; you absolutely want to be in control. Without an explicit Avro schema, schema evolution can later become a challenge.

I'd recommend you follow these steps:

1. Define an Avro schema for TestObject.
2. Use the Avro compiler to generate a Java class for that schema.
3. Use withValueSerializer((Class) KafkaAvroSerializer.class); with the raw cast in place things will work (see the sketch below).
4. Set the producer option "auto.register.schemas" to true to publish new versions of your schema to the registry.
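Roughly, applied to your snippet, that could look as follows (just a sketch, untested; I kept your options getters, only show the registry-related producer config, and picked an arbitrary transform name; KafkaAvroSerializer here is io.confluent.kafka.serializers.KafkaAvroSerializer):

// Assumes TestObject is now the class generated from your Avro schema by the
// Avro compiler (steps 1 and 2), so KafkaAvroSerializer can read the schema from it.
@SuppressWarnings({"unchecked", "rawtypes"})
public PDone expand(PCollection<TestObject> input) {
  return input.apply(
      "writeTestObjectsToKafka",
      KafkaIO.<Void, TestObject>write()
          .withBootstrapServers(options.getBootstrapServers())
          .withProducerConfigUpdates(java.util.Map.of(
              "schema.registry.url", options.getSchemaRegistryUrl(),
              "basic.auth.credentials.source", options.getBasicAuthCredentialSource(),
              "basic.auth.user.info",
                  options.getSchemaRegistryApiKey() + ":" + options.getSchemaRegistrySecret(),
              // step 4: let the producer register new schema versions
              "auto.register.schemas", true))
          // step 3: the raw cast makes the generic types line up (unchecked warning only)
          .withValueSerializer((Class) KafkaAvroSerializer.class)
          .withTopic(options.getOutputTopic())
          .values());
}

The rest of your producer config (SASL, timeouts, …) stays exactly as it is.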
I hope that helps.

Regards,
Moritz

From: "Gunasekara, Anjana" <[email protected]>
Reply to: "[email protected]" <[email protected]>
Date: Saturday, 18. December 2021 at 06:19
To: "[email protected]" <[email protected]>
Subject: RE: How to use avro serializer in Kafka write?

Hi Moritz,

Thanks for the response. I've tried to use KafkaAvroSerializer, but Kafka write() requests a serializer class for the value serialization, which in my case I implemented for TestObject. So I created one, and it published without any errors. The thing is, those messages are not serialized with Avro. When I dug a bit deeper into KafkaAvroSerializer, it seems we cannot use simple Java objects; KafkaAvroSerializer looks for a schema in those objects. Any thoughts on that? I hope my issue is now clear.

public PDone expand(PCollection<TestObject> input) {
  return input.apply(
      "writePubsubMessagesToKafka",
      KafkaIO.<Void, TestObject>write()
          .withBootstrapServers(options.getBootstrapServers())
          .withProducerConfigUpdates(java.util.Map.of(
              "sasl.mechanism", options.getSaslMechanism(),
              "security.protocol", options.getSecurityProtocol(),
              "ssl.endpoint.identification.algorithm", options.getSslEndpointIdentificationAlgorithm(),
              "request.timeout.ms", options.getRequestTimeoutMs(),
              "retry.backoff.ms", options.getRetryBackoffMs(),
              "client.dns.lookup", options.getClientDnsLookup(),
              "schema.registry.url", options.getSchemaRegistryUrl(),
              "basic.auth.credentials.source", options.getBasicAuthCredentialSource(),
              "basic.auth.user.info", options.getSchemaRegistryApiKey() + ":" + options.getSchemaRegistrySecret(),
              "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + options.getClusterApiKey() + "\" password=\"" + options.getClusterSecret() + "\";"))
          .withValueSerializer(TestObjectSerializer.class) // cannot directly use KafkaAvroSerializer
          .withTopic(options.getOutputTopic()) // just need serializer for value
          .values());
}

Regards,
Anjana

From: Moritz Mack <[email protected]>
Sent: Friday, December 17, 2021 8:39 PM
To: [email protected]
Subject: Re: How to use avro serializer in Kafka write?

Anjana,

Sorry, I totally missed that you are talking about writes. Usually that's the simpler case of the two: unlike the consumer, the producer owns both data and schema, so there are few unknowns. For that use case I suggest using the KafkaAvroSerializer; using withProducerConfigUpdates you can provide the necessary configuration (schema.registry.url, …).

But I'm not sure I fully understand your issue. Could you share some code snippets?

Regards,
Moritz

From: Moritz Mack <[email protected]>
Reply to: "[email protected]" <[email protected]>
Date: Friday, 17. December 2021 at 12:57
To: "[email protected]" <[email protected]>
Subject: Re: How to use avro serializer in Kafka write?

Hi Anjana,

Have you checked the Javadocs of KafkaIO? It is pretty straightforward:

PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
    .apply(KafkaIO.<Long, GenericRecord>read()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        // Use Confluent Schema Registry, specify schema registry URL and value subject
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value")));

The key is that you'll be working with generic Avro records. There are also further details on this in the docs:

  For an Avro schema it will return a {@link PCollection} of {@link KafkaRecord}s where key and/or
  value will be typed as {@link org.apache.avro.generic.GenericRecord}. In this case, users don't
  need to specify key or/and value deserializers and coders since they will be set to
  {@link KafkaAvroDeserializer} and {@link AvroCoder} by default accordingly.

Regards,
Moritz

From: "Gunasekara, Anjana" <[email protected]>
Reply to: "[email protected]" <[email protected]>
Date: Friday, 17. December 2021 at 03:25
To: "[email protected]" <[email protected]>
Subject: How to use avro serializer in Kafka write?

Hi,

I'm trying to write Avro-encoded messages using Kafka write in Java, adhering to a Confluent schema registry. Currently I'm having difficulty finding a proper value serializer for that. I used my custom serializer, but the messages aren't serialized properly. If anyone has a working example of writing Avro-encoded messages to a Confluent Kafka topic which has a schema, that would be really great! Thanks in advance.

Regards,
Anjana
