I just ran a test.
Assume there are 3 rows of data to sync (full sync):
| 编号 | 电话        | 座机 |
| 1    | 13113133333 | 123  |
| 2    | 13113133333 | 456  |
| 3    | 13113133333 | 789  |




Next I modify two fields of the last data row (编号 3) as an incremental update:
| 编号 | 电话        | 座机 |
| 1    | 13113133333 | 123  |
| 2    | 13113133333 | 456  |
| 3    | 13113133110 | 888  |
After the modification I delete row 2. Checking MySQL, row 2 is correctly deleted and nothing new is inserted (correct behavior).
Then I go on to delete row 3, and this time it goes wrong: Flink still holds cached state for the row whose two fields were modified, so at the same time as the delete, the old version of that row is inserted back into MySQL (incorrect behavior).
The above is what I observed testing on Flink 1.16.1. I do not know whether Flink itself needs some configuration, or whether the downstream sink operator needs a specific setting; I am not clear on either. This problem has been bothering me for 3 weeks, and none of the tests and adjustments I have tried have had any effect.
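For reference, the EXPLAIN output quoted below prints upsertMaterialize=[true] on the sink, i.e. the planner is putting a state-backed upsert materializer in front of the JDBC sink, which matches the "cached old data" behavior described above. A minimal sketch of the one knob I know of (an assumption on my part, not a verified fix; whether 'NONE' is safe depends on the changelog ordering coming out of the source):

-- sketch: set in sql-client (or table config) before submitting the job; untested against this case
SET 'table.exec.sink.upsert-materialize' = 'NONE';
-- then re-run the same INSERT INTO `电话_1` ... statement as before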








On 2023-03-06 10:54:23, "陈佳豪" <jagec...@yeah.net> wrote:
>Hi, good morning.
>I upgraded Flink to 1.16.1 and ran the Kafka-to-MySQL sync job; the same problem is still there. I ran EXPLAIN locally and the output it gives is as follows:
>
>== Abstract Syntax Tree ==
>LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机])
>+- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"], 名称=[$1], 手机=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 座机=[CAST($3):VARCHAR(255) CHARACTER SET "UTF-16LE"])
>   +- LogicalTableScan(table=[[default_catalog, default_database, 电话]])
>
>== Optimized Physical Plan ==
>Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], upsertMaterialize=[true])
>+- Calc(select=[CAST(rowid AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS 座机])
>   +- TableSourceScan(table=[[default_catalog, default_database, 电话]], fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 63fd660536521f81a2cfabae])
>
>== Optimized Execution Plan ==
>Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], upsertMaterialize=[true])
>+- Calc(select=[CAST(rowid AS VARCHAR(255)) AS rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647)) AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255)) AS 座机])
>   +- TableSourceScan(table=[[default_catalog, default_database, 电话]], fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 63fd660536521f81a2cfabae])
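>The plan above comes from plain EXPLAIN, so it does not show the changelog kinds. For completeness, a sketch of the CHANGELOG_MODE variant Jane suggested (the same query with the nested SELECT flattened; I have not included its output here):
>
>EXPLAIN CHANGELOG_MODE
>INSERT INTO `电话_1`
>SELECT `rowid` AS `rowID`,
>       `63fd65fb36521f81a2cfab90` AS `名称`,
>       `63fd660536521f81a2cfabad` AS `手机`,
>       `63fd660536521f81a2cfabae` AS `座机`
>FROM `电话`;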
>
>
>
>On 2023-03-05 15:37:53, "Jane Chan" <qingyue....@gmail.com> wrote:
>>Hi,
>>
>>Sorry, that was a typo; it should be 1.16.1. I verified the query you sent earlier on 1.16.1 and the delete works correctly. You could try it on 1.16.1, or try EXPLAIN CHANGELOG_MODE INSERT INTO...[1] on 1.15.2 to print the plan and take a look.
>>
>>[1]
>>https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
>>
>>Best,
>>Jane
>>
>>On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪 <jagec...@yeah.net> wrote:
>>
>>> Hi,
>>> There is no 1.16.2 release yet, right? On the Flink website I only see 1.16.0 and 1.16.1.
>>>
>>> On 2023-03-02 11:52:41, "Jane Chan" <qingyue....@gmail.com> wrote:
>>> >Hi,
>>> >
>>> >You can try EXPLAIN CHANGELOG_MODE[1] to print the plan and take a look, or try upgrading to Flink 1.16; this
>>> >query has been verified to work on 1.16.2.
>>> >
>>> >[1]
>>> >
>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
>>> >
>>> >Best,
>>> >Jane
>>> >
>>> >On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 <jagec...@yeah.net> wrote:
>>> >
>>> >> Flink, the Kafka connector, and the JDBC connector are all on version 1.15.2.
>>> >>
>>> >> On 2023-03-01 18:14:35, "陈佳豪" <jagec...@yeah.net> wrote:
>>> >> >The problem is as the subject says: when a delete happens, the MySQL table data ends up wrong; every time, the old row for the deleted primary key gets inserted again.
>>> >> >-- Kafka source table (in the job this is the Java string `kafka`):
>>> >> >CREATE TABLE `电话` (
>>> >> >  `rowid` VARCHAR(2147483647),
>>> >> >  `63fd65fb36521f81a2cfab90` VARCHAR(2147483647),
>>> >> >  `63fd660536521f81a2cfabad` VARCHAR(65535),
>>> >> >  `63fd660536521f81a2cfabae` VARCHAR(65535)
>>> >> >) WITH (
>>> >> >  'connector' = 'kafka',
>>> >> >  'topic' = 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f',
>>> >> >  'properties.bootstrap.servers' = '132.232.27.116:9092',
>>> >> >  'scan.startup.mode' = 'earliest-offset',
>>> >> >  'format' = 'debezium-json'
>>> >> >);
>>> >> >
>>> >> >-- MySQL sink table (Java string `mysql`); note the primary key on `rowID`:
>>> >> >CREATE TABLE `电话_1` (
>>> >> >  `rowID` VARCHAR(255),
>>> >> >  `名称` STRING,
>>> >> >  `手机` STRING,
>>> >> >  `座机` VARCHAR(255),
>>> >> >  PRIMARY KEY (`rowID`) NOT ENFORCED
>>> >> >) WITH (
>>> >> >  'connector' = 'jdbc',
>>> >> >  'driver' = 'com.mysql.cj.jdbc.Driver',
>>> >> >  'url' = 'jdbc:mysql://43.136.128.102:6506/meihua_test',
>>> >> >  'username' = 'root',
>>> >> >  'password' = '123456',
>>> >> >  'table-name' = '电话2'
>>> >> >);
>>> >> >
>>> >> >-- Sync statement (Java string `insert`):
>>> >> >INSERT INTO `电话_1`
>>> >> >SELECT `t_1`.`rowID` AS `rowID`, `t_1`.`名称` AS `名称`, `t_1`.`手机` AS `手机`, `t_1`.`座机` AS `座机`
>>> >> >FROM (
>>> >> >  SELECT `rowid` AS `rowID`,
>>> >> >         `63fd65fb36521f81a2cfab90` AS `名称`,
>>> >> >         `63fd660536521f81a2cfabad` AS `手机`,
>>> >> >         `63fd660536521f81a2cfabae` AS `座机`
>>> >> >  FROM `电话`
>>> >> >) AS t_1;
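>>> >> >
>>> >> >One detail I am unsure about (an assumption on my side, not something I have verified): the Kafka source table above declares no primary key while the sink does, and I do not know whether that affects how the planner interprets the deletes. A sketch of the same source DDL with a key declared on `rowid`:
>>> >> >
>>> >> >CREATE TABLE `电话` (
>>> >> >  `rowid` VARCHAR(2147483647),
>>> >> >  `63fd65fb36521f81a2cfab90` VARCHAR(2147483647),
>>> >> >  `63fd660536521f81a2cfabad` VARCHAR(65535),
>>> >> >  `63fd660536521f81a2cfabae` VARCHAR(65535),
>>> >> >  PRIMARY KEY (`rowid`) NOT ENFORCED  -- assumption: key on the Debezium row id, untested here
>>> >> >) WITH (
>>> >> >  'connector' = 'kafka',
>>> >> >  'topic' = 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f',
>>> >> >  'properties.bootstrap.servers' = '132.232.27.116:9092',
>>> >> >  'scan.startup.mode' = 'earliest-offset',
>>> >> >  'format' = 'debezium-json'
>>> >> >);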
>>> >> >
>>> >> >The statements I ran are as shown above (the original message referred to a screenshot). Could anyone take a look and help clarify: is this a problem with my SQL, or a bug in the Flink connector itself?
>>> >>
>>>
