[ 
https://issues.apache.org/jira/browse/FLINK-35173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-35173:
-------------------------------------

    Assignee: ZhengYu Chen

> Debezium for Mysql connector Custom Time Serializer 
> ----------------------------------------------------
>
>                 Key: FLINK-35173
>                 URL: https://issues.apache.org/jira/browse/FLINK-35173
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: 3.1.0
>            Reporter: ZhengYu Chen
>            Assignee: ZhengYu Chen
>            Priority: Major
>              Labels: CDC, pull-request-available
>             Fix For: 3.1.0
>
>
> Currently, Flink CDC Time encounters time type errors (including DateTime, 
> Time, Date, TimeStamp) when using MySQL Connector 
> (JsonDebeziumDeserializationSchema) as deserialization, and the converted 
> time is wrong. The essential reason is that the timestamp returned by the 
> bottom layer of debezium is UTC (such as io.debezium.time.Timestamp). The 
> community has already had some 
> [PR|https://github.com/apache/flink-cdc/pull/1366/files#diff-e129e9fae3eea0bb32f0019debb4932413c91088d6dae656e2ecb63913badae4],
>  but they are not work.
> Now a way is provided to provide a solution based on Debezium's custom 
> Convert interface 
> (https://debezium.io/documentation/reference/1.9/development/converters.html),
> Users can choose to convert the above four time types into STRING according 
> to the specified time format to ensure that users can correctly convert JSON 
> when using the Flink DataStream API.
> When the user enables this converter, we need to configure it according to 
> the parameters, That's some datastream use case:
> {code:java}
> Properties debeziumProperties = new Properties();
> debeziumProperties.setProperty("converters", "datetime");
> debeziumProperties.setProperty("datetime.database.type", 
> DataBaseType.MYSQL.getType());
> debeziumProperties.setProperty("datetime.type", 
> "cn.xxx.sources.cdc.MysqlDebeziumConverter");
> debeziumProperties.setProperty("datetime.format.date", "yyyy-MM-dd");
> debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.datetime", "yyyy-MM-dd 
> HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.timestamp", "yyyy-MM-dd 
> HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8");
> MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
>         .hostname(url[0])
>         .port(Integer.parseInt(url[1]))
>         .databaseList(table.getDatabase())
>         .tableList(getTablePattern(table))
>         .username(table.getUserName())
>         .password(table.getPassword())
>         .debeziumProperties(debeziumProperties); {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to