Hi
    I'm not very familiar with the SQL side, but based on past experience, when the state of a window operator keeps growing in an event-time job, it is worth checking whether the watermark is advancing[1].
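
If it helps, here is a minimal sketch (illustrative only: `stream` is assumed to be a DataStream<Event>, and `Event` is a placeholder type, not part of your job) of how the current watermark of an operator can be inspected from inside a ProcessFunction:

    // Debugging aid: print the operator's current watermark so you can see
    // whether it is advancing. Long.MIN_VALUE means no watermark arrived yet.
    stream.process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event value, Context ctx, Collector<Event> out) {
            long watermark = ctx.timerService().currentWatermark();
            System.out.println("current watermark: " + watermark);
            out.collect(value); // pass the record through unchanged
        }
    });

For a pure SQL job, the per-operator watermark is also visible in the Web UI (see [1]); if it never advances past some point, window state cannot be cleaned up and will keep accumulating.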

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html

Best,
Congxian


鱼子酱 <384939...@qq.com> wrote on Tue, Jul 28, 2020, at 2:45 PM:

> Hi everyone in the community,
> In production I am currently running version 1.8.2, which has been relatively stable.
> In order to unify all related jobs with SQL, I have also been following the latest Flink releases,
> and so far I have tried two major versions: 1.10.1 and 1.11.1.
>
> While trying them out, I found that when a job performs a GROUP BY via SQL, the amount of data in the checkpoints keeps growing slowly.
> The state backend is RocksDB in incremental mode:
> StateBackend backend = new RocksDBStateBackend("hdfs:///checkpoints-data/", true);
> I also set the idle-state cleanup strategy found in the official documentation:
>         TableConfig tableConfig = streamTableEnvironment.getConfig();
>         tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7));
>
> Am I using this the wrong way?
>
> Looking at the detailed checkpoint information in the Web UI, the large state is concentrated mainly in the group operator.
>
> Versions affected: Flink 1.10.1 and Flink 1.11.1
> Planner: Blink planner
> Source: Kafka source
> Time characteristic: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
> sql:
> insert into result
>     select request_time, request_id, request_cnt, avg_resptime, stddev_resptime,
>         SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss.SSS'), 1, 19)
>     from
>     (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),
>                 'yyyy-MM-dd HH:mm:ss.SSS'), 1, 21) as request_time
>         ,commandId as request_id
>         ,count(*) as request_cnt
>         ,ROUND(avg(CAST(`respTime` as double)), 2) as avg_resptime
>         ,ROUND(stddev_pop(CAST(`respTime` as double)), 2) as stddev_resptime
>         from log
>         where commandId in (104005, 204005, 404005)
>             and errCode = 0 and attr = 0
>         group by TUMBLE(times, INTERVAL '1' MINUTE), commandId
>
>         union all
>
>         select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),
>                 'yyyy-MM-dd HH:mm:ss.SSS'), 1, 21) as request_time
>         ,99999999 as request_id
>         ,count(*) as request_cnt
>         ,ROUND(avg(CAST(`respTime` as double)), 2) as avg_resptime
>         ,ROUND(stddev_pop(CAST(`respTime` as double)), 2) as stddev_resptime
>         from log
>         where commandId in (104005, 204005, 404005)
>             and errCode = 0 and attr = 0
>         group by TUMBLE(times, INTERVAL '1' MINUTE)
>     )
>
>
> source:
>
>     create table log (
>       eventTime bigint
>       ,times timestamp(3)
>           ……………………
>       ,commandId integer
>       ,watermark for times as times - interval '5' second
>     )
>     with(
>      'connector' = 'kafka-0.10',
>      'topic' = '……',
>      'properties.bootstrap.servers' = '……',
>      'properties.group.id' = '……',
>      'scan.startup.mode' = 'latest-offset',
>      'format' = 'json'
>     )
>
> sink1:
> create table result (
>       request_time varchar
>       ,request_id integer
>       ,request_cnt bigint
>       ,avg_resptime double
>       ,stddev_resptime double
>       ,insert_time varchar
>     ) with (
>       'connector' = 'kafka-0.10',
>       'topic' = '……',
>       'properties.bootstrap.servers' = '……',
>       'properties.group.id' = '……',
>       'scan.startup.mode' = 'latest-offset',
>       'format' = 'json'
>     )
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
