Re: Asking for help: result data stops updating after setting table.exec.state.ttl
Hi! The images don't display. I suggest uploading them to an external image host such as imgur and pasting the links into the email. Did the sink data keep updating before you set the state TTL? Have you ruled out that later data simply fails some of your WHERE conditions?

liangjinghong wrote on Saturday, February 12, 2022 at 14:39:
> [quoted message trimmed]
Asking for help: result data stops updating after setting table.exec.state.ttl
Hello, I'm new to Flink. For state management, I set configuration.setString("table.exec.state.ttl","12h"); in my code.

However, after the job had been running for 12 hours, my upsert result table stopped updating entirely. In the web UI I can see that Records Sent keeps growing for my sources and for several operators, the job has not thrown any exceptions, and checkpoints complete normally, so I'm stuck on where the problem could be.

[screenshots]

Here are my SQL statements:

CREATE TABLE `tbl_rpt_app_usage` (
  `datepoint` TIMESTAMP,
  `appId` VARCHAR,
  `flavor` VARCHAR,
  `applyNum` BIGINT,
  `releaseNum` BIGINT,
  `usedNum` BIGINT,
  PRIMARY KEY (`datepoint`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '',
  'port' = '3306',
  'username' = '',
  'password' = '',
  'database-name' = '',
  'table-name' = ''
);

CREATE TABLE `temporary_usage` (
  `datepoint` TIMESTAMP,
  `appId` VARCHAR,
  `flavor` VARCHAR,
  `applyNum` BIGINT,
  `releaseNum` BIGINT,
  `usedNum` BIGINT,
  PRIMARY KEY (`datepoint`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '',
  'port' = '',
  'username' = '',
  'password' = '',
  'database-name' = '',
  'table-name' = ''
);

CREATE TABLE `tbl_cce_cluster` (
  `clusterId` VARCHAR,
  `region` VARCHAR,
  `flavor` VARCHAR,
  `totalNode` BIGINT,
  `toolName` VARCHAR,
  `appId` VARCHAR,
  PRIMARY KEY (`clusterId`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '',
  'port' = '3306',
  'username' = '',
  'password' = '',
  'database-name' = '',
  'table-name' = ''
);

-- MySQL sink table
CREATE TABLE sink (
  code STRING,
  name STRING,
  usedcnt BIGINT,
  `time` TIMESTAMP,
  type STRING,
  PRIMARY KEY (`time`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql: ',
  'username' = '',
  'password' = '',
  'table-name' = 'sink'
);

(MySQL CDC doesn't seem to support filtering rows during synchronization, so for now I create views over the synced tables that apply the filters and then compute on those views. Is there a better way to do this?)

CREATE VIEW rpt (datepoint, appId, flavor, applyNum, releaseNum, usedNum) AS
select * from tbl_rpt_app_usage
where datepoint > '2022-02-11 14:54:01'
  and appId not in ('aaa')
union all
select * from temporary_usage;

CREATE VIEW cce (clusterId, region, flavor, totalNode, toolName, appId) AS
select * from tbl_cce_cluster
where toolName in ('bbb', 'ccc');

-- The job logic is as follows:
insert into sink
select code, name, usedcnt, LOCALTIMESTAMP as `time`, type from (
  select
    info.code,
    info.name,
    sum(rpt.usedNum) as usedcnt,
    'online' type
  from (
    select * from (
      select *, ROW_NUMBER() OVER (PARTITION BY appId, flavor ORDER BY datepoint DESC) as row_num
      from rpt /*+ OPTIONS('server-id'='1001-1005') */
    ) where row_num = 1
  ) rpt
  join info on info.appid = rpt.appId
  group by info.code, info.name
)
union all
select code, name, usedcnt, LOCALTIMESTAMP as `time`, type from (
  select
    info.code,
    info.name,
    sum(totalNode) as usedcnt,
    'online' type
  from cce /*+ OPTIONS('server-id'='1006-1010') */
  join info on cce.appId = info.appid
  group by info.code, info.name
);

12 hours (the TTL I configured) after the time shown in [screenshot], my result table stopped updating and has not been updated since.

[screenshot]

One more question: can the results of a GROUP BY operator only be written to the database as updates? The business wants historical data. The only idea I have so far is to copy the full result table into another table every minute so the history can be tracked. Is there a better solution?

Thank you very much for reading and for your help!
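One mechanism that may be worth ruling out, stated as an assumption rather than a diagnosis: as far as I know, the TTL that `table.exec.state.ttl` applies to Flink SQL operator state is refreshed when a row is created or written, not when it is only read. In a regular join, if the `info` side (its definition is not in the post) is written once, for example by an initial CDC snapshot, and rarely changes afterwards, its rows could expire 12 hours later even while `rpt` rows keep probing them. From then on new `rpt` rows find no match, so the GROUP BY and the sink receive nothing. The toy model below is plain Java, not Flink code (`JoinTtlSketch`, `writeInfo` and `probe` are hypothetical names), and shows only that timing:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Flink code) of the "info" side of a regular join whose state has an
 * idle-state TTL that is refreshed when a row is written, but not when it is read.
 */
class JoinTtlSketch {
    private final long ttlMillis;
    private final Map<String, String> infoRows = new HashMap<>(); // appId -> code
    private final Map<String, Long> lastWrite = new HashMap<>();  // appId -> last write time

    JoinTtlSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** An insert/update on the "info" side: stores the row and restarts its TTL clock. */
    void writeInfo(String appId, String code, long now) {
        infoRows.put(appId, code);
        lastWrite.put(appId, now);
    }

    /** An "rpt" row probes the "info" state; the read does not touch the TTL clock. */
    List<String> probe(String appId, long now) {
        List<String> joined = new ArrayList<>();
        Long written = lastWrite.get(appId);
        if (written == null) {
            return joined; // no matching info row
        }
        if (now - written >= ttlMillis) {
            // Expired: the row is never returned again and is cleaned up.
            infoRows.remove(appId);
            lastWrite.remove(appId);
            return joined;
        }
        joined.add(appId + " -> " + infoRows.get(appId));
        return joined;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        JoinTtlSketch join = new JoinTtlSketch(12 * hour); // table.exec.state.ttl = 12h
        join.writeInfo("app-1", "code-A", 0L);             // info row written once at t = 0
        for (long h : new long[] {1, 6, 11, 13, 20}) {
            // prints joined=[app-1 -> code-A] for 1, 6, 11 h and joined=[] for 13, 20 h
            System.out.println("t=" + h + "h joined=" + join.probe("app-1", h * hour));
        }
    }
}
```

Writing the `info` row again restarts its clock, while probes alone do not. Whether this applies to the job above depends on how often `info` actually changes, which the thread does not say.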
Re: table.exec.state.ttl
Hi 航飞,

You can check [1] to see whether it's a similar problem.

[1] https://issues.apache.org/jira/browse/FLINK-23721

Best,
唐云

From: 李航飞
Sent: Thursday, August 26, 2021 19:02
To: user-zh
Subject: table.exec.state.ttl
> [quoted message trimmed]
Re:Re: table.exec.state.ttl
Hello,

How should I set mini-batch and the state TTL in the execution environment, then? Like this:
conf.setString("exec.state.ttl","15 s");
or like this:
conf.setString("stream.exec.state.ttl","15 s");

On 2021-08-26 19:05:07, "Caizhi Weng" wrote:
> [quoted message trimmed]
Re: table.exec.state.ttl
Hi!

Table-level options belong in the TableConfig of the table environment; setting them on the execution environment has no effect.

李航飞 wrote on Thursday, August 26, 2021 at 7:02 PM:
> [quoted message trimmed]
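A minimal sketch of the placement described above, assuming Flink 1.13 (where the Blink planner is the default) with the Java Table API bridge on the classpath; the class name `TableConfigTtlSketch` is hypothetical. It sets the same `table.exec.*` keys as the original post, only on the table environment's `TableConfig` instead of on the `StreamExecutionEnvironment`:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

class TableConfigTtlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Table-layer options go into the table environment's TableConfig,
        // not into the StreamExecutionEnvironment.
        Configuration tableConf = tEnv.getConfig().getConfiguration();
        tableConf.setString("table.exec.mini-batch.enabled", "true");
        tableConf.setString("table.exec.mini-batch.allow-latency", "15 s");
        tableConf.setString("table.exec.mini-batch.size", "50");
        tableConf.setString("table.exec.state.ttl", "15 s");

        // Register the tables and build the StatementSet on tEnv afterwards, then execute() it.
    }
}
```

As far as I know, the key names stay the same (there is no `exec.state.ttl` or `stream.exec.state.ttl` option); only the place where they are set changes, and they should be set before the statements are planned and executed.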
table.exec.state.ttl
Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");
conf.setString("table.exec.state.ttl","15 s");
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.configure(conf,this.getClass().getClassLoader());
EnvironmentSettings setting = ...
StreamTableEnvironment.create(execEnv,setting);

This is on Flink 1.13.2. The mini-batch configuration is shown above, with streaming data from Kafka, and these settings don't seem to take effect at all. I let the allow-latency issue from last time go, but this time I set table.exec.state.ttl to 15 seconds, waited 1 minute, and sent more data: the result still kept accumulating instead of starting again from 0.

The program is executed via StatementSet.execute().
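The result the poster expected can be modelled with a toy Java sketch, not Flink's implementation (`IdleTtlSumSketch` and `add` are hypothetical names). `table.exec.state.ttl` is an idle-state retention time: a key's state only becomes eligible for removal after the key has gone that long without being updated. Assuming the option actually reaches the table config, a 1-minute pause against a 15 s TTL should make the running sum restart from 0, while steady traffic on the same key keeps its state alive:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (not Flink code) of a keyed running SUM whose state is treated as gone
 * once the key has not been written for at least the idle-state TTL.
 */
class IdleTtlSumSketch {
    private final long ttlMillis;
    private final Map<String, Long> sums = new HashMap<>();
    private final Map<String, Long> lastWrite = new HashMap<>();

    IdleTtlSumSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Adds value to the key's sum at time now and returns the new sum. */
    long add(String key, long value, long now) {
        Long written = lastWrite.get(key);
        if (written != null && now - written >= ttlMillis) {
            // Idle for at least the TTL: previous state expired, accumulation restarts from 0.
            sums.remove(key);
        }
        long updated = sums.getOrDefault(key, 0L) + value;
        sums.put(key, updated);
        lastWrite.put(key, now); // every write restarts the idle clock
        return updated;
    }

    public static void main(String[] args) {
        IdleTtlSumSketch agg = new IdleTtlSumSketch(15_000L); // "15 s"
        System.out.println(agg.add("k", 1, 0L));      // 1
        System.out.println(agg.add("k", 1, 10_000L)); // 2 (idle 10 s < 15 s)
        System.out.println(agg.add("k", 1, 20_000L)); // 3 (20 s after the first row, but only 10 s idle)
        System.out.println(agg.add("k", 1, 80_000L)); // 1 (idle 60 s, state expired)
    }
}
```

The third call is the point of the design: the TTL is measured from the key's last update, not from when the key first appeared, so it does not behave like a 15 s window.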