Hi Anjana,

Thanks for providing some context; I understand the problem now.

IMHO, the key point to reconsider here is the use of a POJO. When using a
schema registry, it’s important to be explicit about the schema you are using.
Don’t rely on any intermediary to infer that schema for you; you absolutely
want to be in control. Without an explicit Avro schema, schema evolution can
later become a challenge.

I’d recommend you follow these steps:

  1.  Define an Avro schema for TestObject
  2.  Use the Avro compiler to generate a Java class for that schema
  3.  Use withValueSerializer((Class) KafkaAvroSerializer.class); the raw cast
      is needed because KafkaAvroSerializer implements Serializer<Object>
      rather than Serializer<TestObject>, and with it things will work
  4.  Set the producer option "auto.register.schemas" to true to publish new
      versions of your schema to the registry
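Why step 3 needs the raw cast can be shown with a small stdlib-only analogue. The Serializer, ObjectSerializer, TestObject and WriteBuilder types below are hypothetical stand-ins, not the real Kafka or Beam classes; the sketch assumes, as with the real API, that KafkaAvroSerializer is a Serializer<Object> while withValueSerializer accepts a Class<? extends Serializer<V>>.

```java
// Hypothetical stand-in for org.apache.kafka.common.serialization.Serializer<T>.
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical stand-in for KafkaAvroSerializer, which is typed on Object, not on TestObject.
class ObjectSerializer implements Serializer<Object> {
    @Override
    public byte[] serialize(String topic, Object data) {
        return new byte[0];
    }
}

// Hypothetical stand-in for the Java class generated from the Avro schema.
class TestObject {}

// Mimics the shape of KafkaIO.Write#withValueSerializer(Class<? extends Serializer<V>>).
class WriteBuilder<V> {
    Class<? extends Serializer<V>> valueSerializer;

    WriteBuilder<V> withValueSerializer(Class<? extends Serializer<V>> serializer) {
        this.valueSerializer = serializer;
        return this;
    }
}

class RawCastDemo {
    @SuppressWarnings({"unchecked", "rawtypes"})
    static WriteBuilder<TestObject> configure() {
        // Without the cast this does not compile:
        //   new WriteBuilder<TestObject>().withValueSerializer(ObjectSerializer.class)
        // because Class<ObjectSerializer> is not a Class<? extends Serializer<TestObject>>.
        // The raw (Class) cast turns the compile error into an unchecked warning.
        return new WriteBuilder<TestObject>().withValueSerializer((Class) ObjectSerializer.class);
    }
}
```

The cast trades compile-time checking for an unchecked warning; correctness then rests on the serializer accepting the values at runtime, which KafkaAvroSerializer does for classes generated by the Avro compiler.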

I hope that helps.
Regards,
Moritz

From: "Gunasekara, Anjana" <[email protected]>
Reply to: "[email protected]" <[email protected]>
Date: Saturday, 18. December 2021 at 06:19
To: "[email protected]" <[email protected]>
Subject: RE: How to use avro serializer in Kafka write?

Hi Moritz,

Thanks for the response. I’ve tried to use KafkaAvroSerializer, but KafkaIO
write() requires a serializer class for the value, which in my case has to
implement Serializer<TestObject>.
So I created one, and it published without any errors. The problem is that
those messages are not serialized with Avro.
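For context on what "serialized with Avro" means on the wire: Avro's binary encoding writes int and long values as zig-zag variable-length integers and strings as a length followed by UTF-8 bytes, and record fields are simply concatenated in schema order. The stdlib-only sketch below encodes a hypothetical record with fields id (long) and name (string) that way; it illustrates the format and is not a substitute for Avro's own encoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class AvroBinarySketch {
    private AvroBinarySketch() {}

    // Zig-zag maps signed to unsigned (0->0, -1->1, 1->2, -2->3, ...), then writes
    // 7 bits per byte, setting the high bit on every byte except the last.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // A string is its UTF-8 byte length (encoded as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Hypothetical record {"id": long, "name": string}: fields in schema order, no names or tags.
    static byte[] encodeRecord(long id, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, id);
        writeString(out, name);
        return out.toByteArray();
    }
}
```

For example, encodeRecord(1, "hi") yields the bytes 02 04 68 69. Nothing in them names the fields or the schema, which is why a reader needs the writer's schema to decode them.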

When I dug a bit deeper into KafkaAvroSerializer, it seems we cannot use plain
Java objects: KafkaAvroSerializer looks for a schema in those objects. Any
thoughts on that? I hope my issue is now clear.
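KafkaAvroSerializer does more than Avro-encode the value: it prepends Confluent's wire-format header, a magic byte 0 followed by the 4-byte big-endian schema ID that the registry assigned, before the Avro binary payload. Inspecting the first five bytes of a published message is therefore a quick way to tell whether it was written this way. The stdlib-only sketch below frames and parses that header; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.OptionalInt;

final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    private ConfluentWireFormat() {}

    // Prepends the header that the Confluent serializers write before the Avro payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    // Returns the schema ID if the message looks Confluent-framed, empty otherwise.
    // A leading 0 byte is necessary but not proof that the payload is valid Avro.
    static OptionalInt schemaId(byte[] message) {
        if (message == null || message.length < HEADER_SIZE || message[0] != MAGIC_BYTE) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(ByteBuffer.wrap(message, 1, 4).getInt());
    }

    // The Avro binary body that follows the header.
    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }
}
```

A value written by, say, a JSON-based custom serializer would usually start with { (0x7B) and fail the schemaId check, which matches the symptom described above.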



public PDone expand(PCollection<TestObject> input) {
    return input
            .apply(
                    "writePubsubMessagesToKafka",
                    KafkaIO.<Void, TestObject>write()
                            .withBootstrapServers(options.getBootstrapServers())
                            .withProducerConfigUpdates(java.util.Map.of(
                                    "sasl.mechanism", options.getSaslMechanism(),
                                    "security.protocol", options.getSecurityProtocol(),
                                    "ssl.endpoint.identification.algorithm", options.getSslEndpointIdentificationAlgorithm(),
                                    "request.timeout.ms", options.getRequestTimeoutMs(),
                                    "retry.backoff.ms", options.getRetryBackoffMs(),
                                    "client.dns.lookup", options.getClientDnsLookup(),
                                    "schema.registry.url", options.getSchemaRegistryUrl(),
                                    "basic.auth.credentials.source", options.getBasicAuthCredentialSource(),
                                    "basic.auth.user.info",
                                            options.getSchemaRegistryApiKey() + ":" + options.getSchemaRegistrySecret(),
                                    "sasl.jaas.config",
                                            "org.apache.kafka.common.security.plain.PlainLoginModule required"
                                                    + " username=\"" + options.getClusterApiKey() + "\""
                                                    + " password=\"" + options.getClusterSecret() + "\";"))
                            .withValueSerializer(TestObjectSerializer.class) // cannot directly use KafkaAvroSerializer
                            .withTopic(options.getOutputTopic())
                            .values()); // just need a serializer for the value
}


Regards,
Anjana

From: Moritz Mack<mailto:[email protected]>
Sent: Friday, December 17, 2021 8:39 PM
To: [email protected]<mailto:[email protected]>
Subject: Re: How to use avro serializer in Kafka write?

Anjana,

Sorry, I completely missed that you are talking about writes.
Usually that’s the simpler case of the two: unlike the consumer, the producer
owns both the data and the schema, so there are few unknowns.

For that use case I suggest using KafkaAvroSerializer; with
withProducerConfigUpdates you can provide the necessary configuration
(schema.registry.url, …).
But I’m not sure I fully understand your issue. Could you share some code 
snippets?
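A minimal sketch of the registry-related part of that configuration, assuming the registry uses basic auth as in the code elsewhere in this thread. The helper and its parameters are hypothetical; the keys are standard Confluent serializer settings (auto.register.schemas included), and the resulting map is the kind of thing one would pass to withProducerConfigUpdates.

```java
import java.util.HashMap;
import java.util.Map;

final class SchemaRegistryConfig {
    private SchemaRegistryConfig() {}

    // Hypothetical helper: the settings KafkaAvroSerializer reads from the producer config.
    static Map<String, Object> forAvroSerializer(
            String registryUrl, String apiKey, String apiSecret, boolean autoRegister) {
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", registryUrl);
        // USER_INFO makes the client take credentials from basic.auth.user.info ("key:secret").
        config.put("basic.auth.credentials.source", "USER_INFO");
        config.put("basic.auth.user.info", apiKey + ":" + apiSecret);
        // When true, a new writer schema is registered under the value subject
        // (with the default naming strategy, "<topic>-value").
        config.put("auto.register.schemas", autoRegister);
        return config;
    }
}
```

Building a mutable map also avoids the ten-pair limit of Map.of when more producer settings (SASL, timeouts) need to be merged in.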

Regards, Moritz

From: Moritz Mack <[email protected]>
Reply to: "[email protected]" <[email protected]>
Date: Friday, 17. December 2021 at 12:57
To: "[email protected]" <[email protected]>
Subject: Re: How to use avro serializer in Kafka write?

Hi Anjana,

Have you checked the Javadocs of KafkaIO?
It is pretty straightforward:

PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
   .apply(KafkaIO.<Long, GenericRecord>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      // Use Confluent Schema Registry, specify schema registry URL and value subject
      .withValueDeserializer(
          ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value")));

The key point is that you’ll be working with generic Avro records. There are
further details on this in the docs:


 For an Avro schema it will return a {@link PCollection} of {@link KafkaRecord}s
 where key and/or value will be typed as {@link org.apache.avro.generic.GenericRecord}.
 In this case, users don't need to specify key or/and value deserializers and
 coders since they will be set to {@link KafkaAvroDeserializer} and
 {@link AvroCoder} by default accordingly.

Regards,
Moritz

From: "Gunasekara, Anjana" <[email protected]>
Reply to: "[email protected]" <[email protected]>
Date: Friday, 17. December 2021 at 03:25
To: "[email protected]" <[email protected]>
Subject: How to use avro serializer in Kafka write?

Hi,

I’m trying to write Avro-encoded messages with KafkaIO write() in Java,
adhering to a Confluent Schema Registry. Currently I’m having difficulty
finding a proper value serializer for that. I used my own custom serializer,
but the messages aren’t serialized properly.

If anyone has a working example of writing Avro-encoded messages to a Confluent
Kafka topic that has a schema, it would be really great!

Thanks in advance

Regards,
Anjana


As a recipient of an email from Talend, your contact personal data will be on 
our systems. Please see our privacy notice (updated August 2020) at Talend, 
Inc. 
<https://urldefense.proofpoint.com/v2/url?u=https-3A__www.talend.com_contacts-2Dprivacy-2Dpolicy_&d=DwMGaQ&c=Iej4I5bEYPmgv5l2sS6i8A&r=KMJ3a4repD0cvRLiDurXpawigvc4HY-HfLgIu7_jSQg&m=tTzqdO-yQ11RzyjV9m_Uk0oGIzwvBYanhbWHXf2JMr-3ov_qYXGHMFk6hl5HPsyB&s=cDlzxBjDwIHk4Ge0YNDlTeNyGYfvFCXsi8VB1_mZkLA&e=>

