Hi,
   你的状态过期时间设置的是多久?对于普通的group by 
agg算子,目前使用的是定时器方式实现过期状态的清理,精确的过期时间是状态最后更新时间加上你设置的最小idle,如果状态在一直更新,是不会过期的;另外你的Kafka中的数据量有多大?比如每秒大概有多少条数据?你可以试试把过期时间设置的短一点,观察一下状态是否能比较稳定的不增大


> -----原始邮件-----
> 发件人: claylin <1012539...@qq.com>
> 发送时间: 2020-05-17 17:41:13 (星期日)
> 收件人: user-zh <user-zh@flink.apache.org>
> 抄送: 
> 主题: 回复: 使用sql时候,设置了idle过期时间,但是状态还是一直变大
> 
> 链接这里&nbsp;https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
> 
> 
> 
> 
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"tison"<wander4...@gmail.com&gt;;
> 发送时间:&nbsp;2020年5月17日(星期天) 下午5:34
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
> 
> 主题:&nbsp;Re: 使用sql时候,设置了idle过期时间,但是状态还是一直变大
> 
> 
> 
> 考虑把 SQL 贴成 gist 链接?
> 
> Best,
> tison.
> 
> 
> claylin <1012539...@qq.com&gt; 于2020年5月17日周日 下午5:32写道:
> 
> &gt; sql作业定义如下,也通过TableConfig设置了最大和最小idle
> &gt; 
> time,但是运行很长时间,查看sst的目录flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9下,状态还是在一直变打,导致作业线程读写state很耗时间,最后作业处于一直反压状态,求大佬支招CREATE
> &gt; TABLE yy_yapmnetwork_original (&nbsp;&nbsp;&nbsp;&nbsp; happenAt 
> BIGINT,&nbsp;&nbsp;&nbsp;&nbsp; uid BIGINT,
> &gt;&nbsp; appId STRING,&nbsp;&nbsp;&nbsp;&nbsp; deviceId 
> STRING,&nbsp;&nbsp;&nbsp;&nbsp; appVer STRING,&nbsp;&nbsp;&nbsp;&nbsp; dnsDur 
> BIGINT,
> &gt;&nbsp;&nbsp;&nbsp; useGlb INT,&nbsp;&nbsp;&nbsp;&nbsp; hitCache 
> INT,&nbsp;&nbsp;&nbsp;&nbsp; requestSize DOUBLE,&nbsp;&nbsp;&nbsp;&nbsp; 
> responseSize
> &gt; DOUBLE,&nbsp;&nbsp;&nbsp;&nbsp; totalDur BIGINT,&nbsp;&nbsp;&nbsp;&nbsp; 
> url STRING,&nbsp;&nbsp;&nbsp;&nbsp; statusCode INT,
> &gt;&nbsp; prototype STRING,&nbsp;&nbsp;&nbsp;&nbsp; netType 
> STRING,&nbsp;&nbsp;&nbsp;&nbsp; traceId STRING,&nbsp;&nbsp;&nbsp;&nbsp; ts AS
> &gt; CAST(FROM_UNIXTIME(happenAt/1000) AS 
> TIMESTAMP(3)),&nbsp;&nbsp;&nbsp;&nbsp; WATERMARK FOR ts AS
> &gt; ts - INTERVAL '20' SECOND )with ( 'connector.type' = 'kafka',
> &gt; 'connector.version' = 'universal', 'connector.topic' = 'yapm_metrics',
> &gt; 'connector.properties.zookeeper.connect' = 'localhost:2181',
> &gt; 'connector.properties.bootstrap.servers' = 
> 'kafkawx007-core001.yy.com:8101
> &gt; ,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101', '
> &gt; connector.properties.group.id' = 'interface_success_rate_consumer',
> &gt; 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' );
> &gt; create table request_latency_tbl (&nbsp;&nbsp;&nbsp;&nbsp; app_id 
> string,&nbsp;&nbsp;&nbsp;&nbsp; app_ver string,
> &gt;&nbsp;&nbsp;&nbsp; net_type string,&nbsp;&nbsp;&nbsp;&nbsp; prototype 
> string,&nbsp;&nbsp;&nbsp;&nbsp; url string,&nbsp;&nbsp;&nbsp;&nbsp; 
> status_code
> &gt; int,&nbsp;&nbsp;&nbsp;&nbsp; w_start string,&nbsp;&nbsp;&nbsp;&nbsp; 
> success_cnt BIGINT,&nbsp;&nbsp;&nbsp;&nbsp; failure_cnt BIGINT,
> &gt;&nbsp; total_cnt BIGINT ) with( 'connector.type' = 'jdbc', 
> 'connector.url' =
> &gt; 
> 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=true&amp;amp;characterEncoding=utf-8&amp;amp;zeroDateTimeBehavior=convertToNull&amp;amp;autoReconnect=true',
> &gt; 'connector.table' = 'request_latency_statistics', 'connector.username' =
> &gt; 'yapm_metrics', 'connector.password' = '1234456',
> &gt; 'connector.write.flush.max-rows' = '1000', 
> 'connector.write.flush.interval'
> &gt; = '5s', 'connector.write.max-retries' = '2' ); create view
> &gt; request_1minutes_latency&nbsp; as&nbsp;&nbsp;&nbsp;&nbsp; select appId, 
> appVer, netType, prototype,
> &gt; url, statusCode, DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm') w_start,
> &gt;&nbsp; count(distinct traceId) filter (where statusCode in (200)) as 
> successCnt,
> &gt;&nbsp;&nbsp;&nbsp; count(distinct traceId) filter (where statusCode not 
> in (200)) as
> &gt; failureCnt,&nbsp;&nbsp;&nbsp;&nbsp; count(distinct traceId) as 
> total_cnt&nbsp;&nbsp;&nbsp;&nbsp; from
> &gt; yy_yapmnetwork_original group by appId, appVer, netType, prototype, url,
> &gt; statusCode, DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm'); insert into
> &gt; request_latency_tbl&nbsp;&nbsp;&nbsp;&nbsp; select * from&nbsp; 
> request_1minutes_latency;


------------------------------
刘大龙

浙江大学 控制系 智能系统与控制研究所 工控新楼217
地址:浙江省杭州市浙大路38号浙江大学玉泉校区
Tel:18867547281

回复