Re: flink sql状态清理问题
配置了 TableConfig 中的 minIdleStateRetentionTime 和 maxIdleStateRetentionTime 吗? Benchao Li 于2020年8月10日周一 上午10:36写道: > Hi, > > 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。 > > op <520075...@qq.com> 于2020年8月10日周一 上午10:27写道: > > > Hi > > 在使用flink sql的过程中遇到如下情况,在配置了如下选项后: > > val config = tableConfig.getConfiguration() > > config.setString("table.exec.mini-batch.enabled", > > "true") > > > config.setString("table.exec.mini-batch.allow-latency", > > "5s") > > config.setString("table.exec.mini-batch.size", "20") > > > 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长; > > 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢 > > > > -- > > Best, > Benchao Li >
Re: flink sql状态清理问题
Hi, 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。 op <520075...@qq.com> 于2020年8月10日周一 上午10:27写道: > Hi > 在使用flink sql的过程中遇到如下情况,在配置了如下选项后: > val config = tableConfig.getConfiguration() > config.setString("table.exec.mini-batch.enabled", > "true") > config.setString("table.exec.mini-batch.allow-latency", > "5s") > config.setString("table.exec.mini-batch.size", "20") > 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长; > 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢 -- Best, Benchao Li
Re: Flink sql 状态清理问题
Hi, Join算子的state是支持清理的。 可以提供下以下信息: - Flink 版本 - planner (blink planner / old planner) op <520075...@qq.com> 于2020年6月10日周三 下午4:08写道: > hi, > 写了个测试程序: > > .. > > val tConfig = bstEnv.getConfig > > confg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25)) > > .. > > val q1=bstEnv.sqlQuery( > """select createTime,feedid from source > |where circleName is not null > |and circleName not in('','_') > |and action = 'C_FEED_EDIT_SEND' > |""".stripMargin) > > > bstEnv.createTemporaryView("sourcefeed",q1) > val q2=bstEnv.sqlQuery( > """select feedid,postfeedid,action from source > |where circleName is not null > |and circleName not in('','_') > |and action in('C_PUBLISH','C_FORWARD_PUBLISH') > |""".stripMargin) > > bstEnv.createTemporaryView("postfeed",q2) > bstEnv.sqlQuery( > """ > |select count(b.postfeedid) from > |sourcefeed a > |join postfeed b > |on a.feedid=b.postfeedid > """.stripMargin).toRetractStream[Row](confg).print("") > > > // > > 程序里面设置了状态失效最晚时间是空闲25分钟,但是运行了几天了,我再web上观察到的状态一直再不断增加,可以确定关联的id最多只会活跃1个小时左右,请问是哪里没设置对还是join两边的state不支持清理? > >