Re: 请教设置了table.exec.state.ttl后,结果数据不更新问题

2022-02-14 文章 Caizhi Weng
Hi!

图片不能显示,建议传到 imgur 等外部图床上,再把链接贴到邮件里。

设置 state ttl 之前 sink 数据能一直更新吗?确认不是因为后来的数据不符合某些 where 条件导致的吗?

liangjinghong  于2022年2月12日周六 14:39写道:

> 你好,我是一个flink新手。为了进行状态管理,我在代码中设置了configuration.setString("table.exec.state.ttl","12h");
>
> 然而,在flink作业运行12小时后,*我的update结果表再也没有更新过*。从web ui可以看到,我的源头与一些算子的Records 
> Sent一直在增长,任务也没有出现任何异常,checkpoint也正常,所以我很苦恼是哪里出现了问题
>
> 以下是我的SQL语句:
>
> CREATE TABLE `tbl_rpt_app_usage`
>
>  (`datepoint` TIMESTAMP,
>
>   `appId` VARCHAR ,
>
>   `flavor` VARCHAR ,
>
>   `applyNum` BIGINT ,
>
>   `releaseNum` BIGINT ,
>
>   `usedNum` BIGINT ,
>
>PRIMARY KEY (`datepoint`) NOT ENFORCED
>
>  )WITH(
>
>  'connector' = 'mysql-cdc',
>
>  'hostname' = '',
>
>  'port' = '3306',
>
>  'username' = '',
>
>  'password' = '',
>
>  'database-name' = '',
>
>  'table-name' = ''
>
>  );
>
> CREATE TABLE `temporary_usage`
>
>  (`datepoint` TIMESTAMP,
>
>   `appId` VARCHAR ,
>
>   `flavor` VARCHAR ,
>
>   `applyNum` BIGINT ,
>
>   `releaseNum` BIGINT ,
>
>   `usedNum` BIGINT ,
>
>PRIMARY KEY (`datepoint`) NOT ENFORCED
>
>  )WITH(
>
>  'connector' = 'mysql-cdc',
>
>  'hostname' = '',
>
>  'port' = '',
>
>  'username' = '',
>
>  'password' = '',
>
>  'database-name' = '',
>
>  'table-name' = ''
>
>  );
>
> CREATE TABLE `tbl_cce_cluster`
>
>  (`clusterId` VARCHAR ,
>
>   `region` VARCHAR ,
>
>   `flavor` VARCHAR ,
>
>   `totalNode` BIGINT ,
>
>   `toolName` VARCHAR ,
>
>   `appId` VARCHAR ,
>
>PRIMARY KEY (`clusterId`) NOT ENFORCED
>
>  )WITH(
>
>  'connector' = 'mysql-cdc',
>
>  'hostname' = '',
>
>  'port' = '3306',
>
>  'username' = '',
>
>  'password' = '',
>
>  'database-name' = '',
>
>  'table-name' = ''
>
>  );
>
> ---mysql落地表
>
> CREATE table sink(
>
>   code STRING,
>
>   name STRING,
>
>   usedcnt BIGINT,
>
>   `time` TIMESTAMP,
>
>   type STRING,
>
>   PRIMARY KEY (`time`) NOT ENFORCED
>
> ) with (
>
>'connector' = 'jdbc',
>
>'url' = 'jdbc:mysql: ',
>
>'username' = '',
>
>'password' = '',
>
>'table-name' = 'sink'
>
> );
>
> (因为MySQLCDC似乎不支持在同步时筛选指定内容,所以目前同步后先创建筛选结果的虚拟表,再进行计算,还想请教下是否有更好的办法)
>
> CREATE VIEW rpt
>
>  (datepoint,appId,flavor,applyNum,releaseNum,usedNum)
>
>  AS
>
>  select * from tbl_rpt_app_usage
>
>  where datepoint > '2022-02-11 14:54:01'
>
>  and appId not in ('aaa')
>
>  union all
>
>  select * from temporary_usage
>
>
>
> CREATE VIEW cce
>
> (clusterId,region,flavor,totalNode,toolName,appId)
>
> AS
>
> select * from tbl_cce_cluster
>
> where toolName in ('bbb','ccc')
>
>
>
> ---代码逻辑如下:
>
> insert into sink
>
> select code,name,usedcnt,LOCALTIMESTAMP as `time`,type from(
>
> select
>
> info.code,
>
> info.name,
>
> sum(rpt.usedNum) as usedcnt,'online' type
>
> from (
>
> select * from (
>
> select *,ROW_NUMBER() OVER(PARTITION by appId,flavor order by datepoint
> desc) as row_num from rpt/*+ OPTIONS('server-id'='1001-1005') */
>
> )where row_num =1
>
> )rpt
>
> join info
>
> on info.appid=rpt.appId
>
> group by info.code,info.name)
>
> union all
>
> select code,name,usedcnt,LOCALTIMESTAMP as `time`,type from(
>
> select
>
> info.code,
>
> info.name,
>
> sum(totalNode) as usedcnt,'online' type
>
> from
>
> cce/*+ OPTIONS('server-id'='1006-1010') */
>
> join info
>
> on cce.appId=info.appid
>
> group by info.code,info.name)
>
> 我的结果表在的12小时(我设置的过期时间)后,再也没更新过。
>
>
>
> 另外,还想请教一个问题:目前group by算子的落地到数据库的结果只支持update
> 吗?业务期待获取历史数据,目前只能想到每分钟全量同步一次结果表的数据到另一个表里,这样就可以追踪到历史数据,是否还有更好的解决办法呢?
>
>
>
>
>
> 非常感谢您的阅读与解惑!
>


请教设置了table.exec.state.ttl后,结果数据不更新问题

2022-02-11 文章 liangjinghong
你好,我是一个flink新手。为了进行状态管理,我在代码中设置了configuration.setString("table.exec.state.ttl","12h");

然而,在flink作业运行12小时后,我的update结果表再也没有更新过。从web ui可以看到,我的源头与一些算子的Records Sent一直在增长,
任务也没有出现任何异常,checkpoint也正常,所以我很苦恼是哪里出现了问题。
[cid:image005.jpg@01D8201E.4C9301C0]
[cid:image006.jpg@01D8201E.4C9301C0]
以下是我的SQL语句:
CREATE TABLE `tbl_rpt_app_usage`
 (`datepoint` TIMESTAMP,
  `appId` VARCHAR ,
  `flavor` VARCHAR ,
  `applyNum` BIGINT ,
  `releaseNum` BIGINT ,
  `usedNum` BIGINT ,
   PRIMARY KEY (`datepoint`) NOT ENFORCED
 )WITH(
 'connector' = 'mysql-cdc',
 'hostname' = '',
 'port' = '3306',
 'username' = '',
 'password' = '',
 'database-name' = '',
 'table-name' = ''
 );
CREATE TABLE `temporary_usage`
 (`datepoint` TIMESTAMP,
  `appId` VARCHAR ,
  `flavor` VARCHAR ,
  `applyNum` BIGINT ,
  `releaseNum` BIGINT ,
  `usedNum` BIGINT ,
   PRIMARY KEY (`datepoint`) NOT ENFORCED
 )WITH(
 'connector' = 'mysql-cdc',
 'hostname' = '',
 'port' = '',
 'username' = '',
 'password' = '',
 'database-name' = '',
 'table-name' = ''
 );
CREATE TABLE `tbl_cce_cluster`
 (`clusterId` VARCHAR ,
  `region` VARCHAR ,
  `flavor` VARCHAR ,
  `totalNode` BIGINT ,
  `toolName` VARCHAR ,
  `appId` VARCHAR ,
   PRIMARY KEY (`clusterId`) NOT ENFORCED
 )WITH(
 'connector' = 'mysql-cdc',
 'hostname' = '',
 'port' = '3306',
 'username' = '',
 'password' = '',
 'database-name' = '',
 'table-name' = ''
 );
---mysql落地表
CREATE table sink(
  code STRING,
  name STRING,
  usedcnt BIGINT,
  `time` TIMESTAMP,
  type STRING,
  PRIMARY KEY (`time`) NOT ENFORCED
) with (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql: ',
   'username' = '',
   'password' = '',
   'table-name' = 'sink'
);
(因为MySQLCDC似乎不支持在同步时筛选指定内容,所以目前同步后先创建筛选结果的虚拟表,再进行计算,还想请教下是否有更好的办法)
CREATE VIEW rpt
 (datepoint,appId,flavor,applyNum,releaseNum,usedNum)
 AS
 select * from tbl_rpt_app_usage
 where datepoint > '2022-02-11 14:54:01'
 and appId not in ('aaa')
 union all
 select * from temporary_usage

CREATE VIEW cce
(clusterId,region,flavor,totalNode,toolName,appId)
AS
select * from tbl_cce_cluster
where toolName in ('bbb','ccc')

---代码逻辑如下:
insert into sink
select code,name,usedcnt,LOCALTIMESTAMP as `time`,type from(
select
info.code,
info.name,
sum(rpt.usedNum) as usedcnt,'online' type
from (
select * from (
select *,ROW_NUMBER() OVER(PARTITION by appId,flavor order by datepoint desc) 
as row_num from rpt/*+ OPTIONS('server-id'='1001-1005') */
)where row_num =1
)rpt
join info
on info.appid=rpt.appId
group by info.code,info.name)
union all
select code,name,usedcnt,LOCALTIMESTAMP as `time`,type from(
select
info.code,
info.name,
sum(totalNode) as usedcnt,'online' type
from
cce/*+ OPTIONS('server-id'='1006-1010') */
join info
on cce.appId=info.appid
group by info.code,info.name)
我的结果表在[cid:image007.png@01D8201D.D86E3030]的12小时(我设置的过期时间)后,再也没更新过。
[cid:image008.png@01D8201D.D86E3030]

另外,还想请教一个问题:目前group by算子的落地到数据库的结果只支持update吗?
业务期待获取历史数据,目前只能想到每分钟全量同步一次结果表的数据到另一个表里,这样就可以追踪到历史数据,是否还有更好的解决办法呢?


非常感谢您的阅读与解惑!


Re: table.exec.state.ttl

2021-08-29 文章 Yun Tang
Hi 航飞

可以参照[1] 看是不是类似的问题


[1] https://issues.apache.org/jira/browse/FLINK-23721

祝好
唐云

From: 李航飞 
Sent: Thursday, August 26, 2021 19:02
To: user-zh 
Subject: table.exec.state.ttl

Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");
conf.setString("table.exec.state.ttl","15 s");
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.configure(conf,this.getClass().getClassLoader());
EnvironmentSetting setting = ...
StreamTableEnvironment.create(execEnv,setting);
基于flink1.13.2
微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
上次那个 allow 也就算了,这次这个 table.exec.state.ttl 设置了15秒,
我等了1分钟,再次输入数据,计算结果还是累加了,没有从0开始计算。
程序是通过StatementSet .execute()执行的


Re:Re: table.exec.state.ttl

2021-08-26 文章 李航飞
你好:

我现在想在 execution environment 里面设置微批和stateValue的过期时间该怎么设?

这样 conf.setString("exec.state.ttl","15 s");
或者这样  conf.setString("stream.exec.state.ttl","15 s");











在 2021-08-26 19:05:07,"Caizhi Weng"  写道:
>Hi!
>
>table 层的配置是加在 table environment 的 table config 里的,加在 execution environment
>里无效。
>
>李航飞  于2021年8月26日周四 下午7:02写道:
>
>> Configuration conf = new Configuration();
>> conf.setString("table.exec.mini-batch.enabled","true");
>> conf.setString("table.exec.mini-batch.allow-latency","15s");
>> conf.setString("table.exec.mini-batch.size","50");
>> conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment
>> execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> execEnv.configure(conf,this.getClass().getClassLoader());
>> EnvironmentSetting setting = ...
>> StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
>> 微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
>> 上次那个 allow 也就算了,这次这个 table.exec.state.ttl
>> 设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
>> 程序是通过StatementSet .execute()执行的


Re: table.exec.state.ttl

2021-08-26 文章 Caizhi Weng
Hi!

table 层的配置是加在 table environment 的 table config 里的,加在 execution environment
里无效。

李航飞  于2021年8月26日周四 下午7:02写道:

> Configuration conf = new Configuration();
> conf.setString("table.exec.mini-batch.enabled","true");
> conf.setString("table.exec.mini-batch.allow-latency","15s");
> conf.setString("table.exec.mini-batch.size","50");
> conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment
> execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> execEnv.configure(conf,this.getClass().getClassLoader());
> EnvironmentSetting setting = ...
> StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
> 微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
> 上次那个 allow 也就算了,这次这个 table.exec.state.ttl
> 设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
> 程序是通过StatementSet .execute()执行的


table.exec.state.ttl

2021-08-26 文章 李航飞
Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");
conf.setString("table.exec.state.ttl","15 s");
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.configure(conf,this.getClass().getClassLoader());
EnvironmentSetting setting = ...
StreamTableEnvironment.create(execEnv,setting);
基于flink1.13.2
微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
上次那个 allow 也就算了,这次这个 table.exec.state.ttl 设置了15秒,
我等了1分钟,再次输入数据,计算结果还是累加了,没有从0开始计算。
程序是通过StatementSet .execute()执行的