hi 早上好
我将flink升级到了1.16.1的版本去执行kafka同步到mysql的任务,发现还是存在一样的问题,我本机执行了explain的执行过程给的输出如下

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 
手机, 座机])
+- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"], 
名称=[$1], 手机=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
座机=[CAST($3):VARCHAR(255) CHARACTER SET "UTF-16LE"])
   +- LogicalTableScan(table=[[default_catalog, default_database, 电话]])


== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], 
upsertMaterialize=[true])
+- Calc(select=[CAST(rowid AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS rowID, 
63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 手机, 
CAST(63fd660536521f81a2cfabae AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS 座机])
   +- TableSourceScan(table=[[default_catalog, default_database, 电话]], 
fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 
63fd660536521f81a2cfabae])


== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], 
upsertMaterialize=[true])
+- Calc(select=[CAST(rowid AS VARCHAR(255)) AS rowID, 63fd65fb36521f81a2cfab90 
AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647)) AS 手机, 
CAST(63fd660536521f81a2cfabae AS VARCHAR(255)) AS 座机])
   +- TableSourceScan(table=[[default_catalog, default_database, 电话]], 
fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 
63fd660536521f81a2cfabae])



在 2023-03-05 15:37:53,"Jane Chan" <qingyue....@gmail.com> 写道:
>Hi,
>
>抱歉, 这里 typo 了, 应该是 1.16.1. 我在 1.16.1 上验证了你之前发的 query, 是可以正常删除的. 可以在 1.16.1
>上尝试下, 也可以试试在 1.15.2 上使用 EXPLAIN CHANGELOG_MODE INSERT INTO...[1] 将 plan
>打印出来看看.
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
>
>祝好!
>Jane
>
>On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪 <jagec...@yeah.net> wrote:
>
>> hi 你好
>> 目前没有1.16.2版本的吧? 我看flink官网都是1.16.0 或者是1.16.1的
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-03-02 11:52:41,"Jane Chan" <qingyue....@gmail.com> 写道:
>> >Hi,
>> >
>> >可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个
>> >query 在 1.16.2 上验证没有问题
>> >
>> >[1]
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
>> >
>> >Best,
>> >Jane
>> >
>> >On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 <jagec...@yeah.net> wrote:
>> >
>> >> flink ,kafka连接 jdbc连接版本都是1.15.2的
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2023-03-01 18:14:35,"陈佳豪" <jagec...@yeah.net> 写道:
>> >> >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。
>> >> >String kafka = "CREATE TABLE `电话` (    `rowid`
>> >> VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
>> >> VARCHAR(2147483647),`63fd660536521f81a2cfabad`
>> >> VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535)  ) WITH (
>> >> 'connector' = 'kafka', 'topic' =
>> >> 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f',
>> >> 'properties.bootstrap.servers' = '132.232.27.116:9092',
>> >> 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )";
>> >> >
>> >> >String mysql = "CREATE TABLE `电话_1` (    `rowID` VARCHAR(255),`名称`
>> >> STRING,`手机` STRING,`座机` VARCHAR(255),    PRIMARY KEY (`rowID`) NOT
>> >> ENFORCED  )  WITH (    'connector' = 'jdbc',    'driver' =
>> >> 'com.mysql.cj.jdbc.Driver',    'url' = 'jdbc:mysql://
>> >> 43.136.128.102:6506/meihua_test',    'username' = 'root',
>> 'password' =
>> >> '123456',    'table-name' = '电话2'  )";
>> >> >
>> >> >String insert = "insert into `电话_1` select `t_1`.`rowID` as
>> >> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from (
>> >> select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as
>> >> `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as
>> `座机`
>> >> from `电话` ) as t_1";
>> >> >
>> >> >操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug?
>> >>
>>

回复