感谢!

flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

        StateBackend backend =new
FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

这样看,有可能是RocksDBStateBackend*增量模式*这边可能存在一些问题。

下面两种都能成功清理
RocksDBStateBackend:
<http://apache-flink.147419.n8.nabble.com/file/t793/444.png> 

FsStateBackend:
<http://apache-flink.147419.n8.nabble.com/file/t793/555.png> 


Benchao Li-2 wrote
> 这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
> 这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
> state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月29日周三 上午9:47写道:
> 
>> 您好:
>>
>> 我按照您说的试了看了一下watermark,
>> 发现可以 正常更新,相关的计算结果也没发现问题。
>> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
>> &lt;http://apache-flink.147419.n8.nabble.com/file/t793/111.png&gt;
>> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
>> &lt;http://apache-flink.147419.n8.nabble.com/file/t793/333.png&gt;
>> &lt;http://apache-flink.147419.n8.nabble.com/file/t793/222.png&gt;
>>
>>
>>
>> Congxian Qiu wrote
>> > Hi
>> >     SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
>> 越来越大的情况,或许可以检查下
>> > watermark[1]
>> >
>> > [1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > 鱼子酱 <
>>
>> > 384939718@
>>
>> >> 于2020年7月28日周二 下午2:45写道:
>> >
>> >> Hi,社区的各位大家好:
>> >> 我目前生产上面使用的是1.8.2版本,相对稳定
>> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>> >>
>> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> >> 状态后端使用的是rocksdb 的增量模式
>> >> StateBackend backend =new
>> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> >> 设置了官网文档中找到的删除策略:
>> >>         TableConfig tableConfig = streamTableEnvironment.getConfig();
>> >>         tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> >> Time.minutes(7));
>> >>
>> >> 请问是我使用的方式不对吗?
>> >>
>> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>> >>
>> >>
>> >>
>> >> 版本影响:flink1.10.1 flink1.11.1
>> >> planner:blink planner
>> >> source : kafka source
>> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> sql:
>> >> insert into  result
>> >>     select request_time ,request_id ,request_cnt ,avg_resptime
>> >> ,stddev_resptime ,terminal_cnt
>> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
>> >> from
>> >>     (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >>         ,commandId as request_id
>> >>         ,count(*) as request_cnt
>> >>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >>         from log
>> >>         where
>> >>         commandId in (104005 ,204005 ,404005)
>> >>         and errCode=0 and attr=0
>> >>         group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>> >>
>> >>         union all
>> >>
>> >>         select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >>         ,99999999
>> >>         ,count(*) as request_cnt
>> >>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >>         from log
>> >>         where
>> >>         commandId in (104005 ,204005 ,404005)
>> >>         and errCode=0 and attr=0
>> >>         group by TUMBLE(times, INTERVAL '1' MINUTE)
>> >>     )
>> >>
>> >>
>> >> source:
>> >>
>> >>     create table log (
>> >>       eventTime bigint
>> >>       ,times timestamp(3)
>> >>           ……………………
>> >>       ,commandId integer
>> >>       ,watermark for times as times - interval '5' second
>> >>     )
>> >>     with(
>> >>      'connector' = 'kafka-0.10',
>> >>      'topic' = '……',
>> >>      'properties.bootstrap.servers' = '……',
>> >>      'properties.group.id' = '……',
>> >>      'scan.startup.mode' = 'latest-offset',
>> >>      'format' = 'json'
>> >>     )
>> >>
>> >> sink1:
>> >> create table result (
>> >>       request_time varchar
>> >>       ,request_id integer
>> >>       ,request_cnt bigint
>> >>       ,avg_resptime double
>> >>       ,stddev_resptime double
>> >>       ,insert_time varchar
>> >>     ) with (
>> >>       'connector' = 'kafka-0.10',
>> >>       'topic' = '……',
>> >>       'properties.bootstrap.servers' = '……',
>> >>       'properties.group.id' = '……',
>> >>       'scan.startup.mode' = 'latest-offset',
>> >>       'format' = 'json'
>> >>     )
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
> 
> 
> -- 
> 
> Best,
> Benchao Li





--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复