I create table as below, and the data is from kafka. I want to deserialize the json message to Pojo object. But the message format is not avro or simple json. So I need to know how to register custormized serializer and use it for the 'format.type' property. By the way, my flink version is 1.10.0. CREATE TABLE MyUserTable( uuid VARCHAR, orgId VARCHAR ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'topic_name', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'connector.properties.group.id' = 'testGroup', 'format.type' = 'cutormizeSerializer' ) The kafka message body sample, each columnName is the key for Pojo object, and rawData is value: { "beforeData": [], "byteSize": 272, "columnNumber": 32, "data": [{ "byteSize": 8, "columnName": "APPLY_PERSON_ID", "rawData": 10017, "type": "LONG" }, { "byteSize": 12, "columnName": "UPDATE_SALARY", "rawData": "11000.000000", "type": "DOUBLE" }, { "byteSize": 11, "columnName": "UP_AMOUNT", "rawData": "1000.000000", "type": "DOUBLE" }, { "byteSize": 3, "columnName": "CURRENCY", "rawData": "CNY", "type": "STRING" }, { "byteSize": 32, "columnName": "EXCHANGE_RATE", "rawData": "1.000000000000000000000000000000", "type": "DOUBLE" }, { "byteSize": 11, "columnName": "DEDUCTED_ACCOUNT", "rawData": "1000.000000", "type": "DOUBLE" }, { "byteSize": 1, "columnName": "ENTER_AT_PROCESS", "rawData": "Y", "type": "STRING" }], "dataCount": 0, "dataMetaData": { "connector": "mysql", "pos": 1000368076, "row": 0, "ts_ms": 1625565737000, "snapshot": "false", "db": "testdb", "table": "flow_person_t" }, "key": "APPLY_PERSON_ID", "memorySize": 1120, "operation": "insert", "rowIndex": -1, "timestamp": "1970-01-01 00:00:00" } The Pojo object as below: import lombok.Data;
@Data public class HrSalaryPersonVO { private String uuid; private String orgId; private String unitId; private String effectiveDate; private int adjustPersonCount; private Double adjustAmount; private Double beforeSalaryAmount; private Double adjustRate; private String data0prateType; private String status; }