Hi, 原问题中 String 变量 kafka 和 mysql 赋值反了, 以及能提供下所使用的 flink 版本吗, 我使用 1.16.1 没有复现此问题
payload { "before": { "rowid": "f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d", "63f73b332e77497da91286f0": "Jerry", "63f73b3f2e77497da91286fb": "mobile number", "63f73b3f2e77497da91286fc": "telephone number" }, "after": null, "source": {...}, "op": "d", "ts_ms": 1677342340042, "transaction": null } flink sql Flink SQL> insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `手机` from (select `rowid` as `rowID`, `63f73b332e77497da91286f0` as `名称`,`63f73b3f2e77497da91286fb` as `手机`, `63f73b3f2e77497da91286fc` as `座机` from `电话_1`) as t_1; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 8490c9530d3a97e73aeedfe9745f2fe3 mysql output mysql> select * from 电话; + --------------------------------------+--------+---------------+------------------+ | rowID | 名称 | 手机 | 座机 | + --------------------------------------+--------+---------------+------------------+ | f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d | Tom | mobile number | telephone number | + --------------------------------------+--------+---------------+------------------+ 1 row in set (0.00 sec) mysql> select * from 电话; + --------------------------------------+--------+---------------+------------------+ | rowID | 名称 | 手机 | 座机 | + --------------------------------------+--------+---------------+------------------+ | f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d | Jerry | mobile number | telephone number | + --------------------------------------+--------+---------------+------------------+ 1 row in set (0.00 sec) mysql> select * from 电话; Empty set (0.00 sec) Best, Jane On Fri, Feb 24, 2023 at 2:21 PM 陈佳豪 <jagec...@yeah.net> wrote: > -----建表语法如下 > String kafka = "CREATE TABLE `电话` " + > "( `rowID` VARCHAR(255),`名称` STRING,`手机` VARCHAR(255),`座机` > VARCHAR(255), " + > " PRIMARY KEY (`rowID`) NOT ENFORCED ) " + > " WITH " + > "( 'connector' = 'jdbc', " + > " 'driver' = 'com.mysql.cj.jdbc.Driver', " + > " 'url' = 'jdbc:mysql://XXXXXX:6506/meihua_test', " + > " 'username' = 'root', " + > " 'password' = '123456', " + > " 'table-name' = '电话' )"; > > String mysql = "CREATE TABLE `电话_1` " + > "( `rowid` VARCHAR(100)," + > "`63f73b332e77497da91286f0` VARCHAR(100)," + > "`63f73b3f2e77497da91286fb` VARCHAR(100)," + > "`63f73b3f2e77497da91286fc` VARCHAR(100)," + > "`op` STRING ," + > " PRIMARY KEY (rowid) NOT ENFORCED )" + > " WITH " + > "( 'connector' = 'kafka', " + > "'topic' = > 'sz_worksheet-63f82984f3ec743e45b0d561-63f73b332e77497da91286ef'," + > " 'properties.bootstrap.servers' = 'XXXXXX:9092'," + > " 'scan.startup.mode' = 'earliest-offset', " + > "'format' = 'debezium-json' )"; > -----执行语句如下 > String insert = "insert into `电话` select `t_1`.`rowID` as > `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" + > " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as > `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机` > from `电话_1` ) as t_1"; > -----操作数据如下 > > > String insert = "insert into `电话` select `t_1`.`rowID` as > `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" + > " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as > `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机` > from `电话_1` ) as t_1"; > -----执行语句如下 > { > "op":"d", > "before":{ > "rowid":"f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d" > }, > "after":null > } > 现在的结论是可以新增和修改,但是无法删除。难道insert into这个语句搞不定吗? 走的debezuim json序列化的格式。 > 各位大佬帮看下 谢谢。