Hi,

I deployed Flink on Kubernetes (running on AWS EC2) using the Flink Kubernetes operator v1.2.0 and a custom Dockerfile based on the flink:2.0.0-java21 image. My messages are in Kafka (Confluent Cloud), serialized with Protobuf and registered in the Confluent Schema Registry. I was able to connect to the Kafka cluster and consume deserialized messages successfully using the SQL client. However, I couldn't find a connector that works with the Confluent Schema Registry to deserialize Protobuf messages when using the Flink SQL API.

Question: Is there a Flink SQL connector that supports deserializing Protobuf messages from Confluent Kafka with Schema Registry integration? If not, what is the recommended approach to achieve this?
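For context, this is roughly the kind of table definition I tried with the plain protobuf format (the topic, columns, credentials, and message class here are placeholders, not my real setup):

CREATE TABLE orders (
  order_id STRING,
  amount   DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = '<bootstrap-server>:9092',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";',
  'scan.startup.mode' = 'earliest-offset',
  -- plain protobuf format from the Flink docs; it expects raw Protobuf bytes,
  -- so it trips over the Confluent header described below
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.Order'
);

(The protobuf format needs the generated class on the classpath, but that part is not the issue; the issue is the extra bytes in front of each message.)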
The protobuf format from the docs (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/) does not work with the Confluent wire format, i.e.:

* Magic byte (1 byte, always 0) → identifies Confluent's wire format.
* Schema ID (4 bytes, big-endian) → the ID of the schema stored in Confluent Schema Registry.
* Optional data for Protobuf/JSON (like message indexes).
* The actual serialized Protobuf message.

(A minimal sketch of this framing is included below.)

From Confluent I found this on the Maven repository: https://mvnrepository.com/artifact/io.confluent.flink/flink-sql-connector-kafka, but I am not sure whether it works with self-managed Flink and, if so, how I should write the query for this connector. Does anyone have an idea how to deserialize Confluent Kafka Protobuf messages with the Flink SQL API?
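To make that framing concrete, here is a rough Java sketch of what sits in front of the actual Protobuf payload in every record value (illustration only, not a complete deserializer; it does not decode the message-indexes block):

import java.nio.ByteBuffer;

// Sketch of the Confluent wire-format framing described above.
public final class ConfluentWireFormat {

    public static int readSchemaId(byte[] kafkaValue) {
        ByteBuffer buf = ByteBuffer.wrap(kafkaValue);
        byte magic = buf.get();              // magic byte, always 0
        if (magic != 0) {
            throw new IllegalArgumentException("Not Confluent wire format");
        }
        int schemaId = buf.getInt();         // 4-byte big-endian schema ID
        // ...followed by the optional message indexes (Protobuf only)
        // and then the serialized Protobuf message itself.
        return schemaId;
    }
}

The plain protobuf format hands these leading bytes straight to the generated parser, which is presumably why deserialization fails.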
BRs,
Javad Saljooghi
Cloud Backend Engineer