Hi, I am not too familiar with the SQL part, but from past experience, when the state of a window operator keeps growing in an event-time job, it may be worth checking the watermark [1].
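For example, you can read the current low watermark of the group operator from Flink's REST API and see whether it advances at all. A minimal sketch in Java (the JobManager address, job id and vertex id are placeholders on my side; take the real values from your Web UI):

// Minimal sketch: print the per-subtask low watermarks of one job
// vertex via the REST endpoint /jobs/:jobid/vertices/:vertexid/watermarks.
// All ids below are placeholders.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class WatermarkCheck {
    public static void main(String[] args) throws Exception {
        String jobManager = "http://localhost:8081";        // placeholder
        String jobId = "<job-id>";                          // placeholder
        String vertexId = "<vertex-id-of-group-operator>";  // placeholder
        URL url = new URL(jobManager + "/jobs/" + jobId
                + "/vertices/" + vertexId + "/watermarks");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                // One currentLowWatermark entry per subtask; a value stuck
                // at Long.MIN_VALUE (-9223372036854775808) usually means
                // the watermark never advanced.
                System.out.println(line);
            }
        }
    }
}

If the watermark of the group operator stays far behind the event time (or never moves), the tumbling window can never fire and clean up its state, which would match the slow growth you see.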
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html

Best,
Congxian


鱼子酱 <384939...@qq.com> wrote on Tue, Jul 28, 2020 at 2:45 PM:

> Hi everyone in the community:
> In production I am currently running 1.8.2, which is relatively stable.
> To unify all related jobs with SQL, I have also been keeping up with the
> latest Flink releases; so far I have studied two major versions, 1.10.1
> and 1.11.1.
>
> While trying them out, I found that when a job uses SQL to do a group
> operation, the amount of data in the checkpoints keeps growing slowly.
> The state backend is RocksDB in incremental mode:
>
> StateBackend backend = new
>     RocksDBStateBackend("hdfs:///checkpoints-data/", true);
>
> I also set the idle-state retention policy I found in the official docs:
>
> TableConfig tableConfig = streamTableEnvironment.getConfig();
> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>     Time.minutes(7));
>
> Am I using this the wrong way?
>
> Looking at the detailed checkpoint information in the Web UI, the large
> state is concentrated in the group operator.
>
> Affected versions: flink 1.10.1, flink 1.11.1
> planner: blink planner
> source: kafka source
> time attribute: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> sql:
>
> insert into result
> select request_time, request_id, request_cnt, avg_resptime,
>     stddev_resptime, terminal_cnt,
>     SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss.SSS'), 0, 19)
> from (
>     select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),
>                'yyyy-MM-dd HH:mm:ss.SSS'), 0, 21) as request_time,
>            commandId as request_id,
>            count(*) as request_cnt,
>            ROUND(avg(CAST(`respTime` as double)), 2) as avg_resptime,
>            ROUND(stddev_pop(CAST(`respTime` as double)), 2) as stddev_resptime
>     from log
>     where commandId in (104005, 204005, 404005)
>       and errCode = 0 and attr = 0
>     group by TUMBLE(times, INTERVAL '1' MINUTE), commandId
>
>     union all
>
>     select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),
>                'yyyy-MM-dd HH:mm:ss.SSS'), 0, 21) as request_time,
>            99999999,
>            count(*) as request_cnt,
>            ROUND(avg(CAST(`respTime` as double)), 2) as avg_resptime,
>            ROUND(stddev_pop(CAST(`respTime` as double)), 2) as stddev_resptime
>     from log
>     where commandId in (104005, 204005, 404005)
>       and errCode = 0 and attr = 0
>     group by TUMBLE(times, INTERVAL '1' MINUTE)
> )
>
> source:
>
> create table log (
>     eventTime bigint
>     ,times timestamp(3)
>     ……………………
>     ,commandId integer
>     ,watermark for times as times - interval '5' second
> )
> with (
>     'connector' = 'kafka-0.10',
>     'topic' = '……',
>     'properties.bootstrap.servers' = '……',
>     'properties.group.id' = '……',
>     'scan.startup.mode' = 'latest-offset',
>     'format' = 'json'
> )
>
> sink1:
>
> create table result (
>     request_time varchar
>     ,request_id integer
>     ,request_cnt bigint
>     ,avg_resptime double
>     ,stddev_resptime double
>     ,insert_time varchar
> ) with (
>     'connector' = 'kafka-0.10',
>     'topic' = '……',
>     'properties.bootstrap.servers' = '……',
>     'properties.group.id' = '……',
>     'scan.startup.mode' = 'latest-offset',
>     'format' = 'json'
> )
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
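PS: if the watermark does turn out to be stuck, one common cause is a Kafka partition that receives no data, since the watermark is the minimum across all source subtasks. I believe the blink planner has a table.exec.source.idle-timeout option for this; please verify against the configuration page of your release, as this is an assumption on my side. A sketch (the "1 min" value is made up, tune it to your traffic):

// streamTableEnvironment is the StreamTableEnvironment of your job.
TableConfig tableConfig = streamTableEnvironment.getConfig();
// Mark a source subtask as temporarily idle after the timeout so it
// no longer holds back the watermark of downstream operators.
tableConfig.getConfiguration()
        .setString("table.exec.source.idle-timeout", "1 min");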