Thanks!

On Flink 1.11.1 I tried the two backends below. After more than 20 hours of running, the state size fluctuates within a bounded range and no longer grows continuously.

StateBackend backend = new RocksDBStateBackend("hdfs:///checkpoints-data/" + yamlReader.getValueByKey("jobName").toString() + "/", false);
StateBackend backend = new FsStateBackend("hdfs:///checkpoints-data/" + yamlReader.getValueByKey("jobName").toString() + "/", false);

This suggests that the problem may lie in the *incremental mode* of RocksDBStateBackend.

Both of the configurations above clean up state successfully.
RocksDBStateBackend:
<http://apache-flink.147419.n8.nabble.com/file/t793/444.png>
FsStateBackend:
<http://apache-flink.147419.n8.nabble.com/file/t793/555.png>

Benchao Li-2 wrote
> I suggest first determining whether the SQL operator is failing to clean up its state, or whether the state backend itself is failing to clean it up.
> Could you try other state backends, as well as RocksDB in non-incremental mode? If the state keeps growing with every state backend,
> then the state cleanup in one of the SQL operators is probably at fault.
>
> 鱼子酱 <384939718@> wrote on Wed, Jul 29, 2020 at 9:47 AM:
>
>> Hello,
>>
>> Following your suggestion I checked the watermark. It advances normally, and I found no problems in the computed results either.
>> 1. A screenshot is below; the times are correct once 8 hours are subtracted for the time zone difference.
>> <http://apache-flink.147419.n8.nabble.com/file/t793/111.png>
>> 2. The checkpoint details show the size growing linearly, concentrated mainly in the two window operators and the group operator.
>> <http://apache-flink.147419.n8.nabble.com/file/t793/333.png>
>> <http://apache-flink.147419.n8.nabble.com/file/t793/222.png>
>>
>> Congxian Qiu wrote
>> > Hi
>> > I'm not very familiar with the SQL side, but in my experience, when the state of a window operator keeps growing under event time, it may be worth checking the watermark [1].
>> >
>> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
>> >
>> > Best,
>> > Congxian
>> >
>> > 鱼子酱 <384939718@> wrote on Tue, Jul 28, 2020 at 2:45 PM:
>> >
>> >> Hi everyone,
>> >> I'm currently running 1.8.2 in production, and it is fairly stable.
>> >> To unify all related jobs under SQL, I have been following the latest Flink releases
>> >> and have so far evaluated two major versions, 1.10.1 and 1.11.1.
>> >>
>> >> While trying them out, I found that when a program runs a SQL group-by, the amount of data in the checkpoint keeps growing slowly.
>> >> The state backend is RocksDB in incremental mode:
>> >> StateBackend backend = new RocksDBStateBackend("hdfs:///checkpoints-data/", true);
>> >> I set the cleanup policy I found in the official documentation:
>> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
>> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7));
>> >>
>> >> Am I using it incorrectly?
>> >>
>> >> In the WebUI's detailed checkpoint view, the large state is concentrated mainly in the group operator.
>> >>
>> >> Affected versions: flink1.10.1, flink1.11.1
>> >> planner: blink planner
>> >> source: kafka source
>> >> Time attribute: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >> sql:
>> >> insert into result
>> >> select request_time ,request_id ,request_cnt ,avg_resptime
>> >>     ,stddev_resptime ,terminal_cnt
>> >>     ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
>> >> from
>> >> ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >>     ,commandId as request_id
>> >>     ,count(*) as request_cnt
>> >>     ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >>     ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
>> >>   from log
>> >>   where
>> >>     commandId in (104005 ,204005 ,404005)
>> >>     and errCode=0 and attr=0
>> >>   group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>> >>
>> >>   union all
>> >>
>> >>   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >>     ,99999999
>> >>     ,count(*) as request_cnt
>> >>     ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >>     ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
>> >>   from log
>> >>   where
>> >>     commandId in (104005 ,204005 ,404005)
>> >>     and errCode=0 and attr=0
>> >>   group by TUMBLE(times, INTERVAL '1' MINUTE)
>> >> )
>> >>
>> >> source:
>> >>
>> >> create table log (
>> >>     eventTime bigint
>> >>     ,times timestamp(3)
>> >>     ……………………
>> >>     ,commandId integer
>> >>     ,watermark for times as times - interval '5' second
>> >> )
>> >> with(
>> >>     'connector' = 'kafka-0.10',
>> >>     'topic' = '……',
>> >>     'properties.bootstrap.servers' = '……',
>> >>     'properties.group.id' = '……',
>> >>     'scan.startup.mode' = 'latest-offset',
>> >>     'format' = 'json'
>> >> )
>> >>
>> >> sink1:
>> >> create table result
>> >> (
>> >>     request_time varchar
>> >>     ,request_id integer
>> >>     ,request_cnt bigint
>> >>     ,avg_resptime double
>> >>     ,stddev_resptime double
>> >>     ,insert_time varchar
>> >> ) with (
>> >>     'connector' = 'kafka-0.10',
>> >>     'topic' = '……',
>> >>     'properties.bootstrap.servers' = '……',
>> >>     'properties.group.id' = '……',
>> >>     'scan.startup.mode' = 'latest-offset',
>> >>     'format' = 'json'
>> >> )
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>
> --
>
> Best,
> Benchao Li
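
For anyone reproducing the comparison in this thread, the three backend setups can be sketched roughly as below. This is only a sketch, assuming Flink 1.11.x with flink-streaming-java and flink-statebackend-rocksdb on the classpath. The class name, the BackendChoice enum, the jobName parameter (standing in for yamlReader.getValueByKey("jobName")) and the 60-second checkpoint interval are made up for illustration and are not from the original job. One thing worth keeping in mind: the boolean argument means different things for the two classes. For RocksDBStateBackend it turns incremental checkpointing on or off, while for FsStateBackend it selects asynchronous snapshots.

```java
import java.io.IOException;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendComparison {

    /** Hypothetical switch for the three setups discussed in the thread. */
    enum BackendChoice { ROCKSDB_INCREMENTAL, ROCKSDB_FULL, FS }

    /** Builds one of the three backends; jobName is an illustrative parameter. */
    static StateBackend create(BackendChoice choice, String jobName) throws IOException {
        String uri = "hdfs:///checkpoints-data/" + jobName + "/";
        switch (choice) {
            case ROCKSDB_INCREMENTAL:
                // true = incremental checkpoints (the mode where state kept growing)
                return new RocksDBStateBackend(uri, true);
            case ROCKSDB_FULL:
                // false = full checkpoints (state size stayed bounded)
                return new RocksDBStateBackend(uri, false);
            case FS:
                // For FsStateBackend the flag means asynchronous snapshots,
                // not incremental mode; false matches the call in the thread.
                return new FsStateBackend(uri, false);
            default:
                throw new IllegalArgumentException("Unknown choice: " + choice);
        }
    }

    public static void main(String[] args) throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // assumed interval, not from the thread
        env.setStateBackend(create(BackendChoice.ROCKSDB_FULL, "demo-job"));
        // Register the tables, run the SQL and call execute() here.
    }
}
```

Switching only the BackendChoice between runs keeps everything else in the job identical, which is the point of the test Benchao suggested: if the state grows only under ROCKSDB_INCREMENTAL, the SQL operators' cleanup is probably not the cause.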