Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-02 文章 yujianbo
好的非常感谢,我拿几个任务测试一波,看看性能能不能接受!


Hi,

没有被引用的文件可能也不是完全无用的,可能是当前pending checkpoint正在上传的,所以还需要比较一下那些不在checkpoint
meta内的文件的修改时间戳,可能比你分析的complete checkpoint的时间戳要大。

总体上来说,我不认为这个问题是一个bug,这个是LSM架构的DB的空间放大问题。如果你对空间放大非常担心的话,可以启用 dynamic level
[1] 来严格控制空间放大,不过这个可能会影响写放大和读放大,导致性能受到一定影响。


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size

祝好
唐云



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-02 文章 Yun Tang
Hi,

没有被引用的文件可能也不是完全无用的,可能是当前pending checkpoint正在上传的,所以还需要比较一下那些不在checkpoint 
meta内的文件的修改时间戳,可能比你分析的complete checkpoint的时间戳要大。

总体上来说,我不认为这个问题是一个bug,这个是LSM架构的DB的空间放大问题。如果你对空间放大非常担心的话,可以启用 dynamic level [1] 
来严格控制空间放大,不过这个可能会影响写放大和读放大,导致性能受到一定影响。


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size

祝好
唐云


From: yujianbo <15205029...@163.com>
Sent: Wednesday, June 2, 2021 15:29
To: user-zh@flink.apache.org 
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

Hi,

确认的情况:

大佬,我根据loadCheckpointMeta能够去分析_metadata引用到的/shared目录下的sst文件,然后发现/shared里没有被引用的sst文件即未删除的旧文件,这些旧文件占比很少,只有5%左右。

配置:
   idleStateRetention确实是设置3600秒,保留的ck目录是3个。
目前情况:
 每次checkpoint size增量差不多1-2G,所以size不小。5分钟checkpoint一次。
 最近单次checkpoint的sst文件数是23个,文件大小大约在65M。
 总共目前/checkpoint/shared目录大小是49.4G,然后savepoint结束大小在6.3G。

那我想问一下,那中间是不是有大量的冗余数据存在这还在被应用的sst文件里,有没有什么参数可以调整?还是JM单点删除跟不上相关速度?
JM还能扩展吗??HA不是也是一个JM在工作吗(这个地方不是很明白)





Hi,

先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用
Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata
文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。

目前仅凭你的描述和一段SQL代码其实很难判断。
可能存在的原因有:

  1.  单次checkpoint文件数目过多,JM单点删除跟不上相关速度
  2.  整体checkpoint
size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题)

所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。

另外,Flink
JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。

[1]
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

祝好
唐云

From: yujianbo <[hidden email]>
Sent: Tuesday, June 1, 2021 10:51
To: [hidden email] <[hidden email]>
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

没有更好的方式吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-02 文章 yujianbo
Hi,

确认的情况:
 
大佬,我根据loadCheckpointMeta能够去分析_metadata引用到的/shared目录下的sst文件,然后发现/shared里没有被引用的sst文件即未删除的旧文件,这些旧文件占比很少,只有5%左右。

配置:
   idleStateRetention确实是设置3600秒,保留的ck目录是3个。
目前情况:
 每次checkpoint size增量差不多1-2G,所以size不小。5分钟checkpoint一次。
 最近单次checkpoint的sst文件数是23个,文件大小大约在65M。
 总共目前/checkpoint/shared目录大小是49.4G,然后savepoint结束大小在6.3G。

那我想问一下,那中间是不是有大量的冗余数据存在这还在被应用的sst文件里,有没有什么参数可以调整?还是JM单点删除跟不上相关速度?
JM还能扩展吗??HA不是也是一个JM在工作吗(这个地方不是很明白)





Hi,

先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用
Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata
文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。

目前仅凭你的描述和一段SQL代码其实很难判断。
可能存在的原因有:

  1.  单次checkpoint文件数目过多,JM单点删除跟不上相关速度
  2.  整体checkpoint
size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题)

所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。

另外,Flink
JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。

[1]
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

祝好
唐云

From: yujianbo <[hidden email]>
Sent: Tuesday, June 1, 2021 10:51
To: [hidden email] <[hidden email]>
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

没有更好的方式吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-01 文章 HunterXHunter
那会一直增大下去吗,我跑了4天,ckp一直变大,没有稳定的迹象。是不是我需要调整compaction的配置



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-01 文章 Yun Tang
Hi,

增量checkpoint上传的是sst文件本身,里面可能有一部分空间是被无用数据占据的,你可以理解成增量checkpoint上传的是受到空间放大影响的RocksDB的数据,如果因为单机的数据量较小,没有及时触发compaction的话,确实存在整个远程checkpoint目录数据大于当前实际空间的情况。而关闭增量checkpoint,上传的其实是与savepoint格式一样的kv数据对,Flink会遍历整个DB,将目前有效的数据写出到远程。所以你关闭增量checkpoint,而发现checkpoint目录保持恒定大小的话,说明真实有效数据的空间是稳定的。

另外,其实不建议在日常生产中关闭增量checkpoint,主要原因是对于大规模作业来说,全量checkpoint一方面会对底层DFS来说每次需要上传的数据量变大,另一方面,也会增长单次checkpoint的
 e2e duration,有checkpoint超时失败的风险。

祝好
唐云

From: HunterXHunter <1356469...@qq.com>
Sent: Tuesday, June 1, 2021 11:44
To: user-zh@flink.apache.org 
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

我遇到过的问题就是开了 增量checkpoint后,checkpoint会越来越大,关闭之后checkpoint就正常了



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 HunterXHunter
我遇到过的问题就是开了 增量checkpoint后,checkpoint会越来越大,关闭之后checkpoint就正常了



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
感谢大佬的回复!以后优先现在邮箱这边讨论发现问题再去提个issue!
我的 idleStateRetention确实是设置3600秒,我先进行测试看看。




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 Yun Tang
Hi,

先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用 
Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata 
文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。

目前仅凭你的描述和一段SQL代码其实很难判断。
可能存在的原因有:

  1.  单次checkpoint文件数目过多,JM单点删除跟不上相关速度
  2.  整体checkpoint 
size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题)

所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。

另外,Flink 
JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。

[1] 
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

祝好
唐云

From: yujianbo <15205029...@163.com>
Sent: Tuesday, June 1, 2021 10:51
To: user-zh@flink.apache.org 
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

没有更好的方式吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
没有更好的方式吗,这样治标不治本,那我大状态任务会有很多问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
没有更好的方式吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 HunterXHunter
关闭 增量checkpoint



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
有没有大佬帮忙看看



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
一、环境:
1、版本:1.12.0
2、flink sql
3、已经设置了setIdleStateRetention 为1小时
4、状态后端是rocksDB, 增量模式
5、源数据没有数据激增情况,任务已经跑了两天

二、详情
具体sql见第三大点,就是普通的group by统计的
sql,然后设置setIdleStateRetention(3600)。目前观察两天了,checkpoint目录下面的shared文件夹的大小一直在增长,然后看文件夹里的文件是在一直更新,最早的文件也会消失。
   
我sql的groupby维度有加一个具体的分钟字段,所以一小时之后是不可能有一模一样的维度数据,那过期的数据正常是要被清理掉,那/checkpoint/shared/文件夹大小不断增长是否能说明过期的旧数据还没有被清理?
这种情况应该怎么处理

三、sql具体

CREATE TABLE user_behavior (
   `request_ip` STRING,
   `request_time` BIGINT,
   `header` STRING ,
//这个操作是将时间戳转为分钟
   `t_min` as cast(`request_time`-(`request_time` + 2880)%6 as
BIGINT),
   `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000-28800,'-MM-dd
HH:mm:ss')),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE) 
with (
   'connector' = 'kafka',
    
);


CREATE TABLE blackhole_table (
   `cnt` BIGINT,
   `lists` STRING
) WITH (
 'connector' = 'blackhole'
);


insert into blackhole_table 
select 
count(*) as cnt, 
LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING)))
as lists
from user_behavior 
group by `request_ip`,`header`,`t_min`;





--
Sent from: http://apache-flink.147419.n8.nabble.com/