一、环境:
1、版本:1.12.0
2、flink sql
3、已经设置了setIdleStateRetention 为1小时
4、状态后端是rocksDB, 增量模式
5、源数据没有数据激增情况,任务已经跑了两天
二、详情
具体sql见第三大点,就是普通的group by统计的
sql,然后设置setIdleStateRetention(3600)。目前观察两天了,checkpoint目录下面的shared文件夹的大小一直在增长,然后看文件夹里的文件是在一直更新,最早的文件也会消失。
我sql的groupby维度有加一个具体的分钟字段,所以一小时之后是不可能有一模一样的维度数据,那过期的数据正常是要被清理掉,那/checkpoint/shared/文件夹大小不断增长是否能说明过期的旧数据还没有被清理?
这种情况应该怎么处理
三、sql具体
CREATE TABLE user_behavior (
`request_ip` STRING,
`request_time` BIGINT,
`header` STRING ,
//这个操作是将时间戳转为分钟
`t_min` as cast(`request_time`-(`request_time` + 28800000)%60000 as
BIGINT),
`ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000-28800,'yyyy-MM-dd
HH:mm:ss')),
WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE)
with (
'connector' = 'kafka',
........
);
CREATE TABLE blackhole_table (
`cnt` BIGINT,
`lists` STRING
) WITH (
'connector' = 'blackhole'
);
insert into blackhole_table
select
count(*) as cnt,
LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING)))
as lists
from user_behavior
group by `request_ip`,`header`,`t_min`;
--
Sent from: http://apache-flink.147419.n8.nabble.com/