Re: 回复: 使用sql时候,设置了idle过期时间,但是状态还是一直变大

2020-05-17 文章 LakeShen
Hi,

你可以描述一下你的 Flink 版本,具体空闲状态保留时间的含义,请参考一下[1]:

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time

Best,
LakeShen



claylin <1012539...@qq.com> 于2020年5月17日周日 下午10:24写道:

> 过期时间是10-15分钟,按理说我是按照每分钟作为key分组的,应该很快就会过期,kafka数据流量的话每秒2-5M
>
>
> --原始邮件--
> 发件人:"刘大龙" 发送时间:2020年5月17日(星期天) 晚上10:14
> 收件人:"user-zh"
> 主题:Re: 回复: 使用sql时候,设置了idle过期时间,但是状态还是一直变大
>
>
>
> Hi,
>  你的状态过期时间设置的是多久?对于普通的group by
> agg算子,目前使用的是定时器方式实现过期状态的清理,精确的过期时间是状态最后更新时间加上你设置的最小idle,如果状态在一直更新,是不会过期的;另外你的Kafka中的数据量有多大?比如每秒大概有多少条数据?你可以试试把过期时间设置的短一点,观察一下状态是否能比较稳定的不增大
>
>
>  -原始邮件-
>  发件人: claylin <1012539...@qq.com
>  发送时间: 2020-05-17 17:41:13 (星期日)
>  收件人: user-zh   抄送:
>  主题: 回复: 使用sql时候,设置了idle过期时间,但是状态还是一直变大
> 
>  链接这里nbsp;
> https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
> 
> <https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4>;
>
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"tison"  发送时间:nbsp;2020年5月17日(星期天) 下午5:34
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 使用sql时候,设置了idle过期时间,但是状态还是一直变大
> 
> 
> 
>  考虑把 SQL 贴成 gist 链接?
> 
>  Best,
>  tison.
> 
> 
>  claylin <1012539...@qq.comgt; 于2020年5月17日周日 下午5:32写道:
> 
>  gt; sql作业定义如下,也通过TableConfig设置了最大和最小idle
>  gt;
> time,但是运行很长时间,查看sst的目录flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9下,状态还是在一直变打,导致作业线程读写state很耗时间,最后作业处于一直反压状态,求大佬支招CREATE
>  gt; TABLE yy_yapmnetwork_original
> (nbsp;nbsp;nbsp;nbsp; happenAt
> BIGINT,nbsp;nbsp;nbsp;nbsp; uid BIGINT,
>  gt;nbsp; appId
> STRING,nbsp;nbsp;nbsp;nbsp; deviceId
> STRING,nbsp;nbsp;nbsp;nbsp; appVer
> STRING,nbsp;nbsp;nbsp;nbsp; dnsDur BIGINT,
>  gt;nbsp;nbsp;nbsp; useGlb
> INT,nbsp;nbsp;nbsp;nbsp; hitCache
> INT,nbsp;nbsp;nbsp;nbsp; requestSize
> DOUBLE,nbsp;nbsp;nbsp;nbsp; responseSize
>  gt; DOUBLE,nbsp;nbsp;nbsp;nbsp; totalDur
> BIGINT,nbsp;nbsp;nbsp;nbsp; url
> STRING,nbsp;nbsp;nbsp;nbsp; statusCode INT,
>  gt;nbsp; prototype
> STRING,nbsp;nbsp;nbsp;nbsp; netType
> STRING,nbsp;nbsp;nbsp;nbsp; traceId
> STRING,nbsp;nbsp;nbsp;nbsp; ts AS
>  gt; CAST(FROM_UNIXTIME(happenAt/1000) AS
> TIMESTAMP(3)),nbsp;nbsp;nbsp;nbsp; WATERMARK FOR ts AS
>  gt; ts - INTERVAL '20' SECOND )with ( 'connector.type' = 'kafka',
>  gt; 'connector.version' = 'universal', 'connector.topic' =
> 'yapm_metrics',
>  gt; 'connector.properties.zookeeper.connect' = 'localhost:2181',
>  gt; 'connector.properties.bootstrap.servers' = '
> kafkawx007-core001.yy.com:8101
>  gt; ,kafkawx007-core002.yy.com:8101,
> kafkawx007-core003.yy.com:8101', '
>  gt; connector.properties.group.id' =
> 'interface_success_rate_consumer',
>  gt; 'connector.startup-mode' = 'latest-offset', 'format.type' =
> 'json' );
>  gt; create table request_latency_tbl
> (nbsp;nbsp;nbsp;nbsp; app_id
> string,nbsp;nbsp;nbsp;nbsp; app_ver string,
>  gt;nbsp;nbsp;nbsp; net_type
> string,nbsp;nbsp;nbsp;nbsp; prototype
> string,nbsp;nbsp;nbsp;nbsp; url
> string,nbsp;nbsp;nbsp;nbsp; status_code
>  gt; int,nbsp;nbsp;nbsp;nbsp; w_start
> string,nbsp;nbsp;nbsp;nbsp; success_cnt
> BIGINT,nbsp;nbsp;nbsp;nbsp; failure_cnt BIGINT,
>  gt;nbsp; total_cnt BIGINT ) with( 'connector.type' =
> 'jdbc', 'connector.url' =
>  gt;
> 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=trueamp;amp;characterEncoding=utf-8amp;amp;zeroDateTimeBehavior=convertToNullamp;amp;autoReconnect=true',
>  gt; 'connector.table' = 'request_latency_statistics',
> 'connector.username' =
>  gt; 'yapm_metrics', 'connector.password' = '1234456',
>  gt; 'connector.write.flush.max-rows' = '1000',
> 'connector.write.flush.interval'
>  gt; = '5s', 'connector.write.max-retries' = '2' ); create view
>  gt; request_1minutes_latencynbsp;
> asnbsp;nbsp;nbsp;nbsp; select appId, appVer, netType,
> prototype,
>  gt; url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm') w_start,
>  gt;nbsp; count(distinct traceId) filter (where statusCode
> in (200)) as successCnt,
>  gt;nbsp;nbsp;nbsp; count(distinct traceId) filter
> (where statusCode not in (200)) as
>  gt; failureCnt,nbsp;nbsp;nbsp;nbsp;
> count(distinct traceId) as
> total_cntnbsp;nbsp;nbsp;nbsp; from
>  gt; yy_yapmnetwork_original group by appId, appVer, netType,
> prototype, url,
>  gt; statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm'); insert into
>  gt; request_latency_tblnbsp;nbsp;nbsp;nbsp;
> select * fromnbsp; request_1minutes_latency;
>
>
> --
> 刘大龙
>
> 浙江大学 控制系 智能系统与控制研究所 工控新楼217
> 地址:浙江省杭州市浙大路38号浙江大学玉泉校区
> Tel:18867547281


Re: 回复: 使用sql时候,设置了idle过期时间,但是状态还是一直变大

2020-05-17 文章 刘大龙
Hi,
   你的状态过期时间设置的是多久?对于普通的group by 
agg算子,目前使用的是定时器方式实现过期状态的清理,精确的过期时间是状态最后更新时间加上你设置的最小idle,如果状态在一直更新,是不会过期的;另外你的Kafka中的数据量有多大?比如每秒大概有多少条数据?你可以试试把过期时间设置的短一点,观察一下状态是否能比较稳定的不增大


> -原始邮件-
> 发件人: claylin <1012539...@qq.com>
> 发送时间: 2020-05-17 17:41:13 (星期日)
> 收件人: user-zh 
> 抄送: 
> 主题: 回复: 使用sql时候,设置了idle过期时间,但是状态还是一直变大
> 
> 链接这里https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
> 
> 
> 
> 
> --原始邮件--
> 发件人:"tison" 发送时间:2020年5月17日(星期天) 下午5:34
> 收件人:"user-zh" 
> 主题:Re: 使用sql时候,设置了idle过期时间,但是状态还是一直变大
> 
> 
> 
> 考虑把 SQL 贴成 gist 链接?
> 
> Best,
> tison.
> 
> 
> claylin <1012539...@qq.com 于2020年5月17日周日 下午5:32写道:
> 
>  sql作业定义如下,也通过TableConfig设置了最大和最小idle
>  
> time,但是运行很长时间,查看sst的目录flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9下,状态还是在一直变打,导致作业线程读写state很耗时间,最后作业处于一直反压状态,求大佬支招CREATE
>  TABLE yy_yapmnetwork_original ( happenAt 
> BIGINT, uid BIGINT,
>  appId STRING, deviceId 
> STRING, appVer STRING, dnsDur 
> BIGINT,
>  useGlb INT, hitCache 
> INT, requestSize DOUBLE, 
> responseSize
>  DOUBLE, totalDur BIGINT, 
> url STRING, statusCode INT,
>  prototype STRING, netType 
> STRING, traceId STRING, ts AS
>  CAST(FROM_UNIXTIME(happenAt/1000) AS 
> TIMESTAMP(3)), WATERMARK FOR ts AS
>  ts - INTERVAL '20' SECOND )with ( 'connector.type' = 'kafka',
>  'connector.version' = 'universal', 'connector.topic' = 'yapm_metrics',
>  'connector.properties.zookeeper.connect' = 'localhost:2181',
>  'connector.properties.bootstrap.servers' = 
> 'kafkawx007-core001.yy.com:8101
>  ,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101', '
>  connector.properties.group.id' = 'interface_success_rate_consumer',
>  'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' );
>  create table request_latency_tbl ( app_id 
> string, app_ver string,
>  net_type string, prototype 
> string, url string, 
> status_code
>  int, w_start string, 
> success_cnt BIGINT, failure_cnt BIGINT,
>  total_cnt BIGINT ) with( 'connector.type' = 'jdbc', 
> 'connector.url' =
>  
> 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=trueamp;characterEncoding=utf-8amp;zeroDateTimeBehavior=convertToNullamp;autoReconnect=true',
>  'connector.table' = 'request_latency_statistics', 'connector.username' =
>  'yapm_metrics', 'connector.password' = '1234456',
>  'connector.write.flush.max-rows' = '1000', 
> 'connector.write.flush.interval'
>  = '5s', 'connector.write.max-retries' = '2' ); create view
>  request_1minutes_latency as select appId, 
> appVer, netType, prototype,
>  url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm') w_start,
>  count(distinct traceId) filter (where statusCode in (200)) as 
> successCnt,
>  count(distinct traceId) filter (where statusCode not 
> in (200)) as
>  failureCnt, count(distinct traceId) as 
> total_cnt from
>  yy_yapmnetwork_original group by appId, appVer, netType, prototype, url,
>  statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm'); insert into
>  request_latency_tbl select * from 
> request_1minutes_latency;


--
刘大龙

浙江大学 控制系 智能系统与控制研究所 工控新楼217
地址:浙江省杭州市浙大路38号浙江大学玉泉校区
Tel:18867547281