Re: flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?

2020-07-01 文章
我们正准备开发这个功能,详情可以参考:https://issues.apache.org/jira/browse/FLINK-15221 夏帅 于2020年7月1日周三 下午3:13写道: > 你好,可以尝试自定义实现Kafka011TableSourceSinkFactory和Kafka011TableSink来实现exactly-once > > Kafka011TableSink > > > @Override > protected SinkFunction createKafkaProducer( > String topic, >

Re: kafka相关问题

2020-06-10 文章
那你有没有尝试过修改connector中property中connector.startup-mode 设置为latest-offset,这样子每次从kafka读取都是读取最新的消息。 另外,我想问一下 你的sql是一直运行的吗? 我给的limit方案是一个upersert流。 小学生 <201782...@qq.com> 于2020年6月10日周三 下午5:31写道: > limit 没有用呀。有没有切实可行的方案呢,pyflink下。

Re: kafka相关问题

2020-06-10 文章
那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整. 如有错误欢迎指正 小学生 <201782...@qq.com> 于2020年6月10日周三 下午3:26写道: > 您好,我是通过select * from > table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from > table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)

Re: kafka相关问题

2020-06-10 文章
我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗? 我个人猜可能有两种方案: 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始; 2.定期向文件系统写入数据。 小学生 <201782...@qq.com> 于2020年6月10日周三 下午2:48写道: > 各位大佬好,请教一个问题: >

Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

2020-06-09 文章
的文档中Assiging Operator IDs这一节的内容。 注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。 方盛凯 于2020年6月9日周二 下午9:26写道: > > 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了 > 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 > https://ci.apache.

Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

2020-06-09 文章
可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html 如有错误,欢迎补充回答。 陈赋赟 于2020年6月8日周一 上午11:53写道: > 原先sql任务是: > CREATE TABLE