Schema registry support is provided by the
ConfluentRegistryAvroSerializationSchema class
(in the flink-avro-confluent-registry package).
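
For example, a minimal sketch of wiring it into a KafkaSink (the topic name,
registry subject, registry URL, and Avro-generated class below are
placeholders, not something from this thread):

```java
// Sketch only: assumes flink-connector-kafka and
// flink-avro-confluent-registry are on the classpath, and that
// MyAvroRecord is an Avro-generated SpecificRecord class.
KafkaSink<MyAvroRecord> sink = KafkaSink.<MyAvroRecord>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(
                        ConfluentRegistryAvroSerializationSchema.forSpecific(
                                MyAvroRecord.class,
                                "topic-name-value",             // registry subject
                                "http://schema-registry:8081")) // registry URL
                .build())
        .build();
```

forGeneric is available as an alternative when you work with GenericRecord
instead of generated classes.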

On Thu, Feb 1, 2024 at 8:04 AM Yaroslav Tkachenko <yaros...@goldsky.com>
wrote:

> You can also implement a custom KafkaRecordSerializationSchema, which
> allows creating a ProducerRecord (see the "serialize" method) - you can
> set the message key, headers, etc. manually. It's supported in older
> versions.
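>
> A full sketch might look like the following (the topic name and header
> value are placeholders; this assumes flink-connector-kafka and
> kafka-clients are on the classpath):
>
> ```java
> import java.nio.charset.StandardCharsets;
>
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class HeaderAwareSerializationSchema
>         implements KafkaRecordSerializationSchema<String> {
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(
>             String element,
>             KafkaRecordSerializationSchema.KafkaSinkContext context,
>             Long timestamp) {
>         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
>                 "topic-name",                              // placeholder topic
>                 element.getBytes(StandardCharsets.UTF_8),  // message key
>                 element.getBytes(StandardCharsets.UTF_8)); // message value
>         // Attach arbitrary headers before the record goes to the producer.
>         record.headers().add("source",
>                 "flink".getBytes(StandardCharsets.UTF_8));
>         return record;
>     }
> }
> ```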
>
> On Thu, Feb 1, 2024 at 4:49 AM Jiabao Sun <jiabao....@xtransfer.cn> wrote:
>
>> Sorry, I didn't notice the version information.
>> This feature was completed in FLINK-31049[1] and will be released in
>> version 3.1.0 of the Kafka connector (flink-connector-kafka).
>> The release process[2] is currently underway and will be completed soon.
>>
>> However, version 3.1.0 does not promise support for Flink 1.16.
>> If you need to use this feature, you can consider cherry-picking this
>> commit[3] to the v3.0 branch and package it for your own use.
>>
>> Regarding Schema Registry, I am not familiar with this feature and I
>> apologize for not being able to provide an answer.
>>
>> Best,
>> Jiabao
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-31049
>> [2]
>> https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0
>> [3] https://github.com/apache/flink-connector-kafka/pull/18
>>
>>
>> On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote:
>> > Hi Jiabao,
>> >
>> > Thanks for reply.
>> >
>> > Currently I am using Flink 1.16.1 and I am not able to find any
>> HeaderProvider setter method in the KafkaRecordSerializationSchemaBuilder
>> class.
>> > Although on github I found this support here:
>> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
>> > But this doesn't seem to be released yet. Can you please point me
>> towards the correct Flink version?
>> >
>> > Also, any help on question 1 regarding Schema Registry?
>> >
>> > Regards,
>> > Kirti Dhar
>> >
>> > -----Original Message-----
>> > From: Jiabao Sun <ji...@xtransfer.cn>
>> > Sent: 01 February 2024 13:29
>> > To: user@flink.apache.org
>> > Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers
>> >
>> > Hi Kirti,
>> >
>> > Kafka Sink supports sending messages with headers.
>> > You should implement a HeaderProvider to extract headers from the
>> input element.
>> >
>> >
>> > KafkaSink<String> sink = KafkaSink.<String>builder()
>> >         .setBootstrapServers(brokers)
>> >         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>> >                 .setTopic("topic-name")
>> >                 .setValueSerializationSchema(new SimpleStringSchema())
>> >                 .setHeaderProvider(new HeaderProvider<String>() {
>> >                     @Override
>> >                     public Headers getHeaders(String input) {
>> >                         // Example (sketch): attach a static header;
>> >                         // derive header values from the input as needed.
>> >                         return new RecordHeaders().add("source",
>> >                                 "flink".getBytes(StandardCharsets.UTF_8));
>> >                     }
>> >                 })
>> >                 .build()
>> >         )
>> >         .build();
>> >
>> > Best,
>> > Jiabao
>> >
>> >
>> > On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
>> > > Hi Mates,
>> > >
>> > > I have below queries regarding Flink Kafka Sink.
>> > >
>> > >
>> > >   1.  Does Kafka Sink support schema registry? If yes, is there any
>> documentations to configure the same?
>> > >   2.  Does Kafka Sink support sending messages (ProducerRecord)
>> with headers?
>> > >
>> > >
>> > > Regards,
>> > > Kirti Dhar
>> > >
>> > >
>> >
>>
>
