我们正准备开发这个功能,详情可以参考:https://issues.apache.org/jira/browse/FLINK-15221
夏帅 于2020年7月1日周三 下午3:13写道:
> 你好,可以尝试自定义实现Kafka011TableSourceSinkFactory和Kafka011TableSink来实现exactly-once
>
> Kafka011TableSink
>
>
> @Override
> protected SinkFunction createKafkaProducer(
> String topic,
>
那你有没有尝试过修改connector中property中connector.startup-mode
设置为latest-offset,这样子每次从kafka读取都是读取最新的消息。
另外,我想问一下 你的sql是一直运行的吗?
我给的limit方案是一个upersert流。
小学生 <201782...@qq.com> 于2020年6月10日周三 下午5:31写道:
> limit 没有用呀。有没有切实可行的方案呢,pyflink下。
那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整.
如有错误欢迎指正
小学生 <201782...@qq.com> 于2020年6月10日周三 下午3:26写道:
> 您好,我是通过select * from
> table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from
> table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)
我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗?
我个人猜可能有两种方案:
1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始;
2.定期向文件系统写入数据。
小学生 <201782...@qq.com> 于2020年6月10日周三 下午2:48写道:
> 各位大佬好,请教一个问题:
>
的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。
方盛凯 于2020年6月9日周二 下午9:26写道:
>
> 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
> 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
> https://ci.apache.
可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
如有错误,欢迎补充回答。
陈赋赟 于2020年6月8日周一 上午11:53写道:
> 原先sql任务是:
> CREATE TABLE