你好:

我现在想在 execution environment 里面设置微批和stateValue的过期时间该怎么设?

这样 conf.setString("exec.state.ttl","15 s");
或者这样  conf.setString("stream.exec.state.ttl","15 s");











在 2021-08-26 19:05:07,"Caizhi Weng" <[email protected]> 写道:
>Hi!
>
>table 层的配置是加在 table environment 的 table config 里的,加在 execution environment
>里无效。
>
>李航飞 <[email protected]> 于2021年8月26日周四 下午7:02写道:
>
>> Configuration conf = new Configuration();
>> conf.setString("table.exec.mini-batch.enabled","true");
>> conf.setString("table.exec.mini-batch.allow-latency","15s");
>> conf.setString("table.exec.mini-batch.size","50");
>> conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment
>> execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> execEnv.configure(conf,this.getClass().getClassLoader());
>> EnvironmentSetting setting = ...
>> StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
>> 微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
>> 上次那个 allow 也就算了,这次这个 table.exec.state.ttl
>> 设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
>> 程序是通过StatementSet .execute()执行的

回复