Judging from the plan, a SinkUpsertMaterializer[1] was added before the sink node because no upsert key could be derived; the data is then shuffled (keyBy) on the sink table's declared primary key[2], which only guarantees eventual consistency. Also, the schema in your test description has three columns while the DDL has four, and the table formatting came through garbled.
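As a side note, whether the materializer was added can be checked directly: EXPLAIN CHANGELOG_MODE prints each operator together with its changelog mode. A minimal sketch using the identifiers from this thread:

```sql
-- Prints the plan plus each operator's changelog mode; look for
-- upsertMaterialize=[true] on the Sink node.
EXPLAIN CHANGELOG_MODE
INSERT INTO `电话_1`
SELECT `rowid` AS `rowID`,
       `63fd65fb36521f81a2cfab90` AS `名称`,
       `63fd660536521f81a2cfabad` AS `手机`,
       `63fd660536521f81a2cfabae` AS `座机`
FROM `电话`;
```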
Some suggestions:

1. If the upstream data has a primary key and that key is rowid, declare the PK on the Flink source table so that the extra materializer node is not generated. Also, when declaring the Flink source table, do not include metadata columns (such as op); they can lead to non-deterministic updates[3].
2. Check that the PK columns of the physical table in MySQL match the PK columns declared on the Flink SQL sink table.

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
[2] https://github.com/apache/flink/blob/3ea83baad0c8413f8e1f4a027866335d13789538/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L378
[3] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/determinism/#31-%e6%b5%81%e4%b8%8a%e7%9a%84%e4%b8%8d%e7%a1%ae%e5%ae%9a%e6%80%a7

Best,
Jane

On Mon, Mar 6, 2023 at 11:24 AM 陈佳豪 <[email protected]> wrote:

> I just ran a test.
> Suppose three rows need to be synchronized (full snapshot):
>
> | 编号 | 电话        | 座机 |
> | 1    | 13113133333 | 123  |
> | 2    | 13113133333 | 456  |
> | 3    | 13113133333 | 789  |
>
> Then I modify two fields of row 3 (incremental):
>
> | 编号 | 电话        | 座机 |
> | 1    | 13113133333 | 123  |
> | 2    | 13113133333 | 456  |
> | 3    | 13113133110 | 888  |
>
> After that I delete row 2 and check MySQL: row 2 is correctly deleted and nothing new is inserted (correct).
> Then I delete row 3, and things go wrong: Flink has cached the data of the two modifications, so the delete simultaneously re-inserts the old version of the row into MySQL (incorrect).
>
> The above is the result of my testing on Flink 1.16.1. I'm not sure whether something needs to be configured in Flink or in the downstream operator. This problem has been bothering me for three weeks now, and none of my tests and adjustments have helped.
>
> On 2023-03-06 10:54:23, "陈佳豪" <[email protected]> wrote:
> >Hi, good morning.
> >I upgraded Flink to 1.16.1 and ran the Kafka-to-MySQL sync job again; the same problem still occurs. Running EXPLAIN locally gives the following output:
> >
> >== Abstract Syntax Tree ==
> >LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机])
> >+- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"], 名称=[$1], 手机=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 座机=[CAST($3):VARCHAR(255) CHARACTER SET "UTF-16LE"])
> >   +- LogicalTableScan(table=[[default_catalog, default_database, 电话]])
> >
> >== Optimized Physical Plan ==
> >Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], upsertMaterialize=[true])
> >+- Calc(select=[CAST(rowid AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS 座机])
> >   +- TableSourceScan(table=[[default_catalog, default_database, 电话]], fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 63fd660536521f81a2cfabae])
> >
> >== Optimized Execution Plan ==
> >Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], upsertMaterialize=[true])
> >+- Calc(select=[CAST(rowid AS VARCHAR(255)) AS rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647)) AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255)) AS 座机])
> >   +- TableSourceScan(table=[[default_catalog, default_database, 电话]], fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 63fd660536521f81a2cfabae])
> >
> >On 2023-03-05 15:37:53, "Jane Chan" <[email protected]> wrote:
> >>Hi,
> >>
> >>Sorry, that was a typo; it should read 1.16.1. I verified the query you sent earlier on 1.16.1, and the row is deleted correctly.
> >>You can try it on 1.16.1, or on 1.15.2 use EXPLAIN CHANGELOG_MODE INSERT INTO...[1] to print the plan and take a look.
> >>
> >>[1]
> >>https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
> >>
> >>Best,
> >>Jane
> >>
> >>On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪 <[email protected]> wrote:
> >>
> >>> Hi,
> >>> There is no 1.16.2 release yet, is there? On the Flink website I only see 1.16.0 and 1.16.1.
> >>>
> >>> On 2023-03-02 11:52:41, "Jane Chan" <[email protected]> wrote:
> >>> >Hi,
> >>> >
> >>> >You can try EXPLAIN CHANGELOG_MODE[1] to print the plan and take a look, or try upgrading to Flink 1.16; this query was verified to work on 1.16.2.
> >>> >
> >>> >[1]
> >>> >https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
> >>> >
> >>> >Best,
> >>> >Jane
> >>> >
> >>> >On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 <[email protected]> wrote:
> >>> >
> >>> >> Flink, the Kafka connector, and the JDBC connector are all on version 1.15.2.
> >>> >>
> >>> >> On 2023-03-01 18:14:35, "陈佳豪" <[email protected]> wrote:
> >>> >> >As the subject says: when rows are deleted, the data in the MySQL table is wrong; every delete re-inserts the old row for that primary key.
> >>> >> >String kafka = "CREATE TABLE `电话` ( `rowid` VARCHAR(2147483647), `63fd65fb36521f81a2cfab90` VARCHAR(2147483647), `63fd660536521f81a2cfabad` VARCHAR(65535), `63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( 'connector' = 'kafka', 'topic' = 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', 'properties.bootstrap.servers' = '132.232.27.116:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )";
> >>> >> >
> >>> >> >String mysql = "CREATE TABLE `电话_1` ( `rowID` VARCHAR(255), `名称` STRING, `手机` STRING, `座机` VARCHAR(255), PRIMARY KEY (`rowID`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'driver' = 'com.mysql.cj.jdbc.Driver', 'url' = 'jdbc:mysql://43.136.128.102:6506/meihua_test', 'username' = 'root', 'password' = '123456', 'table-name' = '电话2' )";
> >>> >> >
> >>> >> >String insert = "insert into `电话_1` select `t_1`.`rowID` as `rowID`, `t_1`.`名称` as `名称`, `t_1`.`手机` as `手机`, `t_1`.`座机` as `座机` from ( select `rowid` as `rowID`, `63fd65fb36521f81a2cfab90` as `名称`, `63fd660536521f81a2cfabad` as `手机`, `63fd660536521f81a2cfabae` as `座机` from `电话` ) as t_1";
> >>> >> >
> >>> >> >The statements are as above. Could someone take a look and help clarify whether this is a problem with my SQL or a bug in the Flink connector itself?
> >>> >>
> >>> >
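Regarding the earlier suggestion about matching primary keys: a minimal sketch of what the physical MySQL table `电话2` could look like so that its primary key lines up with the Flink sink table's PRIMARY KEY (`rowID`). The non-key column types here are assumptions (Flink STRING mapped to TEXT), not the actual schema of the database in this thread:

```sql
-- MySQL side: the primary key must match the Flink sink table's PK (`rowID`).
-- Column types are assumed mappings, not taken from the real database.
CREATE TABLE `电话2` (
  `rowID` VARCHAR(255) NOT NULL,
  `名称`  TEXT,
  `手机`  TEXT,
  `座机`  VARCHAR(255),
  PRIMARY KEY (`rowID`)
);
```

If the physical table's key differs from the Flink sink's declared PK (for example, no primary key at all, or a key on a different column), the JDBC sink's upserts and deletes may not hit the intended row, which can leave stale rows behind.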
