Hi, 你的状态过期时间设置的是多久?对于普通的group by agg算子,目前使用的是定时器方式实现过期状态的清理,精确的过期时间是状态最后更新时间加上你设置的最小idle,如果状态在一直更新,是不会过期的;另外你的Kafka中的数据量有多大?比如每秒大概有多少条数据?你可以试试把过期时间设置的短一点,观察一下状态是否能比较稳定的不增大
> -----原始邮件----- > 发件人: claylin <1012539...@qq.com> > 发送时间: 2020-05-17 17:41:13 (星期日) > 收件人: user-zh <user-zh@flink.apache.org> > 抄送: > 主题: 回复: 使用sql时候,设置了idle过期时间,但是状态还是一直变大 > > 链接这里 https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "tison"<wander4...@gmail.com>; > 发送时间: 2020年5月17日(星期天) 下午5:34 > 收件人: "user-zh"<user-zh@flink.apache.org>; > > 主题: Re: 使用sql时候,设置了idle过期时间,但是状态还是一直变大 > > > > 考虑把 SQL 贴成 gist 链接? > > Best, > tison. > > > claylin <1012539...@qq.com> 于2020年5月17日周日 下午5:32写道: > > > sql作业定义如下,也通过TableConfig设置了最大和最小idle > > > time,但是运行很长时间,查看sst的目录flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9下,状态还是在一直变打,导致作业线程读写state很耗时间,最后作业处于一直反压状态,求大佬支招CREATE > > TABLE yy_yapmnetwork_original ( happenAt > BIGINT, uid BIGINT, > > appId STRING, deviceId > STRING, appVer STRING, dnsDur > BIGINT, > > useGlb INT, hitCache > INT, requestSize DOUBLE, > responseSize > > DOUBLE, totalDur BIGINT, > url STRING, statusCode INT, > > prototype STRING, netType > STRING, traceId STRING, ts AS > > CAST(FROM_UNIXTIME(happenAt/1000) AS > TIMESTAMP(3)), WATERMARK FOR ts AS > > ts - INTERVAL '20' SECOND )with ( 'connector.type' = 'kafka', > > 'connector.version' = 'universal', 'connector.topic' = 'yapm_metrics', > > 'connector.properties.zookeeper.connect' = 'localhost:2181', > > 'connector.properties.bootstrap.servers' = > 'kafkawx007-core001.yy.com:8101 > > ,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101', ' > > connector.properties.group.id' = 'interface_success_rate_consumer', > > 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' ); > > create table request_latency_tbl ( app_id > string, app_ver string, > > net_type string, prototype > string, url string, > status_code > > int, w_start string, > success_cnt BIGINT, failure_cnt BIGINT, > > total_cnt BIGINT ) with( 'connector.type' = 'jdbc', > 'connector.url' = > > > 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=true&amp;characterEncoding=utf-8&amp;zeroDateTimeBehavior=convertToNull&amp;autoReconnect=true', > > 'connector.table' = 'request_latency_statistics', 'connector.username' = > > 'yapm_metrics', 'connector.password' = '1234456', > > 'connector.write.flush.max-rows' = '1000', > 'connector.write.flush.interval' > > = '5s', 'connector.write.max-retries' = '2' ); create view > > request_1minutes_latency as select appId, > appVer, netType, prototype, > > url, statusCode, DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm') w_start, > > count(distinct traceId) filter (where statusCode in (200)) as > successCnt, > > count(distinct traceId) filter (where statusCode not > in (200)) as > > failureCnt, count(distinct traceId) as > total_cnt from > > yy_yapmnetwork_original group by appId, appVer, netType, prototype, url, > > statusCode, DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm'); insert into > > request_latency_tbl select * from > request_1minutes_latency; ------------------------------ 刘大龙 浙江大学 控制系 智能系统与控制研究所 工控新楼217 地址:浙江省杭州市浙大路38号浙江大学玉泉校区 Tel:18867547281