Hi,
我看你开了minibatch,你用了aggregate算子了吗?

> -----原始邮件-----
> 发件人: op <[email protected]>
> 发送时间: 2020-08-10 10:50:08 (星期一)
> 收件人: user-zh <[email protected]>
> 抄送: 
> 主题: 回复: flink sql状态清理问题
> 
> 配置了minIdleStateRetentionTime ,
> val tConfig = tableEnv.getConfig
> tConfig.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(10)) 
> 使用的是1.11.0版本,sql就是一个简单测试,按照sessionid groupby count(*),一个sessionid一般1分钟内就会失效,
> 问题是同一套代码,目前观察到的是配置minibatch后影响状态清理了
> 
> 
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:                                                                          
>                                               "user-zh"                       
>                                                              
> <[email protected]&gt;;
> 发送时间:&nbsp;2020年8月10日(星期一) 上午10:44
> 收件人:&nbsp;"user-zh"<[email protected]&gt;;
> 
> 主题:&nbsp;Re: flink sql状态清理问题
> 
> 
> 
> 配置了 TableConfig 中的 minIdleStateRetentionTime 和 maxIdleStateRetentionTime 吗?
> 
> Benchao Li <[email protected]&gt; 于2020年8月10日周一 上午10:36写道:
> 
> &gt; Hi,
> &gt;
> &gt; 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。
> &gt;
> &gt; op <[email protected]&gt; 于2020年8月10日周一 上午10:27写道:
> &gt;
> &gt; &gt; Hi
> &gt; &gt; &amp;nbsp; &amp;nbsp; 在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
> &gt; &gt; &amp;nbsp; val config = tableConfig.getConfiguration()
> &gt; &gt; &amp;nbsp; 
> &amp;nbsp;&amp;nbsp;config.setString("table.exec.mini-batch.enabled",
> &gt; &gt; "true")
> &gt; &gt; &amp;nbsp;
> &gt; 
> &amp;nbsp;&amp;nbsp;config.setString("table.exec.mini-batch.allow-latency",
> &gt; &gt; "5s")
> &gt; &gt; &amp;nbsp; 
> &amp;nbsp;&amp;nbsp;config.setString("table.exec.mini-batch.size", "20")
> &gt; &gt;
> &gt; 
> 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
> &gt; &gt; 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢
> &gt;
> &gt;
> &gt;
> &gt; --
> &gt;
> &gt; Best,
> &gt; Benchao Li
> &gt;


------------------------------
刘大龙

浙江大学 控制系 智能系统与控制研究所 工控新楼217
地址:浙江省杭州市浙大路38号浙江大学玉泉校区
Tel:18867547281

回复