您好:

我按照您说的试了看了一下watermark,
发现可以 正常更新,相关的计算结果也没发现问题。
1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
<http://apache-flink.147419.n8.nabble.com/file/t793/111.png> 
2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
<http://apache-flink.147419.n8.nabble.com/file/t793/333.png> 
<http://apache-flink.147419.n8.nabble.com/file/t793/222.png> 



Congxian Qiu wrote
> Hi
>     SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state 越来越大的情况,或许可以检查下
> watermark[1]
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
> 
> Best,
> Congxian
> 
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月28日周二 下午2:45写道:
> 
>> Hi,社区的各位大家好:
>> 我目前生产上面使用的是1.8.2版本,相对稳定
>> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>>
>> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> 状态后端使用的是rocksdb 的增量模式
>> StateBackend backend =new
>> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> 设置了官网文档中找到的删除策略:
>>         TableConfig tableConfig = streamTableEnvironment.getConfig();
>>         tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> Time.minutes(7));
>>
>> 请问是我使用的方式不对吗?
>>
>> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>>
>>
>>
>> 版本影响:flink1.10.1 flink1.11.1
>> planner:blink planner
>> source : kafka source
>> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>
>>
>>
>>
>> sql:
>> insert into  result
>>     select request_time ,request_id ,request_cnt ,avg_resptime
>> ,stddev_resptime ,terminal_cnt
>> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
>> from
>>     (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>>         ,commandId as request_id
>>         ,count(*) as request_cnt
>>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> stddev_resptime
>>         from log
>>         where
>>         commandId in (104005 ,204005 ,404005)
>>         and errCode=0 and attr=0
>>         group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>>
>>         union all
>>
>>         select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>>         ,99999999
>>         ,count(*) as request_cnt
>>         ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>>         ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> stddev_resptime
>>         from log
>>         where
>>         commandId in (104005 ,204005 ,404005)
>>         and errCode=0 and attr=0
>>         group by TUMBLE(times, INTERVAL '1' MINUTE)
>>     )
>>
>>
>> source:
>>
>>     create table log (
>>       eventTime bigint
>>       ,times timestamp(3)
>>           ……………………
>>       ,commandId integer
>>       ,watermark for times as times - interval '5' second
>>     )
>>     with(
>>      'connector' = 'kafka-0.10',
>>      'topic' = '……',
>>      'properties.bootstrap.servers' = '……',
>>      'properties.group.id' = '……',
>>      'scan.startup.mode' = 'latest-offset',
>>      'format' = 'json'
>>     )
>>
>> sink1:
>> create table result (
>>       request_time varchar
>>       ,request_id integer
>>       ,request_cnt bigint
>>       ,avg_resptime double
>>       ,stddev_resptime double
>>       ,insert_time varchar
>>     ) with (
>>       'connector' = 'kafka-0.10',
>>       'topic' = '……',
>>       'properties.bootstrap.servers' = '……',
>>       'properties.group.id' = '……',
>>       'scan.startup.mode' = 'latest-offset',
>>       'format' = 'json'
>>     )
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复