sourcr:mysql-cdc
sink:elasticsearch
问题描述:
从mysql中同步表数据至elasticsearch后,进行新增再删除的某条数据出现问题,导致sink失败(没加primary
key)。checkpoint失败,程序自动恢复重启后,checkpoint 成功,但是elasticsearch 中的数据是mysql
表中的两倍,出现重复同步情况。
程序的自动恢复不应该是从当前checkpoint 中记录的binlog 位置再同步么?为什么会再重头同步一次呢?
(ddl 中写死了server-id,
"
在官网文档中看到在代码中对于开启checkpoint配置,但是sql client 的相关文档没有checkpoint的描述,是不支持么?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
感谢,已经解决了!
BR,
lingchanhu
--
Sent from: http://apache-flink.147419.n8.nabble.com/
非常感谢,如果flink1.11 目前不支持的话,那对于这种场景的使用有什么建议么?想要批处理数据,其中又要用到自定义的agg function?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
*flink1.11*
在TableEnvironment环境中注册并使用自定义的Aggregate
Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment 注册和使用则是正常,这应该说明自定义的函数是ok的)
org.apache.flink.table.api.TableException: Aggregate functions are not
updated to the new type system yet.
at
*flink1.11*
在TableEnvironment环境中注册并使用自定义的Aggregate
Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment 注册和使用则是正常,这应该说明自定义的函数是ok的)
org.apache.flink.table.api.TableException: Aggregate functions are not
updated to the new type system yet.
at