[ https://issues.apache.org/jira/browse/FLINK-35173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Qingsheng Ren reassigned FLINK-35173: ------------------------------------- Assignee: ZhengYu Chen > Debezium for Mysql connector Custom Time Serializer > ---------------------------------------------------- > > Key: FLINK-35173 > URL: https://issues.apache.org/jira/browse/FLINK-35173 > Project: Flink > Issue Type: Improvement > Components: Flink CDC > Affects Versions: 3.1.0 > Reporter: ZhengYu Chen > Assignee: ZhengYu Chen > Priority: Major > Labels: CDC, pull-request-available > Fix For: 3.1.0 > > > Currently, Flink CDC Time encounters time type errors (including DateTime, > Time, Date, TimeStamp) when using MySQL Connector > (JsonDebeziumDeserializationSchema) as deserialization, and the converted > time is wrong. The essential reason is that the timestamp returned by the > bottom layer of debezium is UTC (such as io.debezium.time.Timestamp). The > community has already had some > [PR|https://github.com/apache/flink-cdc/pull/1366/files#diff-e129e9fae3eea0bb32f0019debb4932413c91088d6dae656e2ecb63913badae4], > but they are not work. > Now a way is provided to provide a solution based on Debezium's custom > Convert interface > (https://debezium.io/documentation/reference/1.9/development/converters.html), > Users can choose to convert the above four time types into STRING according > to the specified time format to ensure that users can correctly convert JSON > when using the Flink DataStream API. > When the user enables this converter, we need to configure it according to > the parameters, That's some datastream use case: > {code:java} > Properties debeziumProperties = new Properties(); > debeziumProperties.setProperty("converters", "datetime"); > debeziumProperties.setProperty("datetime.database.type", > DataBaseType.MYSQL.getType()); > debeziumProperties.setProperty("datetime.type", > "cn.xxx.sources.cdc.MysqlDebeziumConverter"); > debeziumProperties.setProperty("datetime.format.date", "yyyy-MM-dd"); > debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss"); > debeziumProperties.setProperty("datetime.format.datetime", "yyyy-MM-dd > HH:mm:ss"); > debeziumProperties.setProperty("datetime.format.timestamp", "yyyy-MM-dd > HH:mm:ss"); > debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8"); > MySqlSourceBuilder<String> builder = MySqlSource.<String>builder() > .hostname(url[0]) > .port(Integer.parseInt(url[1])) > .databaseList(table.getDatabase()) > .tableList(getTablePattern(table)) > .username(table.getUserName()) > .password(table.getPassword()) > .debeziumProperties(debeziumProperties); {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)