Hi,社区的各位大家好:
我目前生产上面使用的是1.8.2版本,相对稳定
为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
截止目前先后研究了1.10.1 1.11.1共2个大版本

在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
状态后端使用的是rocksdb 的增量模式
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/",true);
设置了官网文档中找到的删除策略:
        TableConfig tableConfig = streamTableEnvironment.getConfig();
        tableConfig.setIdleStateRetentionTime(Time.minutes(2),
Time.minutes(7));      

请问是我使用的方式不对吗?

通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator



版本影响:flink1.10.1 flink1.11.1
planner:blink planner
source : kafka source
时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


                

                
sql:
insert into  result
    select request_time ,request_id ,request_cnt ,avg_resptime
,stddev_resptime ,terminal_cnt
,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from
    (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
        ,commandId as request_id
        ,count(*) as request_cnt
        ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
        ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
        from log
        where
        commandId in (104005 ,204005 ,404005)
        and errCode=0 and attr=0
        group by TUMBLE(times, INTERVAL '1' MINUTE),commandId

        union all

        select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
        ,99999999
        ,count(*) as request_cnt
        ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
        ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
        from log
        where
        commandId in (104005 ,204005 ,404005)
        and errCode=0 and attr=0
        group by TUMBLE(times, INTERVAL '1' MINUTE)
    )

        
source:

    create table log (
      eventTime bigint
      ,times timestamp(3)
          ……………………
      ,commandId integer
      ,watermark for times as times - interval '5' second
    )
    with(
     'connector' = 'kafka-0.10',
     'topic' = '……',
     'properties.bootstrap.servers' = '……',
     'properties.group.id' = '……',
     'scan.startup.mode' = 'latest-offset',
     'format' = 'json'
    )

sink1:
create table result (
      request_time varchar
      ,request_id integer
      ,request_cnt bigint
      ,avg_resptime double
      ,stddev_resptime double
      ,insert_time varchar
    ) with (
      'connector' = 'kafka-0.10',
      'topic' = '……',
      'properties.bootstrap.servers' = '……',
      'properties.group.id' = '……',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    )
        




--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复