*Hello Flink Team,* I am right now using Flink stateful function in my project, which are consuming avro serialized events(both key and value are serialized) from kafka, but it seems there is no configuration that users can customize for deserializing the kafka record's key, because I noticed that the key deserializer is fixed to be a UTF-8 String Deserializer here: RoutableKafkaIngressDeserializer.java <https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java> .
As a result, the deserialized key becomes chaos code, then incorrect hash values will be generated based on these chaos codes, which leads to highly possibly uneven record distribution and is prone to cause data skew. I wonder if the community will consider adding a configuration for users to customize the deserializer in the Flink stateful function kafka ingress ? Looking forward to hearing from you Best regards *Xin Li*