Hi,

I deployed Flink on Kubernetes (running on AWS EC2) using the Flink K8s 
operator v1.2.0 and a custom Dockerfile based on the image flink:2.0.0-java21.
My messages are in Kafka (Confluent Cloud), serialized with Protobuf and 
registered in the Confluent Schema Registry.
I was able to connect to the Kafka cluster and consume deserialized messages 
successfully using the SQL client. However, I couldn’t find a connector that 
works with the Confluent Schema Registry to deserialize Protobuf messages when 
using the Flink SQL API.
Question: Is there a Flink SQL connector that supports deserializing Protobuf 
messages from Confluent Kafka with Schema Registry integration? If not, what’s 
the recommended approach to achieve this?
The Protobuf format documented here: 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/
does not work with Confluent's wire format, i.e.:

 *   Magic byte (1 byte, always 0) → identifies Confluent’s wire format.
 *   Schema ID (4 bytes, big-endian) → the ID of the schema stored in Confluent 
Schema Registry.
 *   Message indexes (varint-encoded, Protobuf only) → identify which message 
type in the .proto file was used.
 *   The actual serialized Protobuf message.
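
To make concrete what a SQL format would have to handle, here is a minimal 
sketch in plain Java of how that header can be stripped before the actual 
Protobuf payload is reachable. The class and method names are just 
placeholders, and the zig-zag varint handling follows my reading of 
Confluent's wire-format description:

import java.nio.ByteBuffer;

// Sketch only: strips the Confluent wire-format header from a Kafka record
// value and returns the raw Protobuf bytes that a generated parser can read.
public final class ConfluentWireFormat {

    public static byte[] stripHeader(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);

        // 1 byte: magic byte, always 0 in Confluent's wire format
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Not Confluent wire format");
        }

        // 4 bytes: schema ID, big-endian (ByteBuffer's default byte order);
        // this is where a Schema Registry lookup would happen if needed
        int schemaId = buf.getInt();

        // Protobuf only: message-index list as zig-zag varints.
        // A count of 0 is the shorthand for "first message type in the .proto file".
        long count = readZigZagVarint(buf);
        for (long i = 0; i < count; i++) {
            readZigZagVarint(buf); // skip each index
        }

        // Everything left is the serialized Protobuf message itself.
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    // Reads one zig-zag encoded varint, as used in the message-index header.
    private static long readZigZagVarint(ByteBuffer buf) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // zig-zag decode
    }
}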

From Confluent I found this artifact on the Maven repo: 
https://mvnrepository.com/artifact/io.confluent.flink/flink-sql-connector-kafka
but I am not sure whether it works with self-managed Flink and, if so, how I 
should write the query for this connector. Does anyone have an idea how to 
deserialize Confluent Kafka Protobuf messages with the Flink SQL API?
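
In case it helps clarify what I am after, the only fallback I can think of is 
a custom DeserializationSchema on the DataStream side that strips the header 
as sketched above and parses with the protoc-generated class. MyEvent and 
ConfluentProtobufSchema below are placeholders of my own, not anything from 
the Confluent connector:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Rough sketch of a DataStream-side fallback; MyEvent stands for a
// protoc-generated message class, ConfluentWireFormat is the helper above.
public class ConfluentProtobufSchema implements DeserializationSchema<MyEvent> {

    @Override
    public MyEvent deserialize(byte[] message) throws java.io.IOException {
        // Drop magic byte + schema ID + message indexes, then parse the payload.
        byte[] payload = ConfluentWireFormat.stripHeader(message);
        return MyEvent.parseFrom(payload);
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false; // Kafka topics are unbounded
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}

But I would much prefer to stay in pure Flink SQL if a format for this 
already exists.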

 BRs,

Javad Saljooghi
Cloud Backend Engineer