Re: flink sql状态清理问题

2020-08-09 文章 godfrey he
配置了 TableConfig 中的 minIdleStateRetentionTime 和 maxIdleStateRetentionTime 吗?

Benchao Li  于2020年8月10日周一 上午10:36写道:

> Hi,
>
> 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。
>
> op <520075...@qq.com> 于2020年8月10日周一 上午10:27写道:
>
> > Hi
> >   在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
> >  val config = tableConfig.getConfiguration()
> >  config.setString("table.exec.mini-batch.enabled",
> > "true")
> > 
> config.setString("table.exec.mini-batch.allow-latency",
> > "5s")
> >  config.setString("table.exec.mini-batch.size", "20")
> >
> 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
> > 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink sql状态清理问题

2020-08-09 文章 Benchao Li
Hi,

最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。

op <520075...@qq.com> 于2020年8月10日周一 上午10:27写道:

> Hi
>   在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
>  val config = tableConfig.getConfiguration()
>  config.setString("table.exec.mini-batch.enabled",
> "true")
>  config.setString("table.exec.mini-batch.allow-latency",
> "5s")
>  config.setString("table.exec.mini-batch.size", "20")
> 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
> 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢



-- 

Best,
Benchao Li


Re: Flink sql 状态清理问题

2020-06-10 文章 Benchao Li
Hi,

Join算子的state是支持清理的。
可以提供下以下信息:
- Flink 版本
- planner (blink planner / old planner)

op <520075...@qq.com> 于2020年6月10日周三 下午4:08写道:

> hi,
> 写了个测试程序:
>
> ..
>
> val tConfig = bstEnv.getConfig
>
> confg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))
>
> ..
>
> val q1=bstEnv.sqlQuery(
>   """select createTime,feedid from source
> |where circleName is not null
> |and circleName not in('','_')
> |and action = 'C_FEED_EDIT_SEND'
> |""".stripMargin)
>
>
>  bstEnv.createTemporaryView("sourcefeed",q1)
> val q2=bstEnv.sqlQuery(
>   """select feedid,postfeedid,action from source
> |where circleName is not null
> |and circleName not in('','_')
> |and action in('C_PUBLISH','C_FORWARD_PUBLISH')
> |""".stripMargin)
>
> bstEnv.createTemporaryView("postfeed",q2)
> bstEnv.sqlQuery(
>   """
> |select count(b.postfeedid) from
> |sourcefeed a
> |join postfeed b
> |on a.feedid=b.postfeedid
>   """.stripMargin).toRetractStream[Row](confg).print("")
>
>
> //
>
> 程序里面设置了状态失效最晚时间是空闲25分钟,但是运行了几天了,我再web上观察到的状态一直再不断增加,可以确定关联的id最多只会活跃1个小时左右,请问是哪里没设置对还是join两边的state不支持清理?
>
>