Re:Re: rocksdb backend设置后没有启用

2022-02-27 文章 swessia
找到原因了,代码在提交job时读取了本地老的配置文件,使得state backend配置覆盖了集群配置

感谢大佬








在 2022-02-25 17:09:09,"胡伟华"  写道:
>Hi, swessia
>
>flink-conf 中的 state.backend 是集群默认配置,你可以检查下 Job 是否指定了backend。
>具体方式可以参考官方文档: 
>https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#configuring-a-state-backend
> 
>
>
>可以在 JobManager 日志中检索关键字:"Using application-defined state backend” 来确认 job 
>真正使用的 StateBackend
>
>
>> 2022年2月23日 上午11:36,swessia  写道:
>> 
>> flink版本:1.13.5
>> 
>> 
>> 
>> 
>> 
>> 
>> flink-conf.yml中的rocksdb配置:
>> taskmanager.memory.process.size: 2048m
>> taskmanager.memory.managed.fraction: 0.15
>> taskmanager.memory.network.fraction: 0.1
>> 
>> 
>> state.backend: rocksdb
>> state.backend.incremental: true
>> state.checkpoints.dir: 
>> hdfs:///userdata/flink-1-13-5/yarn/high-availability/checkpoints
>> state.savepoints.dir: 
>> hdfs:///userdata/flink-1-13-5/yarn/high-availability/savepoints
>> state.backend.rocksdb.localdir: /userdata/data/flink-1-13-5/tmp
>> state.checkpoints.num-retained: 3
>> 
>> 
>> 使用yarn-session方式部署flink job
>> 发现在job启动后taskmanager 的manage memory一直是0,状态也一直存在taskmanager的内存里,最后内存占满频繁full 
>> gc
>> 
>> 
>> 有大佬知道原因吗?
>


Re: 状态初始化

2022-02-27 文章 Yun Tang
Hi,

这个需求在社区里面称之为 state bootstrapping, 以前在state processor API没有引入时,还有第三方的工具 bravo 
[1]。
我理解你的需求完全可以有state processor API完成,生成一个savepoint,由新作业消费。目前社区也在考虑支持生成native 
savepoint,用以加快生成速度 [2]


[1] https://github.com/king/bravo
[2] https://issues.apache.org/jira/browse/FLINK-25528


Best
Yun Tang


From: Jiangang Liu 
Sent: Thursday, February 24, 2022 10:00
To: user-zh 
Subject: Re: 状态初始化

作业在启动时可以使用 Processor API加载状态,可以参考
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/

huangzhi...@iwgame.com  于2022年2月23日周三 20:28写道:

>
> flink是否能够做到程序第一次启动还没有checkpoint的情况下,对状态进行初始化?
>
>
> huangzhi...@iwgame.com
>


回复: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-27 文章 Liu Join
有没有考虑过将多条数据拼接为一条replace 
SQL写入数据库,这样也不会对数据库造成太大的压力,至于多少条拼接为一条可以去测试再决定,我用过的有500,1000条数据拼接为一条sql写入数据库。

从 Windows 版邮件发送

发件人: yidan zhao
发送时间: 2022年2月28日 10:20
收件人: user-zh
主题: Re: 实时数据入库怎样过滤中间状态,保证最终一致

对数据的实时性、延迟有多严格。
基于process算子处理,来一个orderId,记录status到状态,不要输出,同时设置定时器触发输出。
新数据进入,先判定状态,timestamp更大就更新状态,并更新定时器。

比如说定时器10s,就是一个订单只有连续10s状态不变才会被输出,否则就等待后续可能的更大ts的订单状态。
前提是你的订单必须带ts这个字段,能表达哪个order更晚。

18703416...@163.com <18703416...@163.com> 于2022年2月28日周一 10:00写道:

> keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小
>
> > 2022年2月25日 下午6:45,Lei Wang  写道:
> >
> > 场景描述:
> > Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
> > order_id   status
> > 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
> >
> > 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
> > 最终的状态不丢,但这个最终的状态也不确定是多少。
> >
> > 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
> > 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
> >
> > 请问有什么其他的解决方法吗?
> >
> > 谢谢,
> > 王磊
>
>



Re: flink状态共享

2022-02-27 文章 yidan zhao
最好讲下原始需求,直接问题从设计角度不太可能,讲原始需求。

18703416...@163.com <18703416...@163.com> 于2022年2月25日周五 18:26写道:

> 如果不同算子 需要共享状态,是否考虑 归为一个算子进行处理,同理后面的算子也是
>
> > 2022年2月25日 下午4:30,huangzhi...@iwgame.com 写道:
> >
> > 对于keyed datastream 不同的算子之间是否能够共享同一状态,或者后面的算子任务,是否可以拿到前一个算子任务中的状态?
> >
> >
> >
> > huangzhi...@iwgame.com
>
>


Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-27 文章 yidan zhao
对数据的实时性、延迟有多严格。
基于process算子处理,来一个orderId,记录status到状态,不要输出,同时设置定时器触发输出。
新数据进入,先判定状态,timestamp更大就更新状态,并更新定时器。

比如说定时器10s,就是一个订单只有连续10s状态不变才会被输出,否则就等待后续可能的更大ts的订单状态。
前提是你的订单必须带ts这个字段,能表达哪个order更晚。

18703416...@163.com <18703416...@163.com> 于2022年2月28日周一 10:00写道:

> keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小
>
> > 2022年2月25日 下午6:45,Lei Wang  写道:
> >
> > 场景描述:
> > Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
> > order_id   status
> > 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
> >
> > 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
> > 最终的状态不丢,但这个最终的状态也不确定是多少。
> >
> > 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
> > 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
> >
> > 请问有什么其他的解决方法吗?
> >
> > 谢谢,
> > 王磊
>
>


Re: 如何按比例丢弃kafka中消费的数据

2022-02-27 文章 张昊 陈
你好,使用 DataStream API 的话你可以在读入数据后紧接着用 filter 方法根据一定策略过滤掉一部分数据再进行业务处理即可。

Best,
Zhanghao Chen

From: jack zhang 
Sent: Saturday, February 26, 2022 9:01
To: user-zh@flink.apache.org 
Subject: 如何按比例丢弃kafka中消费的数据

1、flink程序资源有限,kafka中数据比较多,想要按一定比例丢弃数据(或者其它策略),减轻flink 程序压力,有什么方法吗?


Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-27 文章 18703416...@163.com
keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小

> 2022年2月25日 下午6:45,Lei Wang  写道:
> 
> 场景描述:
> Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
> order_id   status
> 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
> 
> 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
> 最终的状态不丢,但这个最终的状态也不确定是多少。
> 
> 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
> 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
> 
> 请问有什么其他的解决方法吗?
> 
> 谢谢,
> 王磊



Re: 如何按比例丢弃kafka中消费的数据

2022-02-27 文章 18703416...@163.com
自定义 kafkasource 的 DeserializationSchema
丢弃的返回 null, source 的下一个filter 算子进行过滤即可

> 2022年2月26日 上午9:01,jack zhang  写道:
> 
> 1、flink程序资源有限,kafka中数据比较多,想要按一定比例丢弃数据(或者其它策略),减轻flink 程序压力,有什么方法吗?