Re:Re: Re: 关于Flink state初始化的问题

2022-08-30 文章 曲洋
那我理解了,描述的太清晰了,我确实之前没理解state的生命周期和使用方法,十分感谢

在 2022-08-29 18:42:51,"Zhiwen Sun"  写道:
>你应该没有正确理解 state 的使用
>
>我们一般在程序里面是用的是 KeyedState , 也就是和 key 伴随的。
>
>基于上面,所以 open() 里面只能对 state 进行初始化, 但是没有办法设置 state 的 value,因为这时候没有 key ;
>另外一方面,也不会在 map() 的时候去 new state (可以认为 state 是一个大的 Map,你 map 的时候只是操作其中的一个
>key)。
>
>回到你的需求,你应该在 open() 的时候保存相关信息到类变量里面,当 map() 的时候再去 update state。
>
>
>
>
>
>
>Zhiwen Sun
>
>
>
>On Fri, Aug 26, 2022 at 1:55 PM 曲洋  wrote:
>
>>
>> 对的,是后者,statAccumulator.value()是null,就是map方法中取值就成null了,但是open中命名初始化了,这个是因为map太快了吗,state没初始化完就开始拿了吗.
>> 嗯嗯,我现在改成先进行判断
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2022-08-26 11:22:43,"Hangxiang Yu"  写道:
>> >open确实是初始化的时候就会调用的;
>>
>> >第一次调用是null是说statAccumulator是null还是statAccumulator.value()是null,后者的话是正常可能会出现的;
>> >这里的写法看起来有点问题,一般用value方法取出来可以先判断下,然后对value state的更新用update方法;
>> >
>> >On Fri, Aug 26, 2022 at 10:25 AM 曲洋  wrote:
>> >
>> >> 各位好,
>> >>
>> >>
>> 我想请教一个问题,我的Flink应用中会在state里边存储一些指标,比如一年的总数,然后当任务各种原因断掉的时候,我希望可以通过入参的方式直接调节这个state,但是遇到了一个问题,如下:
>> >> 我重写了RichMapFunction,yearTotal
>> >>
>> 这个指标是通过命令行传进来的,然后我希望初始化到state里边,但是我发现,open方法第一次调用的时候state都是null,然后这个参数就进不来
>> >> 所以我想问下这个场景怎么办,还有open方法的生命周期是怎么样的,我本以为是map第一次打开的时候就会调用,结果好像不是
>> >> public static class AccumulateAmounts extends RichMapFunction> >> BlueAccumulaterInitState> {
>> >> private transient ValueState
>> >> statAccumulator;
>> >>
>> >>
>> >> @Override
>> >> public BlueAccumulaterInitState map(v2bean currentAccumulator)
>> >> throws Exception {
>> >>
>> >>
>> >> BlueAccumulaterInitState stat = (statAccumulator.value() !=
>> >> null) ? statAccumulator.value() : new BlueAccumulaterInitState();
>> >> Long yearIncrement = year.equals(stat.getYear()) ?
>> >> stat.getYearMetric() + 1L : 1L;
>> >> stat.setYearMetric(yearIncrement);
>> >>
>> >>
>> >> statAccumulator.update(stat);
>> >> return stat;
>> >> }
>> >>
>> >>
>> >> @Override
>> >> public void open(Configuration config) {
>> >> ValueStateDescriptor descriptor =
>> >> new ValueStateDescriptor<>(
>> >> "total",
>> >> TypeInformation.of(new
>> >> TypeHint() {
>> >> }));
>> >> statAccumulator = getRuntimeContext().getState(descriptor);
>> >> ExecutionConfig.GlobalJobParameters globalParams =
>> >> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>> >> Configuration globConf = (Configuration) globalParams;
>> >> long yearTotal =
>> >> globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
>> >> statAccumulator.value().setYearMetric(yearTotal);
>> >>
>> >>
>> >>
>> >> }
>> >> }
>> >
>> >
>> >
>> >--
>> >Best,
>> >Hangxiang.
>>


Re: Re: 关于Flink state初始化的问题

2022-08-29 文章 Zhiwen Sun
你应该没有正确理解 state 的使用

我们一般在程序里面是用的是 KeyedState , 也就是和 key 伴随的。

基于上面,所以 open() 里面只能对 state 进行初始化, 但是没有办法设置 state 的 value,因为这时候没有 key ;
另外一方面,也不会在 map() 的时候去 new state (可以认为 state 是一个大的 Map,你 map 的时候只是操作其中的一个
key)。

回到你的需求,你应该在 open() 的时候保存相关信息到类变量里面,当 map() 的时候再去 update state。






Zhiwen Sun



On Fri, Aug 26, 2022 at 1:55 PM 曲洋  wrote:

>
> 对的,是后者,statAccumulator.value()是null,就是map方法中取值就成null了,但是open中命名初始化了,这个是因为map太快了吗,state没初始化完就开始拿了吗.
> 嗯嗯,我现在改成先进行判断
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-08-26 11:22:43,"Hangxiang Yu"  写道:
> >open确实是初始化的时候就会调用的;
>
> >第一次调用是null是说statAccumulator是null还是statAccumulator.value()是null,后者的话是正常可能会出现的;
> >这里的写法看起来有点问题,一般用value方法取出来可以先判断下,然后对value state的更新用update方法;
> >
> >On Fri, Aug 26, 2022 at 10:25 AM 曲洋  wrote:
> >
> >> 各位好,
> >>
> >>
> 我想请教一个问题,我的Flink应用中会在state里边存储一些指标,比如一年的总数,然后当任务各种原因断掉的时候,我希望可以通过入参的方式直接调节这个state,但是遇到了一个问题,如下:
> >> 我重写了RichMapFunction,yearTotal
> >>
> 这个指标是通过命令行传进来的,然后我希望初始化到state里边,但是我发现,open方法第一次调用的时候state都是null,然后这个参数就进不来
> >> 所以我想问下这个场景怎么办,还有open方法的生命周期是怎么样的,我本以为是map第一次打开的时候就会调用,结果好像不是
> >> public static class AccumulateAmounts extends RichMapFunction >> BlueAccumulaterInitState> {
> >> private transient ValueState
> >> statAccumulator;
> >>
> >>
> >> @Override
> >> public BlueAccumulaterInitState map(v2bean currentAccumulator)
> >> throws Exception {
> >>
> >>
> >> BlueAccumulaterInitState stat = (statAccumulator.value() !=
> >> null) ? statAccumulator.value() : new BlueAccumulaterInitState();
> >> Long yearIncrement = year.equals(stat.getYear()) ?
> >> stat.getYearMetric() + 1L : 1L;
> >> stat.setYearMetric(yearIncrement);
> >>
> >>
> >> statAccumulator.update(stat);
> >> return stat;
> >> }
> >>
> >>
> >> @Override
> >> public void open(Configuration config) {
> >> ValueStateDescriptor descriptor =
> >> new ValueStateDescriptor<>(
> >> "total",
> >> TypeInformation.of(new
> >> TypeHint() {
> >> }));
> >> statAccumulator = getRuntimeContext().getState(descriptor);
> >> ExecutionConfig.GlobalJobParameters globalParams =
> >> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> >> Configuration globConf = (Configuration) globalParams;
> >> long yearTotal =
> >> globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
> >> statAccumulator.value().setYearMetric(yearTotal);
> >>
> >>
> >>
> >> }
> >> }
> >
> >
> >
> >--
> >Best,
> >Hangxiang.
>


Re: 关于Flink state初始化的问题

2022-08-25 文章 Hangxiang Yu
open确实是初始化的时候就会调用的;
第一次调用是null是说statAccumulator是null还是statAccumulator.value()是null,后者的话是正常可能会出现的;
这里的写法看起来有点问题,一般用value方法取出来可以先判断下,然后对value state的更新用update方法;

On Fri, Aug 26, 2022 at 10:25 AM 曲洋  wrote:

> 各位好,
>
>  
> 我想请教一个问题,我的Flink应用中会在state里边存储一些指标,比如一年的总数,然后当任务各种原因断掉的时候,我希望可以通过入参的方式直接调节这个state,但是遇到了一个问题,如下:
> 我重写了RichMapFunction,yearTotal
> 这个指标是通过命令行传进来的,然后我希望初始化到state里边,但是我发现,open方法第一次调用的时候state都是null,然后这个参数就进不来
> 所以我想问下这个场景怎么办,还有open方法的生命周期是怎么样的,我本以为是map第一次打开的时候就会调用,结果好像不是
> public static class AccumulateAmounts extends RichMapFunction BlueAccumulaterInitState> {
> private transient ValueState
> statAccumulator;
>
>
> @Override
> public BlueAccumulaterInitState map(v2bean currentAccumulator)
> throws Exception {
>
>
> BlueAccumulaterInitState stat = (statAccumulator.value() !=
> null) ? statAccumulator.value() : new BlueAccumulaterInitState();
> Long yearIncrement = year.equals(stat.getYear()) ?
> stat.getYearMetric() + 1L : 1L;
> stat.setYearMetric(yearIncrement);
>
>
> statAccumulator.update(stat);
> return stat;
> }
>
>
> @Override
> public void open(Configuration config) {
> ValueStateDescriptor descriptor =
> new ValueStateDescriptor<>(
> "total",
> TypeInformation.of(new
> TypeHint() {
> }));
> statAccumulator = getRuntimeContext().getState(descriptor);
> ExecutionConfig.GlobalJobParameters globalParams =
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> Configuration globConf = (Configuration) globalParams;
> long yearTotal =
> globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
> statAccumulator.value().setYearMetric(yearTotal);
>
>
>
> }
> }



-- 
Best,
Hangxiang.


Re:Re: Re: [Internet]Re: Re: Some question with Flink state

2022-06-02 文章 Xuyang
Hi, 理论上来说这句话是不是有问题?


> “是因为如果使用value-state,一个task会存在多个key,不同的key的内容会进行替换”


因为ValueState也是keyedState的一种,所以也是每个key各自维护一个valuestate,不同的key之间是隔离的。
其实一般情况下ValueState里面存Map,和直接MapState没啥区别,只不过在不同的状态存储上和状态的TTL策略有略微不同,所以不太推荐ValueState里面存Map。
所以其实还是看具体的业务场景,假如只是算一个累加的值的话,用valuestate就够了。




--

Best!
Xuyang





在 2022-05-25 13:38:52,"lxk7...@163.com"  写道:
>
>刚看了下keygroup的原理,前面的内容大致能理解了,对于下面这段话
>"map-state的话相当于某些固定的key group里面的key都可以通过map-state的user-key去分别存储"
>我理解   
>是因为如果使用value-state,一个task会存在多个key,不同的key的内容会进行替换,而使用map的话,就算同一个task有多个key,根据用户自定义的key还是可以匹配到的。
>这样的话,大部分场景其实都适合使用map-state。
>
>
>lxk7...@163.com
> 
>From: jurluo(罗凯)
>Date: 2022-05-25 11:05
>To: user-zh@flink.apache.org
>Subject: Re: [Internet]Re: Re: Some question with Flink state
>老哥,看起来好像没什么问题,相同的key都分配在了同个task,每个task会存在多种key是正常的。key会按最大并行度分成多个key 
>group,然后固定的key 
>group分配到各个task里。只能保证相同的key会到同一个task,不能保证一个task只有一个key。你这个需求用map-state才合适。map-state的话相当于某些固定的key
> group里面的key都可以通过map-state的user-key去分别存储。
> 
>> 2022年5月25日 上午10:45,lxk7...@163.com 写道:
>> 
>> 图片好像又挂了  我重发下
>> hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
>> 
>> 
>> 
>>下面是我的代码及测试结果
>> 
>> 
>> 
>> 一.使用int类型
>> 
>> 
>> 
>>public class KeyByTest {
>> 
>> 
>> 
>> public static void main(String[] args) throws Exception {
>> 
>> 
>> 
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>> 
>> 
>> env.setParallelism(10);
>> 
>> 
>> 
>> 
>> 
>> DataStreamSource dataDataStreamSource = 
>> env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
>> 
>> 
>> 
>> new data(1, "123", "分类页"),
>> 
>> 
>> 
>> new data(2, "r-123", "搜索结果页"),
>> 
>> 
>> 
>> new data(1, "r-123", "我的页"),
>> 
>> 
>> 
>> new data(3, "r-4567", "搜索结果页")));
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> SingleOutputStreamOperator map = 
>> dataDataStreamSource.keyBy(new MyKeySelector())
>> 
>> 
>> 
>> .map(new RichMapFunction() {
>> 
>> 
>> 
>> 
>> 
>> @Override
>> 
>> 
>> 
>> public String map(data data) throws Exception {
>> 
>> 
>> 
>> System.out.println(data.toString() + "的subtask为:" + 
>> getRuntimeContext().getIndexOfThisSubtask() );
>> 
>> 
>> 
>> return data.toString();
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> });
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> env.execute("test");
>> 
>> 
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> class data{
>> 
>> 
>> 
>> private int id;
>> 
>> 
>> 
>> private String goods;
>> 
>> 
>> 
>> private String pageName;
>> 
>> 
>> 
>> 
>> 
>> public data(int id, String goods, String pageName) {
>> 
>> 
>> 
>> this.id = id;
>> 
>> 
>> 
>> this.goods = goods;
>> 
>> 
>> 
>> this.pageName = pageName;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> public data() {
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public int getId() {
>> 
>> 
>> 
>> return id;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public void setId(int id) {
>> 
>> 
>> 
>> this.id = id;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public String getGoods() {
>> 
>> 
>> 
>> return goods;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public void setGoods(String goods) {
>> 
>> 
>> 
>> this.goods = goods;

Re: Re: [Internet]Re: Re: Some question with Flink state

2022-05-24 文章 lxk7...@163.com

刚看了下keygroup的原理,前面的内容大致能理解了,对于下面这段话
"map-state的话相当于某些固定的key group里面的key都可以通过map-state的user-key去分别存储"
我理解   
是因为如果使用value-state,一个task会存在多个key,不同的key的内容会进行替换,而使用map的话,就算同一个task有多个key,根据用户自定义的key还是可以匹配到的。
这样的话,大部分场景其实都适合使用map-state。


lxk7...@163.com
 
From: jurluo(罗凯)
Date: 2022-05-25 11:05
To: user-zh@flink.apache.org
Subject: Re: [Internet]Re: Re: Some question with Flink state
老哥,看起来好像没什么问题,相同的key都分配在了同个task,每个task会存在多种key是正常的。key会按最大并行度分成多个key 
group,然后固定的key 
group分配到各个task里。只能保证相同的key会到同一个task,不能保证一个task只有一个key。你这个需求用map-state才合适。map-state的话相当于某些固定的key
 group里面的key都可以通过map-state的user-key去分别存储。
 
> 2022年5月25日 上午10:45,lxk7...@163.com 写道:
> 
> 图片好像又挂了  我重发下
> hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
> 
> 
> 
>下面是我的代码及测试结果
> 
> 
> 
> 一.使用int类型
> 
> 
> 
>public class KeyByTest {
> 
> 
> 
> public static void main(String[] args) throws Exception {
> 
> 
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
> 
> env.setParallelism(10);
> 
> 
> 
> 
> 
> DataStreamSource dataDataStreamSource = 
> env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
> 
> 
> 
> new data(1, "123", "分类页"),
> 
> 
> 
> new data(2, "r-123", "搜索结果页"),
> 
> 
> 
> new data(1, "r-123", "我的页"),
> 
> 
> 
> new data(3, "r-4567", "搜索结果页")));
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> SingleOutputStreamOperator map = 
> dataDataStreamSource.keyBy(new MyKeySelector())
> 
> 
> 
> .map(new RichMapFunction() {
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String map(data data) throws Exception {
> 
> 
> 
> System.out.println(data.toString() + "的subtask为:" + 
> getRuntimeContext().getIndexOfThisSubtask() );
> 
> 
> 
> return data.toString();
> 
> 
> 
> }
> 
> 
> 
> });
> 
> 
> 
> 
> 
> 
> 
> env.execute("test");
> 
> 
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> class data{
> 
> 
> 
> private int id;
> 
> 
> 
> private String goods;
> 
> 
> 
> private String pageName;
> 
> 
> 
> 
> 
> public data(int id, String goods, String pageName) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> 
> 
> public data() {
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public int getId() {
> 
> 
> 
> return id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setId(int id) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getGoods() {
> 
> 
> 
> return goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setGoods(String goods) {
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getPageName() {
> 
> 
> 
> return pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setPageName(String pageName) {
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String toString() {
> 
> 
> 
> return "data{" +
> 
> 
> 
> "id='" + id + '\'' +
> 
> 
> 
> ", goods='" + goods + '\'' +
> 
> 
> 
> ", pageName='" + pageName + '\'' +
> 
> 
> 
> '}';
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 
> 
> class MyKeySelector implements KeySelector{
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public Integer getKey(data data) throws Exception {
> 
> 
> 
> return data.getId();
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 控制台的输出如下:
> https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
> 
> 
> 
> 可以看见数据根据id分组,分到了不同的subtask上。
> 
> 
> 
> 
> 
&g

Re: [Internet]Re: Re: Some question with Flink state

2022-05-24 文章 罗凯
rce dataDataStreamSource = 
> env.fromCollection(Arrays.asList(new data("1", "123", "首页"),
> 
> 
> 
> new data("1", "123", "分类页"),
> 
> 
> 
> new data("2", "r-123", "搜索结果页"),
> 
> 
> 
> new data("2", "r-123", "我的页"),
> 
> 
> 
> new data("3", "r-4567", "搜索结果页")));
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> SingleOutputStreamOperator map = 
> dataDataStreamSource.keyBy(new MyKeySelector())
> 
> 
> 
> .map(new RichMapFunction() {
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String map(data data) throws Exception {
> 
> 
> 
> System.out.println(data.toString() + "的subtask为:" + 
> getRuntimeContext().getIndexOfThisSubtask() );
> 
> 
> 
> return data.toString();
> 
> 
> 
> }
> 
> 
> 
> });
> 
> 
> 
> 
> 
> 
> 
> env.execute("test");
> 
> 
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> class data{
> 
> 
> 
> private String id;
> 
> 
> 
> private String goods;
> 
> 
> 
> private String pageName;
> 
> 
> 
> 
> 
> public data(String id, String goods, String pageName) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> 
> 
> public data() {
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getId() {
> 
> 
> 
> return id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setId(String id) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getGoods() {
> 
> 
> 
> return goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setGoods(String goods) {
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getPageName() {
> 
> 
> 
> return pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setPageName(String pageName) {
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String toString() {
> 
> 
> 
> return "data{" +
> 
> 
> 
> "id='" + id + '\'' +
> 
> 
> 
> ", goods='" + goods + '\'' +
> 
> 
> 
> ", pageName='" + pageName + '\'' +
> 
> 
> 
> '}';
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 
> 
> class MyKeySelector implements KeySelector{
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String getKey(data data) throws Exception {
> 
> 
> 
> return data.getId();
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 最终控制台输出如下:
> 
> 
> https://s2.loli.net/2022/05/25/vxKiuX9od6aOTD3.png
> 
> 
> 
> 
> 可以看见只分了两个组,我不清楚这是否是一个bug.
> 
> 
> 
> 
> 
> 
> 
> lxk7...@163.com
> 
> 
> 
> 
> 
> 
> 
> From: Xuyang
> 
> 
> 
> Date: 2022-05-24 21:35
> 
> 
> 
> To: user-zh
> 
> 
> 
> Subject: Re:Re: Re: Some question with Flink state
> 
> 
> 
> 我不确定但大概率是两次keyby只以后面那个为准,所以可能会导致你前面的keyby其实是无用的(可以试验下)。可以按你说的方式将数据中这两个key拼成一个string当作shuffle的key。
> 
> 
> 
> 在 2022-05-24 21:06:58,"lxk7...@163.com"  写道:
> 
> 
> 
>> 如果是两次keyby的问题,我可以直接在一次keyby里将两个数据给拼接成字符串,这样的方式是跟两次keyby效果一样吗?
> 
> 
> 
>> 
> 
> 
> 
>> 
> 
> 
> 
>> 
> 
> 
> 
>> lxk7...@163.com
> 
> 
> 
>> 
> 
> 
> 
>> From: Xuyang
> 
> 
> 
>> Date: 2022-05-24 20:51
> 
> 
> 
>> To: user-zh
> 
> 
> 
>> Subject: Re:Re: Re: Some question with Flink state
> 
> 
> 
>> 看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
> 
> 
> 
>> 在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
> 
> 
> 
>>> 
> 
> 
> 
>>> https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
> 
> 
> 
>>> https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
> 
> 
> 
>>> 
> 
> 
> 
>>> 这样呢
> 
> 
> 
>>> 
> 
> 
> 
>>> 
> 
> 
> 
>>> lxk7...@163.com
> 
> 
> 
>>> 
> 
> 
> 
>>> From: Xuyang
> 
> 
> 
>>> Date: 2022-05-24 20:17
> 
> 
> 
>>> To: user-zh
> 
> 
> 
>>> Subject: Re:Re: Re: Some question with Flink state
> 
> 
> 
>>> Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>>> 
> 
> 
> 
>>> 
> 
> 
> 
>>> 
> 
> 
> 
>>> 在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
> 
> 
> 
>>> 
> 
> 
> 
>>> 图片好像有点问题,重新上传一下
> 
> 
> 
>>> lxk7...@163.com
> 
> 
> 
>>> From: Hangxiang Yu
> 
> 
> 
>>> Date: 2022-05-24 12:09
> 
> 
> 
>>> To: user-zh
> 
> 
> 
>>> Subject: Re: Re: Some question with Flink state
> 
> 
> 
>>> 你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
> 
> 
> 
>>> selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
> 
> 
> 
>>> 或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
> 
> 
> 
>>> On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
> 
> 
> 
>>>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
> 
> 
> 
>>>> 
> 
> 
> 
>>>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
> 
> 
> 
>>>> 
> 
> 
> 
>>>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
> 
> 
> 
>>>> 
> 
> 
> 
>>>> 
> 
> 
> 
>>>> 
> 
> 
> 
>>>> lxk7...@163.com
> 
> 
> 
>>>> 
> 
> 
> 
>>>> From: Hangxiang Yu
> 
> 
> 
>>>> Date: 2022-05-23 23:09
> 
> 
> 
>>>> To: user-zh; lxk7491
> 
> 
> 
>>>> Subject: Re: Some question with Flink state
> 
> 
> 
>>>> Hello,
> 
> 
> 
>>>> All states will not be shared in different parallelisms.
> 
> 
> 
>>>> BTW, English questions could be sent to u...@flink.apache.org.
> 
> 
> 
>>>> 
> 
> 
> 
>>>> Best,
> 
> 
> 
>>>> Hangxiang.
> 
> 
> 
>>>> 
> 
> 
> 
>>>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
> 
> 
> 
>>>> 
> 
> 
> 
>>>>> 
> 
> 
> 
>>>>> Hi everyone
> 
> 
> 
>>>>> I was used Flink keyed-state in my Project.But I found some questions
> 
> 
> 
>>>>> that make me confused.
> 
> 
> 
>>>>> when I used value-state in multi parallelism  the value is not I
> 
> 
> 
>>>> wanted.
> 
> 
> 
>>>>> So I guess that value-state is in every parallelism. every parallelism
> 
> 
> 
>>>>> saved their only value  which means the value is Thread-Level
> 
> 
> 
>>>>> But when I used map-state,the value is correctly. I mean the map-state
> 
> 
> 
>>>>> was shared by every parallelism.
> 
> 
> 
>>>>>looking forward to your reply
> 
> 
> 
>>>>> 
> 
> 
> 
>>>>> 
> 
> 
> 
>>>>> lxk7...@163.com
> 
> 
> 
>>>>> 
> 
> 
> 
>>>> 
> 
> 



Re: Re: Some question with Flink state

2022-05-24 文章 lxk7...@163.com
 '}';



    }



}





class MyKeySelector implements KeySelector{





    @Override



    public String getKey(data data) throws Exception {



    return data.getId();



    }



}



最终控制台输出如下:


https://s2.loli.net/2022/05/25/vxKiuX9od6aOTD3.png




可以看见只分了两个组,我不清楚这是否是一个bug.







lxk7...@163.com



 



From: Xuyang



Date: 2022-05-24 21:35



To: user-zh



Subject: Re:Re: Re: Some question with Flink state



我不确定但大概率是两次keyby只以后面那个为准,所以可能会导致你前面的keyby其实是无用的(可以试验下)。可以按你说的方式将数据中这两个key拼成一个string当作shuffle的key。



在 2022-05-24 21:06:58,"lxk7...@163.com"  写道:



>如果是两次keyby的问题,我可以直接在一次keyby里将两个数据给拼接成字符串,这样的方式是跟两次keyby效果一样吗?



>



>



>



>lxk7...@163.com



> 



>From: Xuyang



>Date: 2022-05-24 20:51



>To: user-zh



>Subject: Re:Re: Re: Some question with Flink state



>看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下



>在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:



>>



>>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png



>>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png



>>



>>这样呢



>>



>>



>>lxk7...@163.com



>> 



>>From: Xuyang



>>Date: 2022-05-24 20:17



>>To: user-zh



>>Subject: Re:Re: Re: Some question with Flink state



>>Hi, 你的图还是挂了,可以使用图床工具试一下



>> 



>> 



>> 



>>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:



>> 



>>图片好像有点问题,重新上传一下



>>lxk7...@163.com



>>From: Hangxiang Yu



>>Date: 2022-05-24 12:09



>>To: user-zh



>>Subject: Re: Re: Some question with Flink state



>>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key



>>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);



>>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;



>>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:



>>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。



>>>



>>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。



>>>



>>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。



>>>



>>>



>>>



>>> lxk7...@163.com



>>>



>>> From: Hangxiang Yu



>>> Date: 2022-05-23 23:09



>>> To: user-zh; lxk7491



>>> Subject: Re: Some question with Flink state



>>> Hello,



>>> All states will not be shared in different parallelisms.



>>> BTW, English questions could be sent to u...@flink.apache.org.



>>>



>>> Best,



>>> Hangxiang.



>>>



>>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:



>>>



>>> >



>>> > Hi everyone



>>> >    I was used Flink keyed-state in my Project.But I found some questions



>>> > that make me confused.



>>> >    when I used value-state in multi parallelism  the value is not I



>>> wanted.



>>> >    So I guess that value-state is in every parallelism. every parallelism



>>> > saved their only value  which means the value is Thread-Level



>>> >    But when I used map-state,the value is correctly. I mean the map-state



>>> > was shared by every parallelism.



>>> >   looking forward to your reply



>>> >



>>> >



>>> > lxk7...@163.com



>>> >



>>>




Re: Re: Some question with Flink state

2022-05-24 文章 lxk7...@163.com
hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
   下面是我的代码及测试结果
一.使用int类型
   public class KeyByTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

DataStreamSource dataDataStreamSource = 
env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
new data(1, "123", "分类页"),
new data(2, "r-123", "搜索结果页"),
new data(1, "r-123", "我的页"),
new data(3, "r-4567", "搜索结果页")));




SingleOutputStreamOperator map = dataDataStreamSource.keyBy(new 
MyKeySelector())
.map(new RichMapFunction() {

@Override
public String map(data data) throws Exception {
System.out.println(data.toString() + "的subtask为:" + 
getRuntimeContext().getIndexOfThisSubtask() );
return data.toString();
}
});


env.execute("test");

}
}
class data{
private int id;
private String goods;
private String pageName;

public data(int id, String goods, String pageName) {
this.id = id;
this.goods = goods;
this.pageName = pageName;
}


public data() {
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getGoods() {
return goods;
}

public void setGoods(String goods) {
this.goods = goods;
}

public String getPageName() {
return pageName;
}

public void setPageName(String pageName) {
this.pageName = pageName;
}

@Override
public String toString() {
return "data{" +
"id='" + id + '\'' +
", goods='" + goods + '\'' +
", pageName='" + pageName + '\'' +
'}';
}
}

class MyKeySelector implements KeySelector{

@Override
public Integer getKey(data data) throws Exception {
return data.getId();
}
}
控制台的输出如下:
可以看见数据根据id分组,分到了不同的subtask上。


二.使用String类型  代码如下:
public class KeyByTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

DataStreamSource dataDataStreamSource = 
env.fromCollection(Arrays.asList(new data("1", "123", "首页"),
new data("1", "123", "分类页"),
new data("2", "r-123", "搜索结果页"),
new data("2", "r-123", "我的页"),
new data("3", "r-4567", "搜索结果页")));




SingleOutputStreamOperator map = dataDataStreamSource.keyBy(new 
MyKeySelector())
.map(new RichMapFunction() {

@Override
public String map(data data) throws Exception {
System.out.println(data.toString() + "的subtask为:" + 
getRuntimeContext().getIndexOfThisSubtask() );
return data.toString();
}
});


env.execute("test");

}
}
class data{
private String id;
private String goods;
private String pageName;

public data(String id, String goods, String pageName) {
this.id = id;
this.goods = goods;
this.pageName = pageName;
}


public data() {
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getGoods() {
return goods;
}

public void setGoods(String goods) {
this.goods = goods;
}

public String getPageName() {
return pageName;
}

public void setPageName(String pageName) {
this.pageName = pageName;
}

@Override
public String toString() {
return "data{" +
"id='" + id + '\'' +
", goods='" + goods + '\'' +
", pageName='" + pageName + '\'' +
'}';
}
}

class MyKeySelector implements KeySelector{

@Override
public String getKey(data data) throws Exception {
return data.getId();
}
}
最终控制台输出如下:


可以看见只分了两个组,我不清楚这是否是一个bug.


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 21:35
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
我不确定但大概率是两次keyby只以后面那个为准,所以可能会导致你前面的keyby其实是无用的(可以试验下)。可以按你说的方式将数据中这两个key拼成一个string当作shuffle的key。
在 2022-05-24 21:06:58,"lxk7...@163.com&qu

Re:Re: Re: Some question with Flink state

2022-05-24 文章 Xuyang
我不确定但大概率是两次keyby只以后面那个为准,所以可能会导致你前面的keyby其实是无用的(可以试验下)。可以按你说的方式将数据中这两个key拼成一个string当作shuffle的key。
在 2022-05-24 21:06:58,"lxk7...@163.com"  写道:
>如果是两次keyby的问题,我可以直接在一次keyby里将两个数据给拼接成字符串,这样的方式是跟两次keyby效果一样吗?
>
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:51
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
>在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>>
>>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>>
>>这样呢
>>
>>
>>lxk7...@163.com
>> 
>>From: Xuyang
>>Date: 2022-05-24 20:17
>>To: user-zh
>>Subject: Re:Re: Re: Some question with Flink state
>>Hi, 你的图还是挂了,可以使用图床工具试一下
>> 
>> 
>> 
>>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
>> 
>>图片好像有点问题,重新上传一下
>>lxk7...@163.com
>>From: Hangxiang Yu
>>Date: 2022-05-24 12:09
>>To: user-zh
>>Subject: Re: Re: Some question with Flink state
>>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>>
>>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>>
>>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>>
>>>
>>>
>>> lxk7...@163.com
>>>
>>> From: Hangxiang Yu
>>> Date: 2022-05-23 23:09
>>> To: user-zh; lxk7491
>>> Subject: Re: Some question with Flink state
>>> Hello,
>>> All states will not be shared in different parallelisms.
>>> BTW, English questions could be sent to u...@flink.apache.org.
>>>
>>> Best,
>>> Hangxiang.
>>>
>>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>>
>>> >
>>> > Hi everyone
>>> >I was used Flink keyed-state in my Project.But I found some questions
>>> > that make me confused.
>>> >when I used value-state in multi parallelism  the value is not I
>>> wanted.
>>> >So I guess that value-state is in every parallelism. every parallelism
>>> > saved their only value  which means the value is Thread-Level
>>> >But when I used map-state,the value is correctly. I mean the map-state
>>> > was shared by every parallelism.
>>> >   looking forward to your reply
>>> >
>>> >
>>> > lxk7...@163.com
>>> >
>>>


Re: Re: Some question with Flink state

2022-05-24 文章 lxk7...@163.com
如果是两次keyby的问题,我可以直接在一次keyby里将两个数据给拼接成字符串,这样的方式是跟两次keyby效果一样吗?



lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>这样呢
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:17
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
> 
>图片好像有点问题,重新上传一下
>lxk7...@163.com
>From: Hangxiang Yu
>Date: 2022-05-24 12:09
>To: user-zh
>Subject: Re: Re: Some question with Flink state
>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>
>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>
>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>
>>
>>
>> lxk7...@163.com
>>
>> From: Hangxiang Yu
>> Date: 2022-05-23 23:09
>> To: user-zh; lxk7491
>> Subject: Re: Some question with Flink state
>> Hello,
>> All states will not be shared in different parallelisms.
>> BTW, English questions could be sent to u...@flink.apache.org.
>>
>> Best,
>> Hangxiang.
>>
>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>
>> >
>> > Hi everyone
>> >I was used Flink keyed-state in my Project.But I found some questions
>> > that make me confused.
>> >when I used value-state in multi parallelism  the value is not I
>> wanted.
>> >So I guess that value-state is in every parallelism. every parallelism
>> > saved their only value  which means the value is Thread-Level
>> >But when I used map-state,the value is correctly. I mean the map-state
>> > was shared by every parallelism.
>> >   looking forward to your reply
>> >
>> >
>> > lxk7...@163.com
>> >
>>


Re: Re: Some question with Flink state

2022-05-24 文章 lxk7...@163.com
好的,我会尝试去弄一下。


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>这样呢
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:17
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
> 
>图片好像有点问题,重新上传一下
>lxk7...@163.com
>From: Hangxiang Yu
>Date: 2022-05-24 12:09
>To: user-zh
>Subject: Re: Re: Some question with Flink state
>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>
>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>
>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>
>>
>>
>> lxk7...@163.com
>>
>> From: Hangxiang Yu
>> Date: 2022-05-23 23:09
>> To: user-zh; lxk7491
>> Subject: Re: Some question with Flink state
>> Hello,
>> All states will not be shared in different parallelisms.
>> BTW, English questions could be sent to u...@flink.apache.org.
>>
>> Best,
>> Hangxiang.
>>
>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>
>> >
>> > Hi everyone
>> >I was used Flink keyed-state in my Project.But I found some questions
>> > that make me confused.
>> >when I used value-state in multi parallelism  the value is not I
>> wanted.
>> >So I guess that value-state is in every parallelism. every parallelism
>> > saved their only value  which means the value is Thread-Level
>> >But when I used map-state,the value is correctly. I mean the map-state
>> > was shared by every parallelism.
>> >   looking forward to your reply
>> >
>> >
>> > lxk7...@163.com
>> >
>>


Re:Re: Re: Some question with Flink state

2022-05-24 文章 Xuyang
看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>这样呢
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:17
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
> 
>图片好像有点问题,重新上传一下
>lxk7...@163.com
>From: Hangxiang Yu
>Date: 2022-05-24 12:09
>To: user-zh
>Subject: Re: Re: Some question with Flink state
>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>
>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>
>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>
>>
>>
>> lxk7...@163.com
>>
>> From: Hangxiang Yu
>> Date: 2022-05-23 23:09
>> To: user-zh; lxk7491
>> Subject: Re: Some question with Flink state
>> Hello,
>> All states will not be shared in different parallelisms.
>> BTW, English questions could be sent to u...@flink.apache.org.
>>
>> Best,
>> Hangxiang.
>>
>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>
>> >
>> > Hi everyone
>> >I was used Flink keyed-state in my Project.But I found some questions
>> > that make me confused.
>> >when I used value-state in multi parallelism  the value is not I
>> wanted.
>> >So I guess that value-state is in every parallelism. every parallelism
>> > saved their only value  which means the value is Thread-Level
>> >But when I used map-state,the value is correctly. I mean the map-state
>> > was shared by every parallelism.
>> >   looking forward to your reply
>> >
>> >
>> > lxk7...@163.com
>> >
>>


Re: Re: Some question with Flink state

2022-05-24 文章 lxk7...@163.com

https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png

这样呢


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
Hi, 你的图还是挂了,可以使用图床工具试一下
 
 
 
在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
 
图片好像有点问题,重新上传一下
lxk7...@163.com
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Re: Some question with Flink state

2022-05-24 文章 lxk7...@163.com
[URL=https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png][IMG]https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png[/IMG][/URL]
[URL=https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png][IMG]https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png[/IMG][/URL]
看下这个是否能看见图片


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
Hi, 你的图还是挂了,可以使用图床工具试一下
 
 
 
在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
 
图片好像有点问题,重新上传一下
lxk7...@163.com
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re:Re: Re: Some question with Flink state

2022-05-24 文章 Xuyang
Hi, 你的图还是挂了,可以使用图床工具试一下



在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:

图片好像有点问题,重新上传一下
lxk7...@163.com
 
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
 
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
 
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>

Re: Re: Some question with Flink state

2022-05-23 文章 lxk7...@163.com
图片好像有点问题,重新上传一下


lxk7...@163.com
 
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
 
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
 
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Re: Some question with Flink state

2022-05-23 文章 lxk7...@163.com
以下是我的代码部分




这是最新的一版,根据测试的时候没有啥问题
但是之前使用value state的时候能从数据上看出不对


lxk7...@163.com
 
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
 
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
 
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Re: Some question with Flink state

2022-05-23 文章 Hangxiang Yu
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;

On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:

> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Re: Some question with Flink state

2022-05-23 文章 lxk7...@163.com
好的,我看这里面邮件都是英文,所以用英文问了个问题。
我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。



lxk7...@163.com
 
From: Hangxiang Yu
Date: 2022-05-23 23:09
To: user-zh; lxk7491
Subject: Re: Some question with Flink state
Hello,
All states will not be shared in different parallelisms.
BTW, English questions could be sent to u...@flink.apache.org.
 
Best,
Hangxiang.
 
On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
 
>
> Hi everyone
>I was used Flink keyed-state in my Project.But I found some questions
> that make me confused.
>when I used value-state in multi parallelism  the value is not I wanted.
>So I guess that value-state is in every parallelism. every parallelism
> saved their only value  which means the value is Thread-Level
>But when I used map-state,the value is correctly. I mean the map-state
> was shared by every parallelism.
>   looking forward to your reply
>
>
> lxk7...@163.com
>


Re: Some question with Flink state

2022-05-23 文章 Hangxiang Yu
Hello,
All states will not be shared in different parallelisms.
BTW, English questions could be sent to u...@flink.apache.org.

Best,
Hangxiang.

On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:

>
> Hi everyone
>I was used Flink keyed-state in my Project.But I found some questions
> that make me confused.
>when I used value-state in multi parallelism  the value is not I wanted.
>So I guess that value-state is in every parallelism. every parallelism
> saved their only value  which means the value is Thread-Level
>But when I used map-state,the value is correctly. I mean the map-state
> was shared by every parallelism.
>   looking forward to your reply
>
>
> lxk7...@163.com
>


Some question with Flink state

2022-05-23 文章 lxk7...@163.com

Hi everyone
   I was used Flink keyed-state in my Project.But I found some questions that 
make me confused.
   when I used value-state in multi parallelism  the value is not I wanted.
   So I guess that value-state is in every parallelism. every parallelism saved 
their only value  which means the value is Thread-Level 
   But when I used map-state,the value is correctly. I mean the map-state was 
shared by every parallelism.
  looking forward to your reply 


lxk7...@163.com


Re: Flink state evolution with avro

2021-06-17 文章 Yun Tang
Hi,

你可以参照社区的 state-evolution的 E2E 测试代码 [1], 整个程序就是使用的avro作为相关类的声明工具。


[1] 
https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-state-evolution-test/src/main

祝好
唐云

From: casel.chen 
Sent: Friday, June 11, 2021 8:13
To: user-zh@flink.apache.org 
Subject: Flink state evolution with avro

Is there any live code example about flink state evolution with avro? Thanks!


Flink state evolution with avro

2021-06-10 文章 casel.chen
Is there any live code example about flink state evolution with avro? Thanks!

Flink state processor API with Avro data type

2021-05-31 文章 Min Tan
大家好,

我使用 Flink 1.10.1 并尝试使用 Flink State Processor API 从Savepoint读取 flink state 状态。
当状态Type 是普通 Java type或 Java POJOs时, 运行良好。

当 Avro 生成的 Java class 用作状态类型 state type时,不工作。
在这种Avro class情况下是否需要额外的序列化 serializers?

 
谢谢
谭民

????flink????state

2021-02-06 文章 ???????L
import org.apache.flink.api.java.functions.KeySelector; import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; 
import org.apache.flink.streaming.api.windowing.time.Time;   ... 
DataStream

回复: 根据业务需求选择合适的flink state

2021-01-25 文章 纪军伟
退订


| |
纪军伟
|
|
jjw8610...@163.com
|
签名由网易邮箱大师定制


在2021年01月23日 15:43,徐州州<25977...@qq.com> 写道:
我觉得你可以尝试一下TTL,keyby之后设置key状态的失效时间为1分钟,如果一分钟没数据进来就清空state。




--原始邮件--
发件人: "张锴"https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
 
  news_...@163.com 

Re: 根据业务需求选择合适的flink state

2021-01-23 文章 赵一旦
ATE_FORMAT)
> > >> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> > >> DateUtil.SECOND_DATE_FORMAT)
> > >> val duration = (endTime - startTime) / 1000  //停留多少秒
> > >> val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> > >> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> > >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > >> join_time, leave_time, created_time, modified_time
> > >>   , liveType, plat_form, duration, duration_time,
> > >> network_operator, role, useragent, uid, eventTime))
> > >>
> > >> CloudliveWatcher(id, partnerId, courseId, customerId,
> > >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > >> join_time, leave_time, created_time, modified_time
> > >>   , liveType, plat_form, duration, duration_time,
> > >> network_operator, role, useragent, uid, eventTime)
> > >>
> > >> }
> > >>
> > >>
> > >> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> > >>
> > >>
> > >>
> > >>
> > >> 赵一旦  于2020年12月28日周一 下午7:12写道:
> > >>
> > >> > 按直播间ID和用户ID分组,使用session
> > >> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> > >> >
> > >> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> > >> >
> > >> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> > >> >
> > >> >
> > >> > 张锴  于2020年12月28日周一 下午5:35写道:
> > >> >
> > >> > > 能描述一下用session window的考虑吗
> > >> > >
> > >> > > Akisaya  于2020年12月28日周一 下午5:00写道:
> > >> > >
> > >> > > > 这个可以用 session window 吧
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > >> > > >
> > >> > > > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> > >> > > >
> > >> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > news_...@163.com
> > >> > > > >
> > >> > > > > 发件人: 张锴
> > >> > > > > 发送时间: 2020-12-28 13:35
> > >> > > > > 收件人: user-zh
> > >> > > > > 主题: 根据业务需求选择合适的flink state
> > >> > > > > 各位大佬帮我分析下如下需求应该怎么写
> > >> > > > >
> > >> > > > > 需求说明:
> > >> > > > >
> > >> > >
> > >>
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > >> > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > >> > > > >
> > >> > > > > 我的想法:
> > >> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event
> > >> Time中的分钟数
> > >> > > > >
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > >> > > > >
> > >> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > >> > > > >
> > >> > > > > flink 版本1.10.1
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>


Re: 根据业务需求选择合适的flink state

2021-01-23 文章 张锴
[MinMaxTemp] = elements.iterator
if (iterator.hasNext) {
  val value: MinMaxTemp = iterator.next()
  id = value.id
  courseId= value.courseId
  partnerId = value.partnerId
  ip = value.ip
  customerId = value.customerId
  courseNumber = value.courseNumber
  nickName = value.nickName
  liveType = value.liveType
  uid = value.uid
  minTsState.update(value.mineventTime) //更新最小时间戳
  maxTsState.update(value.maxeventTime)  //更新最大时间戳
}
join_time = DateUtil.convertTimeStamp2DateStr(minTs,
DateUtil.SECOND_DATE_FORMAT)
leave_time = DateUtil.convertTimeStamp2DateStr(maxTs,
DateUtil.SECOND_DATE_FORMAT)
duration = (maxTs - minTs) / 1000  //停留多少秒
duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
minTsState.clear()
maxTsState.clear()

out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
  , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
  , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

  }
}





赵一旦  于2021年1月21日周四 下午8:38写道:

> 我表达的方法是按照session
> window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。
>
> 实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。
>
>
> 然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。
>
> 赵一旦  于2021年1月21日周四 下午8:28写道:
>
> > 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。
> >
> > 张锴  于2021年1月21日周四 下午6:25写道:
> >
> >> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> >>
> >>
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> >> 下面是我的部分代码逻辑:
> >>
> >> val ds = dataStream
> >>   .filter(_.liveType == 1)
> >>   .keyBy(1, 2)
> >>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> >>   .process(new myProcessWindow()).uid("process-id")
> >>
> >> class myProcessWindow() extends
> >> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> >> TimeWindow] {
> >>
> >>   override def process(key: Tuple, context: Context, elements:
> >> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> >> = {
> >> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> >> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> >>
> >> val currentDate = DateUtil.currentDate
> >> val created_time = currentDate
> >> val modified_time = currentDate
> >>  。。。
> >>
> >> val join_time: String =
> >> DateUtil.convertTimeStamp2DateStr(startTime,
> >> DateUtil.SECOND_DATE_FORMAT)
> >> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> >> DateUtil.SECOND_DATE_FORMAT)
> >> val duration = (endTime - startTime) / 1000  //停留多少秒
> >> val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> >> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> >> join_time, leave_time, created_time, modified_time
> >>   , liveType, plat_form, duration, duration_time,
> >> network_operator, role, useragent, uid, eventTime))
> >>
> >> CloudliveWatcher(id, partnerId, courseId, customerId,
> >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> >> join_time, leave_time, created_time, modified_time
> >>   , liveType, plat_form, duration, duration_time,
> >> network_operator, role, useragent, uid, eventTime)
> >>
> >> }
> >>
> >>
> >> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> >>
> >>
> >>
> >>
> >> 赵一旦  于2020年12月28日周一 下午7:12写道:
> >>
> >> > 按直播间ID和用户ID分组,使用session
> >> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >> >
> >> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >> >
> >> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >> >
> >> >
> >> > 张锴  于2020年12月28日周一 下午5:35写道:
> >> >
> >> > > 能描述一下用session window的考虑吗
> >> > >
> >> > > Akisaya  于2020年12月28日周一 下午

?????? ??????????????????????flink state

2021-01-23 文章 ??????



| |
??
|
|
jjw8610...@163.com
|
??


??2021??01??23?? 15:43<25977...@qq.com> ??
TTL??keybykey1state??




----
??: ""https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
 
  news_...@163.com 

?????? ??????????????????????flink state

2021-01-22 文章 ??????
TTL??keybykey1state??




----
??: ""https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
 
  news_...@163.com 

Re: 根据业务需求选择合适的flink state

2021-01-22 文章 张锴
@赵一旦
可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new
AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题

赵一旦  于2021年1月22日周五 上午10:10写道:

> 我理解你要的最终mysql结果表是:
> 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);
>
> 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
>
>
> 如上按照我的方案就可以实现哈。
>
> xuhaiLong  于2021年1月22日周五 上午10:03写道:
>
> > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
> > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum
> 试试?
> >
> >
> > 在2021年1月21日 18:24,张锴 写道:
> > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> >
> >
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> > 下面是我的部分代码逻辑:
> >
> > val ds = dataStream
> > .filter(_.liveType == 1)
> > .keyBy(1, 2)
> > .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> > .process(new myProcessWindow()).uid("process-id")
> >
> > class myProcessWindow() extends
> > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> > TimeWindow] {
> >
> > override def process(key: Tuple, context: Context, elements:
> > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> > = {
> > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> >
> > val currentDate = DateUtil.currentDate
> > val created_time = currentDate
> > val modified_time = currentDate
> > 。。。
> >
> > val join_time: String =
> > DateUtil.convertTimeStamp2DateStr(startTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val duration = (endTime - startTime) / 1000  //停留多少秒
> > val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime))
> >
> > CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime)
> >
> > }
> >
> >
> > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> >
> >
> >
> >
> > 赵一旦  于2020年12月28日周一 下午7:12写道:
> >
> > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >
> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >
> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >
> >
> > 张锴  于2020年12月28日周一 下午5:35写道:
> >
> > 能描述一下用session window的考虑吗
> >
> > Akisaya  于2020年12月28日周一 下午5:00写道:
> >
> > 这个可以用 session window 吧
> >
> >
> >
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >
> > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> >
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> >
> >
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
> >
> >
> >
> >
>


Re: 根据业务需求选择合适的flink state

2021-01-21 文章 张锴
@赵一旦
我今天调整一下逻辑再试试

赵一旦  于2021年1月22日周五 上午10:10写道:

> 我理解你要的最终mysql结果表是:
> 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);
>
> 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
>
>
> 如上按照我的方案就可以实现哈。
>
> xuhaiLong  于2021年1月22日周五 上午10:03写道:
>
> > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
> > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum
> 试试?
> >
> >
> > 在2021年1月21日 18:24,张锴 写道:
> > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> >
> >
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> > 下面是我的部分代码逻辑:
> >
> > val ds = dataStream
> > .filter(_.liveType == 1)
> > .keyBy(1, 2)
> > .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> > .process(new myProcessWindow()).uid("process-id")
> >
> > class myProcessWindow() extends
> > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> > TimeWindow] {
> >
> > override def process(key: Tuple, context: Context, elements:
> > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> > = {
> > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> >
> > val currentDate = DateUtil.currentDate
> > val created_time = currentDate
> > val modified_time = currentDate
> > 。。。
> >
> > val join_time: String =
> > DateUtil.convertTimeStamp2DateStr(startTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val duration = (endTime - startTime) / 1000  //停留多少秒
> > val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime))
> >
> > CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime)
> >
> > }
> >
> >
> > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> >
> >
> >
> >
> > 赵一旦  于2020年12月28日周一 下午7:12写道:
> >
> > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >
> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >
> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >
> >
> > 张锴  于2020年12月28日周一 下午5:35写道:
> >
> > 能描述一下用session window的考虑吗
> >
> > Akisaya  于2020年12月28日周一 下午5:00写道:
> >
> > 这个可以用 session window 吧
> >
> >
> >
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >
> > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> >
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> >
> >
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
> >
> >
> >
> >
>


Re: 根据业务需求选择合适的flink state

2021-01-21 文章 赵一旦
我理解你要的最终mysql结果表是:
直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);

如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。


如上按照我的方案就可以实现哈。

xuhaiLong  于2021年1月22日周五 上午10:03写道:

> 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
> userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试?
>
>
> 在2021年1月21日 18:24,张锴 写道:
> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
>
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> 下面是我的部分代码逻辑:
>
> val ds = dataStream
> .filter(_.liveType == 1)
> .keyBy(1, 2)
> .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> .process(new myProcessWindow()).uid("process-id")
>
> class myProcessWindow() extends
> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> TimeWindow] {
>
> override def process(key: Tuple, context: Context, elements:
> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> = {
> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
>
> val currentDate = DateUtil.currentDate
> val created_time = currentDate
> val modified_time = currentDate
> 。。。
>
> val join_time: String =
> DateUtil.convertTimeStamp2DateStr(startTime,
> DateUtil.SECOND_DATE_FORMAT)
> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> DateUtil.SECOND_DATE_FORMAT)
> val duration = (endTime - startTime) / 1000  //停留多少秒
> val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
> , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime))
>
> CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
> , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime)
>
> }
>
>
> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
>
>
>
>
> 赵一旦  于2020年12月28日周一 下午7:12写道:
>
> 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>
> 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>
> session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>
>
> 张锴  于2020年12月28日周一 下午5:35写道:
>
> 能描述一下用session window的考虑吗
>
> Akisaya  于2020年12月28日周一 下午5:00写道:
>
> 这个可以用 session window 吧
>
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
> news_...@163.com  于2020年12月28日周一 下午2:15写道:
>
> 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
>
>
>
> news_...@163.com
>
> 发件人: 张锴
> 发送时间: 2020-12-28 13:35
> 收件人: user-zh
> 主题: 根据业务需求选择合适的flink state
> 各位大佬帮我分析下如下需求应该怎么写
>
> 需求说明:
>
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
>
>
>
>
>
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
>
> 我的想法:
> 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
>
> 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
>
> flink 版本1.10.1
>
>
>
>
>


回复: 根据业务需求选择合适的flink state

2021-01-21 文章 xuhaiLong
可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 
userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试?


在2021年1月21日 18:24,张锴 写道:
你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
下面是我的部分代码逻辑:

val ds = dataStream
.filter(_.liveType == 1)
.keyBy(1, 2)
.window(EventTimeSessionWindows.withGap(Time.minutes(1)))
.process(new myProcessWindow()).uid("process-id")

class myProcessWindow() extends
ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
TimeWindow] {

override def process(key: Tuple, context: Context, elements:
Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
= {
var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间

val currentDate = DateUtil.currentDate
val created_time = currentDate
val modified_time = currentDate
。。。

val join_time: String =
DateUtil.convertTimeStamp2DateStr(startTime,
DateUtil.SECOND_DATE_FORMAT)
val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
DateUtil.SECOND_DATE_FORMAT)
val duration = (endTime - startTime) / 1000  //停留多少秒
val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

}


这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?




赵一旦  于2020年12月28日周一 下午7:12写道:

按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。

或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。

session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。


张锴  于2020年12月28日周一 下午5:35写道:

能描述一下用session window的考虑吗

Akisaya  于2020年12月28日周一 下午5:00写道:

这个可以用 session window 吧



https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows

news_...@163.com  于2020年12月28日周一 下午2:15写道:

这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。



news_...@163.com

发件人: 张锴
发送时间: 2020-12-28 13:35
收件人: user-zh
主题: 根据业务需求选择合适的flink state
各位大佬帮我分析下如下需求应该怎么写

需求说明:

公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A




在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。

我的想法:
我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。

不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。

flink 版本1.10.1






回复: 根据业务需求选择合适的flink state

2021-01-21 文章 xuhaiLong
Hi, 看了下你的代码,用session window 时长为1分钟,表示的是user1 
的窗口在1分钟内没收到数据就进行一个触发计算,所以最终得到的结果应该是需要你把 user1 产生的每条记录的时长做一个sum,如果只看单条维度是不全的
在2021年1月21日 18:24,张锴 写道:
你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
下面是我的部分代码逻辑:

val ds = dataStream
.filter(_.liveType == 1)
.keyBy(1, 2)
.window(EventTimeSessionWindows.withGap(Time.minutes(1)))
.process(new myProcessWindow()).uid("process-id")

class myProcessWindow() extends
ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
TimeWindow] {

override def process(key: Tuple, context: Context, elements:
Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
= {
var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间

val currentDate = DateUtil.currentDate
val created_time = currentDate
val modified_time = currentDate
。。。

val join_time: String =
DateUtil.convertTimeStamp2DateStr(startTime,
DateUtil.SECOND_DATE_FORMAT)
val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
DateUtil.SECOND_DATE_FORMAT)
val duration = (endTime - startTime) / 1000  //停留多少秒
val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

}


这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?




赵一旦  于2020年12月28日周一 下午7:12写道:

按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。

或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。

session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。


张锴  于2020年12月28日周一 下午5:35写道:

能描述一下用session window的考虑吗

Akisaya  于2020年12月28日周一 下午5:00写道:

这个可以用 session window 吧



https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows

news_...@163.com  于2020年12月28日周一 下午2:15写道:

这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。



news_...@163.com

发件人: 张锴
发送时间: 2020-12-28 13:35
收件人: user-zh
主题: 根据业务需求选择合适的flink state
各位大佬帮我分析下如下需求应该怎么写

需求说明:

公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A




在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。

我的想法:
我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。

不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。

flink 版本1.10.1






Re: 根据业务需求选择合适的flink state

2021-01-21 文章 赵一旦
我表达的方法是按照session
window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。

实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。

然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。

赵一旦  于2021年1月21日周四 下午8:28写道:

> 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。
>
> 张锴  于2021年1月21日周四 下午6:25写道:
>
>> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
>>
>> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
>> 下面是我的部分代码逻辑:
>>
>> val ds = dataStream
>>   .filter(_.liveType == 1)
>>   .keyBy(1, 2)
>>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
>>   .process(new myProcessWindow()).uid("process-id")
>>
>> class myProcessWindow() extends
>> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
>> TimeWindow] {
>>
>>   override def process(key: Tuple, context: Context, elements:
>> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
>> = {
>> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
>> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
>>
>> val currentDate = DateUtil.currentDate
>> val created_time = currentDate
>> val modified_time = currentDate
>>  。。。
>>
>> val join_time: String =
>> DateUtil.convertTimeStamp2DateStr(startTime,
>> DateUtil.SECOND_DATE_FORMAT)
>> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
>> DateUtil.SECOND_DATE_FORMAT)
>> val duration = (endTime - startTime) / 1000  //停留多少秒
>> val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
>> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
>> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
>> join_time, leave_time, created_time, modified_time
>>   , liveType, plat_form, duration, duration_time,
>> network_operator, role, useragent, uid, eventTime))
>>
>> CloudliveWatcher(id, partnerId, courseId, customerId,
>> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
>> join_time, leave_time, created_time, modified_time
>>   , liveType, plat_form, duration, duration_time,
>> network_operator, role, useragent, uid, eventTime)
>>
>> }
>>
>>
>> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
>>
>>
>>
>>
>> 赵一旦  于2020年12月28日周一 下午7:12写道:
>>
>> > 按直播间ID和用户ID分组,使用session
>> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>> >
>> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>> >
>> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>> >
>> >
>> > 张锴  于2020年12月28日周一 下午5:35写道:
>> >
>> > > 能描述一下用session window的考虑吗
>> > >
>> > > Akisaya  于2020年12月28日周一 下午5:00写道:
>> > >
>> > > > 这个可以用 session window 吧
>> > > >
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>> > > >
>> > > > news_...@163.com  于2020年12月28日周一 下午2:15写道:
>> > > >
>> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
>> > > > >
>> > > > >
>> > > > >
>> > > > > news_...@163.com
>> > > > >
>> > > > > 发件人: 张锴
>> > > > > 发送时间: 2020-12-28 13:35
>> > > > > 收件人: user-zh
>> > > > > 主题: 根据业务需求选择合适的flink state
>> > > > > 各位大佬帮我分析下如下需求应该怎么写
>> > > > >
>> > > > > 需求说明:
>> > > > >
>> > >
>> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
>> > > > >
>> > > > > 我的想法:
>> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event
>> Time中的分钟数
>> > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
>> > > > >
>> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
>> > > > >
>> > > > > flink 版本1.10.1
>> > > > >
>> > > >
>> > >
>> >
>>
>


Re: 根据业务需求选择合适的flink state

2021-01-21 文章 赵一旦
我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。

张锴  于2021年1月21日周四 下午6:25写道:

> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
>
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> 下面是我的部分代码逻辑:
>
> val ds = dataStream
>   .filter(_.liveType == 1)
>   .keyBy(1, 2)
>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
>   .process(new myProcessWindow()).uid("process-id")
>
> class myProcessWindow() extends
> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> TimeWindow] {
>
>   override def process(key: Tuple, context: Context, elements:
> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> = {
> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
>
> val currentDate = DateUtil.currentDate
> val created_time = currentDate
> val modified_time = currentDate
>  。。。
>
> val join_time: String =
> DateUtil.convertTimeStamp2DateStr(startTime,
> DateUtil.SECOND_DATE_FORMAT)
> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> DateUtil.SECOND_DATE_FORMAT)
> val duration = (endTime - startTime) / 1000  //停留多少秒
> val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
>   , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime))
>
> CloudliveWatcher(id, partnerId, courseId, customerId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
>   , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime)
>
> }
>
>
> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
>
>
>
>
> 赵一旦  于2020年12月28日周一 下午7:12写道:
>
> > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >
> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >
> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >
> >
> > 张锴  于2020年12月28日周一 下午5:35写道:
> >
> > > 能描述一下用session window的考虑吗
> > >
> > > Akisaya  于2020年12月28日周一 下午5:00写道:
> > >
> > > > 这个可以用 session window 吧
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > > >
> > > > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> > > >
> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > > > >
> > > > >
> > > > >
> > > > > news_...@163.com
> > > > >
> > > > > 发件人: 张锴
> > > > > 发送时间: 2020-12-28 13:35
> > > > > 收件人: user-zh
> > > > > 主题: 根据业务需求选择合适的flink state
> > > > > 各位大佬帮我分析下如下需求应该怎么写
> > > > >
> > > > > 需求说明:
> > > > >
> > >
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > > > >
> > > > >
> > > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > > > >
> > > > > 我的想法:
> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event
> Time中的分钟数
> > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > > > >
> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > > > >
> > > > > flink 版本1.10.1
> > > > >
> > > >
> > >
> >
>


Re: 根据业务需求选择合适的flink state

2021-01-21 文章 张锴
你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
下面是我的部分代码逻辑:

val ds = dataStream
  .filter(_.liveType == 1)
  .keyBy(1, 2)
  .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
  .process(new myProcessWindow()).uid("process-id")

class myProcessWindow() extends
ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
TimeWindow] {

  override def process(key: Tuple, context: Context, elements:
Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
= {
var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间

val currentDate = DateUtil.currentDate
val created_time = currentDate
val modified_time = currentDate
 。。。

val join_time: String =
DateUtil.convertTimeStamp2DateStr(startTime,
DateUtil.SECOND_DATE_FORMAT)
val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
DateUtil.SECOND_DATE_FORMAT)
val duration = (endTime - startTime) / 1000  //停留多少秒
val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
  , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
  , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

}


这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?




赵一旦  于2020年12月28日周一 下午7:12写道:

> 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>
> 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>
> session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>
>
> 张锴  于2020年12月28日周一 下午5:35写道:
>
> > 能描述一下用session window的考虑吗
> >
> > Akisaya  于2020年12月28日周一 下午5:00写道:
> >
> > > 这个可以用 session window 吧
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > >
> > > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> > >
> > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > > >
> > > >
> > > >
> > > > news_...@163.com
> > > >
> > > > 发件人: 张锴
> > > > 发送时间: 2020-12-28 13:35
> > > > 收件人: user-zh
> > > > 主题: 根据业务需求选择合适的flink state
> > > > 各位大佬帮我分析下如下需求应该怎么写
> > > >
> > > > 需求说明:
> > > >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > > >
> > > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > > >
> > > > 我的想法:
> > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > > >
> > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > > >
> > > > flink 版本1.10.1
> > > >
> > >
> >
>


Re: 根据业务需求选择合适的flink state

2020-12-28 文章 张锴
感谢你,稍后我会按这种思路试试

赵一旦  于2020年12月28日周一 下午7:12写道:

> 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>
> 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>
> session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>
>
> 张锴  于2020年12月28日周一 下午5:35写道:
>
> > 能描述一下用session window的考虑吗
> >
> > Akisaya  于2020年12月28日周一 下午5:00写道:
> >
> > > 这个可以用 session window 吧
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > >
> > > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> > >
> > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > > >
> > > >
> > > >
> > > > news_...@163.com
> > > >
> > > > 发件人: 张锴
> > > > 发送时间: 2020-12-28 13:35
> > > > 收件人: user-zh
> > > > 主题: 根据业务需求选择合适的flink state
> > > > 各位大佬帮我分析下如下需求应该怎么写
> > > >
> > > > 需求说明:
> > > >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > > >
> > > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > > >
> > > > 我的想法:
> > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > > >
> > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > > >
> > > > flink 版本1.10.1
> > > >
> > >
> >
>


Re: 根据业务需求选择合适的flink state

2020-12-28 文章 赵一旦
按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。

或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。

session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。


张锴  于2020年12月28日周一 下午5:35写道:

> 能描述一下用session window的考虑吗
>
> Akisaya  于2020年12月28日周一 下午5:00写道:
>
> > 这个可以用 session window 吧
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >
> > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> >
> > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > >
> > >
> > >
> > > news_...@163.com
> > >
> > > 发件人: 张锴
> > > 发送时间: 2020-12-28 13:35
> > > 收件人: user-zh
> > > 主题: 根据业务需求选择合适的flink state
> > > 各位大佬帮我分析下如下需求应该怎么写
> > >
> > > 需求说明:
> > >
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > >
> > > 我的想法:
> > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > >
> > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > >
> > > flink 版本1.10.1
> > >
> >
>


Re: 根据业务需求选择合适的flink state

2020-12-28 文章 张锴
能描述一下用session window的考虑吗

Akisaya  于2020年12月28日周一 下午5:00写道:

> 这个可以用 session window 吧
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
> news_...@163.com  于2020年12月28日周一 下午2:15写道:
>
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
>


Re: 根据业务需求选择合适的flink state

2020-12-28 文章 张锴
能描述一下用session window的考虑吗

Akisaya  于2020年12月28日周一 下午5:00写道:

> 这个可以用 session window 吧
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
> news_...@163.com  于2020年12月28日周一 下午2:15写道:
>
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
>


Re: 根据业务需求选择合适的flink state

2020-12-28 文章 Akisaya
这个可以用 session window 吧
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows

news_...@163.com  于2020年12月28日周一 下午2:15写道:

> 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
>
>
>
> news_...@163.com
>
> 发件人: 张锴
> 发送时间: 2020-12-28 13:35
> 收件人: user-zh
> 主题: 根据业务需求选择合适的flink state
> 各位大佬帮我分析下如下需求应该怎么写
>
> 需求说明:
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
>
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
>
> 我的想法:
> 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
>
> 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
>
> flink 版本1.10.1
>


Re: 根据业务需求选择合适的flink state

2020-12-27 文章 news_...@163.com
这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。



news_...@163.com
 
发件人: 张锴
发送时间: 2020-12-28 13:35
收件人: user-zh
主题: 根据业务需求选择合适的flink state
各位大佬帮我分析下如下需求应该怎么写
 
需求说明:
公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
 
我的想法:
我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
 
不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
 
flink 版本1.10.1


根据业务需求选择合适的flink state

2020-12-27 文章 张锴
各位大佬帮我分析下如下需求应该怎么写

需求说明:
公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。

我的想法:
我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。

不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。

flink 版本1.10.1


Re: Re: flink state ttl状态清理和重新计算的疑问

2020-08-18 文章 Benchao Li
1) 这个应该还是按照每个key来单独清理的
2)是按照最后的更新时间,也就是每次更新的时候超时时间重新开始计算

赵一旦  于2020年8月17日周一 上午11:51写道:

> @Li Benchao
> (1)
> 如果不是每个key一个timer,但超时仍然应该是key级别的吧,只是说清理机制上不是每个key设置一个timer去清理。
> 比如有个全局的其他机制定期扫描清理,但超时时间应该还是key级别。
>
> (2)
> 关于超时时间,对于一个key,他的value每次更新超时都会重新计算,还是永远按照这个key创建时时间开始计算呢。
>
> Benchao Li  于2020年8月15日周六 下午7:27写道:
>
> > 是按照每个key来清理的。清理时机是跟它最后的更新时间有关系,
> > 也就是在最后一次更新加上state retention时间这么长的时间后会清理。
> >
> > 最开始实现状态清理的时候,用的都是timer来清理,也就是每个key下都有自己的timer。
> > 现在是比较推荐使用state本身的TTL来做状态清理,并且用的是UpdateType.OnCreateAndWrite。
> > 不过现在还没有完全把每个算子和function都重构成这样子,所以还有些老的算子还是用的
> > timer来实现的。
> >
> > sunfulin  于2020年8月15日周六 下午6:12写道:
> >
> > >
> > >
> > >
> > > hi,
> > > 我的理解也是按每个key的时间来的,没仔细看具体实现。
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2020-08-15 17:28:43,"art"  写道:
> > >
> > > The Idle State Retention Time parameters define for how long the state
> of
> > > a key is retained without being updated before it is removed.
> > > 我感觉我的理解错了,这个官方描述不是state of a key,
> 应该是每个key都有自己的过期时间吧,那么你那个状态不是应该以user登陆后
> > 开始
> > > 计时,不应该是作业启动吧,还望有个大佬可以解惑
> > >
> > >
> > >
> > > 在 2020年8月15日,下午3:06,sunfulin  写道:
> > >
> > >
> > > hi,
> > > 有可能这个是默认实现。我还发现另外一个问题,如果我不使用minibatch,发现作业的状态貌似不生效。导致输出了多条数据。不知道这是为何。
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2020-08-15 13:30:24,"superainbower"  写道:
> > >
> > >
> 新手感觉应该是统一启动后满足TTL设置的时间就会全部清理,如果不这样,你每一个user的清理时间都不一样,那不得记录成百上千的user的更新时间
> > >
> > > 在2020年08月15日 13:15,sunfulin 写道:
> > > hi,community,
> > >
> >
> 想确认下idlestateretention的配置及生效机制,我有一个作业,设置了TTL为(10小时,10小时+5分钟)。假设我的作业是今天12:00启动,作业逻辑是统计当日10点后每个userId第一次登陆的时间:select
> > > userId, first_value(xxx) from source group by userId,
> > > date_format(eventtime,
> > > '-MM-dd')。那么我的作业的状态清理时机是从启动时间开始10小时之后么?还是会按照状态的数据更新的时间+10小时作为清理时间?
> > > 我使用flink 1.10.1版本,初步观察到的现象是,启动时间开始大概10小时后,状态开始清理。这个感觉不符合预期?求大佬帮忙确认下。
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: Re: flink state ttl状态清理和重新计算的疑问

2020-08-16 文章 赵一旦
@Li Benchao
(1)
如果不是每个key一个timer,但超时仍然应该是key级别的吧,只是说清理机制上不是每个key设置一个timer去清理。
比如有个全局的其他机制定期扫描清理,但超时时间应该还是key级别。

(2)
关于超时时间,对于一个key,他的value每次更新超时都会重新计算,还是永远按照这个key创建时时间开始计算呢。

Benchao Li  于2020年8月15日周六 下午7:27写道:

> 是按照每个key来清理的。清理时机是跟它最后的更新时间有关系,
> 也就是在最后一次更新加上state retention时间这么长的时间后会清理。
>
> 最开始实现状态清理的时候,用的都是timer来清理,也就是每个key下都有自己的timer。
> 现在是比较推荐使用state本身的TTL来做状态清理,并且用的是UpdateType.OnCreateAndWrite。
> 不过现在还没有完全把每个算子和function都重构成这样子,所以还有些老的算子还是用的
> timer来实现的。
>
> sunfulin  于2020年8月15日周六 下午6:12写道:
>
> >
> >
> >
> > hi,
> > 我的理解也是按每个key的时间来的,没仔细看具体实现。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-08-15 17:28:43,"art"  写道:
> >
> > The Idle State Retention Time parameters define for how long the state of
> > a key is retained without being updated before it is removed.
> > 我感觉我的理解错了,这个官方描述不是state of a key, 应该是每个key都有自己的过期时间吧,那么你那个状态不是应该以user登陆后
> 开始
> > 计时,不应该是作业启动吧,还望有个大佬可以解惑
> >
> >
> >
> > 在 2020年8月15日,下午3:06,sunfulin  写道:
> >
> >
> > hi,
> > 有可能这个是默认实现。我还发现另外一个问题,如果我不使用minibatch,发现作业的状态貌似不生效。导致输出了多条数据。不知道这是为何。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-08-15 13:30:24,"superainbower"  写道:
> >
> > 新手感觉应该是统一启动后满足TTL设置的时间就会全部清理,如果不这样,你每一个user的清理时间都不一样,那不得记录成百上千的user的更新时间
> >
> > 在2020年08月15日 13:15,sunfulin 写道:
> > hi,community,
> >
> 想确认下idlestateretention的配置及生效机制,我有一个作业,设置了TTL为(10小时,10小时+5分钟)。假设我的作业是今天12:00启动,作业逻辑是统计当日10点后每个userId第一次登陆的时间:select
> > userId, first_value(xxx) from source group by userId,
> > date_format(eventtime,
> > '-MM-dd')。那么我的作业的状态清理时机是从启动时间开始10小时之后么?还是会按照状态的数据更新的时间+10小时作为清理时间?
> > 我使用flink 1.10.1版本,初步观察到的现象是,启动时间开始大概10小时后,状态开始清理。这个感觉不符合预期?求大佬帮忙确认下。
> >
> >
> >
> >
> >
> >
> >
>
> --
>
> Best,
> Benchao Li
>


Re: Re: flink state ttl状态清理和重新计算的疑问

2020-08-15 文章 Benchao Li
是按照每个key来清理的。清理时机是跟它最后的更新时间有关系,
也就是在最后一次更新加上state retention时间这么长的时间后会清理。

最开始实现状态清理的时候,用的都是timer来清理,也就是每个key下都有自己的timer。
现在是比较推荐使用state本身的TTL来做状态清理,并且用的是UpdateType.OnCreateAndWrite。
不过现在还没有完全把每个算子和function都重构成这样子,所以还有些老的算子还是用的
timer来实现的。

sunfulin  于2020年8月15日周六 下午6:12写道:

>
>
>
> hi,
> 我的理解也是按每个key的时间来的,没仔细看具体实现。
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-15 17:28:43,"art"  写道:
>
> The Idle State Retention Time parameters define for how long the state of
> a key is retained without being updated before it is removed.
> 我感觉我的理解错了,这个官方描述不是state of a key, 应该是每个key都有自己的过期时间吧,那么你那个状态不是应该以user登陆后 开始
> 计时,不应该是作业启动吧,还望有个大佬可以解惑
>
>
>
> 在 2020年8月15日,下午3:06,sunfulin  写道:
>
>
> hi,
> 有可能这个是默认实现。我还发现另外一个问题,如果我不使用minibatch,发现作业的状态貌似不生效。导致输出了多条数据。不知道这是为何。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-15 13:30:24,"superainbower"  写道:
>
> 新手感觉应该是统一启动后满足TTL设置的时间就会全部清理,如果不这样,你每一个user的清理时间都不一样,那不得记录成百上千的user的更新时间
>
> 在2020年08月15日 13:15,sunfulin 写道:
> hi,community,
> 想确认下idlestateretention的配置及生效机制,我有一个作业,设置了TTL为(10小时,10小时+5分钟)。假设我的作业是今天12:00启动,作业逻辑是统计当日10点后每个userId第一次登陆的时间:select
> userId, first_value(xxx) from source group by userId,
> date_format(eventtime,
> '-MM-dd')。那么我的作业的状态清理时机是从启动时间开始10小时之后么?还是会按照状态的数据更新的时间+10小时作为清理时间?
> 我使用flink 1.10.1版本,初步观察到的现象是,启动时间开始大概10小时后,状态开始清理。这个感觉不符合预期?求大佬帮忙确认下。
>
>
>
>
>
>
>

-- 

Best,
Benchao Li


flink state ttl状态清理和重新计算的疑问

2020-08-14 文章 sunfulin
hi,community,
想确认下idlestateretention的配置及生效机制,我有一个作业,设置了TTL为(10小时,10小时+5分钟)。假设我的作业是今天12:00启动,作业逻辑是统计当日10点后每个userId第一次登陆的时间:select
  userId, first_value(xxx) from source group by userId, date_format(eventtime, 
'-MM-dd')。那么我的作业的状态清理时机是从启动时间开始10小时之后么?还是会按照状态的数据更新的时间+10小时作为清理时间?
我使用flink 1.10.1版本,初步观察到的现象是,启动时间开始大概10小时后,状态开始清理。这个感觉不符合预期?求大佬帮忙确认下。

Re: flink state问题

2020-07-16 文章 Congxian Qiu
Hi

你可以尝试用 state-process-api[1] 看一下 savepoint 中 state 的内容,先缩小一下问题的范围,如果
savepoint 中就没有了,那就是序列化到 savepoint 的时候出错了,savepoitn 是有的,那么就是恢复的时候出错了。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html

Best,
Congxian


op <520075...@qq.com> 于2020年7月16日周四 下午7:16写道:

> 大家好
> 我有一个去重的需求,想节省内存用的bloomfilter,代码如下:
>  .keyBy(_._1).process(new
> KeyedProcessFunction[String,(String,String),String]() {
>   var state:ValueState[BloomFilter[CharSequence]]= null
>   override def open(parameters: Configuration): Unit = {
> val stateDesc = new
> ValueStateDescriptor("state",TypeInformation.of(new
> TypeHint[BloomFilter[CharSequence]](){}))
> state = getRuntimeContext.getState(stateDesc)
>   }
>   override def processElement(value: (String, String), ctx:
> KeyedProcessFunction[String, (String, String), String]#Context, out:
> Collector[String]) = {
>
> var filter = state.value
> if(filter==null){
>   println("null filter")
>   filter=
> BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel,1,0.0001)}
> //val contains = filter.mightContain(value._2)
> if(!filter.mightContain(value._2)) {
>   filter.put(value._2)
>   state.update(filter)
>   out.collect(value._2)
>
> }
>
>   }
> })
> 通过日志我看到每次我从savepoint恢复的时候这个state里面的bloomfilter都是null,这是为什么啊


flink state????

2020-07-16 文章 op
??
??bloomfilter
 .keyBy(_._1).process(new KeyedProcessFunction[String,(String,String),String]() 
{
  var state:ValueState[BloomFilter[CharSequence]]= null
  override def open(parameters: Configuration): Unit = {
val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new 
TypeHint[BloomFilter[CharSequence]](){}))
state = getRuntimeContext.getState(stateDesc)
  }
  override def processElement(value: (String, String), ctx: 
KeyedProcessFunction[String, (String, String), String]#Context, out: 
Collector[String]) = {

var filter = state.value
if(filter==null){
  println("null filter")
  filter=  
BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel,1,0.0001)}
//val contains = filter.mightContain(value._2)
if(!filter.mightContain(value._2)) {
  filter.put(value._2)
  state.update(filter)
  out.collect(value._2)

}

  }
})
??savepoint??state??bloomfilternull??

?????? ??????????: flink state

2020-07-16 文章 Robert.Zhang
??thanku all



----
??: "Congxian Qiu"https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
 [2] https://cloud.tencent.com/developer/article/1509789
 Best,
 Congxian


 Robert.Zhang <173603...@qq.comgt; ??2020??7??13?? 9:50??

 gt; Hello,all
 gt; stream
 gt; state keyed streamglobal
 gt; 
parameter??statestream
 gt; operator??
 gt;
 gt;
 gt; Best regards


Re: 回复:答复: flink state

2020-07-15 文章 Congxian Qiu
Hi
broadcast state 是无法修改的,如果你还希望进行修改的话,可以使用 zhao liang 的方法,另外如果这个全局 state
不需要维护一致性等的话,同样可以考虑放到外存中(Redis,HBase 等)

Best,
Congxian


zhao liang  于2020年7月15日周三 下午6:05写道:

> Broadcast
> state是无法满足你的要求的,估计你只能像我这样把涉及的state数据融入到数据流中,在算子中针对不同的类型数据做区分了,等于人工维持这个broadcast的流的变化。
>
> 发件人: Robert.Zhang <173603...@qq.com>
> 日期: 星期三, 2020年7月15日 15:22
> 收件人: user-zh , user-zh@flink.apache.org <
> user-zh@flink.apache.org>
> 主题: 回复:答复: flink state
> 是这样的,问题在于我需要使用keyed state 来修改broadcast state,比如根据keyed
> state把某些满足条件的key存入这个broadcast state,并在其他算子计算的时候使用这个broadcast
> state,比如需要这些key来做
> 文档中提到的nonbroadcast side是无法修改broadcast state的,是read-only,
> 似乎无法直接去实现
>
>
>
>
>
> --原始邮件--
> 发件人: "zhao liang" 发送时间: 2020年7月14日(星期二) 下午4:09
> 收件人: "user-zh" 主题: 答复: flink state
>
>
>
> 我这边有个类似的实现,需要根据维表数据改变stream的处理,自定义了一个source(从MySQL中定时刷维表数据),kafka的stream
> union这个维表数据流,
> 额外增加一个数据类型(维表类型或者事实数据)进行数据的处理,后续算子将这个维表进行不同的处理并存到对应算子的state中。
>
> 发件人: Congxian Qiu  日期: 星期二, 2020年7月14日 14:03
> 收件人: user-zh  主题: Re: flink state
> Hi Robert
>
> Boardcast state[1] 是否满足你的需求呢?另外也可以看下这篇文章[2]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
> [2] https://cloud.tencent.com/developer/article/1509789
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com 于2020年7月13日周一 下午9:50写道:
>
>  Hello,all
>  目前stream中遇到一个问题,
>  想使用一个全局的state 在所有的keyed stream中使用,或者global
>  parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream
>  operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽
> 
> 
>  Best regards
>


答复: 回复:答复: flink state

2020-07-15 文章 zhao liang
Broadcast 
state是无法满足你的要求的,估计你只能像我这样把涉及的state数据融入到数据流中,在算子中针对不同的类型数据做区分了,等于人工维持这个broadcast的流的变化。

发件人: Robert.Zhang <173603...@qq.com>
日期: 星期三, 2020年7月15日 15:22
收件人: user-zh , user-zh@flink.apache.org 

主题: 回复:答复: flink state
是这样的,问题在于我需要使用keyed state 来修改broadcast state,比如根据keyed 
state把某些满足条件的key存入这个broadcast state,并在其他算子计算的时候使用这个broadcast state,比如需要这些key来做
文档中提到的nonbroadcast side是无法修改broadcast state的,是read-only,
似乎无法直接去实现





--原始邮件--
发件人: "zhao liang"https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian


Robert.Zhang <173603...@qq.com 于2020年7月13日周一 下午9:50写道:

 Hello,all
 目前stream中遇到一个问题,
 想使用一个全局的state 在所有的keyed stream中使用,或者global
 parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream
 operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽


 Best regards


??????????: flink state

2020-07-15 文章 Robert.Zhang
keyed state ??broadcast state??keyed 
statekeybroadcast 
statebroadcast state??key
nonbroadcast side??broadcast state??read-only,
??





----
??: "zhao liang"https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian


Robert.Zhang <173603...@qq.com ??2020??7??13?? 9:50??

 Hello,all
 stream
 state keyed streamglobal
 
parameter??statestream
 operator??


 Best regards

答复: flink state

2020-07-14 文章 zhao liang
我这边有个类似的实现,需要根据维表数据改变stream的处理,自定义了一个source(从MySQL中定时刷维表数据),kafka的stream 
union这个维表数据流,
额外增加一个数据类型(维表类型或者事实数据)进行数据的处理,后续算子将这个维表进行不同的处理并存到对应算子的state中。

发件人: Congxian Qiu 
日期: 星期二, 2020年7月14日 14:03
收件人: user-zh 
主题: Re: flink state
Hi Robert

Boardcast state[1] 是否满足你的需求呢?另外也可以看下这篇文章[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年7月13日周一 下午9:50写道:

> Hello,all
> 目前stream中遇到一个问题,
> 想使用一个全局的state 在所有的keyed stream中使用,或者global
> parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream
> operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽
>
>
> Best regards


Re: flink state

2020-07-14 文章 Congxian Qiu
Hi Robert

Boardcast state[1] 是否满足你的需求呢?另外也可以看下这篇文章[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年7月13日周一 下午9:50写道:

> Hello,all
> 目前stream中遇到一个问题,
> 想使用一个全局的state 在所有的keyed stream中使用,或者global
> parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream
> operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽
>
>
> Best regards


flink state

2020-07-13 文章 Robert.Zhang
Hello,all
目前stream中遇到一个问题,
想使用一个全局的state 在所有的keyed stream中使用,或者global 
parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream 
operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽


Best regards

Re: flink state使用

2020-07-08 文章 Congxian Qiu
State 可以简单理解为一个 HashMap,Key 是 curretnly key(也就是 keyby 的 key)),value 是 state
存的值(可以是 value,list,map 等)

所有 state 的读写都有一个 currently,只能读到 currently key 对应的值。

在同一个 operator 中,同一个 key 能访问到之前存储过的 state 值,但是不能读取到其他 key 对应的值

Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年7月8日周三 下午3:58写道:

>
> 那就是在说,在iterativestream中,这个state可以正确的在每一次的迭代operator中传递并读取,那么结束迭代之后,假设得到的结果stream再进行keyby操作,因为operator不是同一个,此时就无法读取到前述迭代过程中的state,即便是同一个key
> 这样的理解对吗?
>
>
> Best Regards
>
>
>
>
>
> --原始邮件--
> 发件人: "Congxian Qiu" 发送时间: 2020年7月8日(星期三) 下午3:48
> 收件人: "user-zh" 主题: Re: flink state使用
>
>
>
> Hi
>
> KeyedState 的操作实际会针对当前的 key,也就是 keyBy 之后得到的 key,但是这个 key 用户看不到。在一个 operator
> 中 state 的数目是你创建的数目,但是每个 state 可以有多个 KV 对,其中 K 是 keyby 的 key,V 可以是
> value,list,map 等。同一个 operator 上不同的 key 的 state 保证能够正确读写的。
>
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com 于2020年7月8日周三 下午3:41写道:
>
>  Hello,all
>  目前遇到一个关于state的使用问题
>  nbsp; nbsp; nbsp; DataStream Stringgt;gt; result =
>  iteration.closeWith(
>  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;
>  iteration.keyBy(0).process(new RichMapFunc());
> 
> 
> 
>  我在richmapfunc里使用了state,那么每一次迭代的时候,这个state还是同一个吗?
>  如果我对result这个结果stream继续使用了keyby(0)的话,state也是同一个么?
>  此外,如果我对一个stream进行keyby之后,key一直是不变的,
> 
> 但是map等operator之后返回的并非是keyedstream(operator不对key进行操作),是否只能继续keyby,还是有更好的方式


?????? flink state????

2020-07-08 文章 Robert.Zhang
??iterativestreamstateoperator??stream??keyby??operatorstate??key



Best Regards





----
??: "Congxian Qiu"

Re: flink state使用

2020-07-08 文章 Congxian Qiu
Hi

KeyedState 的操作实际会针对当前的 key,也就是 keyBy 之后得到的 key,但是这个 key 用户看不到。在一个 operator
中 state 的数目是你创建的数目,但是每个 state 可以有多个 KV 对,其中 K 是 keyby 的 key,V 可以是
value,list,map 等。同一个 operator 上不同的 key 的 state 保证能够正确读写的。

Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年7月8日周三 下午3:41写道:

> Hello,all
> 目前遇到一个关于state的使用问题
>DataStream iteration.closeWith(
>
> iteration.keyBy(0).process(new RichMapFunc());
>
>
>
> 我在richmapfunc里使用了state,那么每一次迭代的时候,这个state还是同一个吗?
> 如果我对result这个结果stream继续使用了keyby(0)的话,state也是同一个么?
> 此外,如果我对一个stream进行keyby之后,key一直是不变的,
> 但是map等operator之后返回的并非是keyedstream(operator不对key进行操作),是否只能继续keyby,还是有更好的方式


flink state????

2020-07-08 文章 Robert.Zhang
Hello,all
state??
   DataStream

Re: Flink State 增加字段后 state 还能识别吗?

2020-06-14 文章 Congxian Qiu
Hi

Flink 支持 state 的 schema evolution 的,具体的文档可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/schema_evolution.html
Best,
Congxian


1048262223 <1048262...@qq.com> 于2020年6月9日周二 下午6:30写道:

> Hi
>
>
> 我在官网找到了文档或许可以解答,参考[1]
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/schema_evolution.html#pojo-%E7%B1%BB%E5%9E%8B
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"wangl...@geekplus.com.cn" 发送时间:2020年6月9日(星期二) 晚上6:03
> 收件人:"user-zh"
> 主题:Flink State 增加字段后 state 还能识别吗?
>
>
>
>
> 写了个简单的类会在 Flink State 中使用:
>
> public class OrderState {
>  private Integer warehouseId;
>  private String orderNo;
>  private String ownerCode;
>  private Long inputDate;
>  private int orderType;
>  private int amount = 0;
>  private int status = 0;
> .
> }
>
>
> 现在程序要升级,这个类还要增加一个新的字段。从state 能正常恢复吗?
> 也就是 flink run -s savepointdir 后能正常识别旧的代码保存的 state
> 吗?
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn


??????Flink State ?????????? state ????????????

2020-06-09 文章 1048262223
Hi


[1]


[1]https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/schema_evolution.html#pojo-%E7%B1%BB%E5%9E%8B


Best,
Yichao Yang




----
??:"wangl...@geekplus.com.cn"

?????? ??????Flink State ?????????? state ????????????

2020-06-09 文章 1048262223
Hi


TypeInformation a = 
TypeInformation.of(OrderState.class);??pojopojo??


Best,
Yichao Yang





----
??:"wangl...@geekplus.com.cn"

????: ??????Flink State ?????????? state ????????????

2020-06-09 文章 wangl...@geekplus.com.cn

OrderState 
 get set ?? 

 flink ??OrderState??



wang...@geekplus.com.cn

 1048262223
?? 2020-06-09 18:11
 user-zh
?? ??Flink State ?? state 
Hi
 
 
flink??OrderStatepojo??savepoint??
 
Best,
Yichao Yang
 
 
 
 
----
??:"wangl...@geekplus.com.cn"

??????Flink State ?????????? state ????????????

2020-06-09 文章 1048262223
Hi


flink??OrderStatepojo??savepoint??

Best,
Yichao Yang




----
??:"wangl...@geekplus.com.cn"

Flink State 增加字段后 state 还能识别吗?

2020-06-09 文章 wangl...@geekplus.com.cn

写了个简单的类会在 Flink State 中使用:

public class OrderState {
private Integer warehouseId;
private String orderNo;
private String ownerCode;
private Long inputDate;
private int orderType;
private int amount = 0;
private int status = 0;
.
}


现在程序要升级,这个类还要增加一个新的字段。从state 能正常恢复吗?
也就是 flink run -s   savepointdir   后能正常识别旧的代码保存的 state 吗?

谢谢,
王磊



wangl...@geekplus.com.cn


Re: 关于flink state的问题

2020-04-10 文章 Congxian Qiu
你好:

1 state 如果不需要了,可以自己删除(如果是 Window 中使用的,可以进行配置在 Window 结束时删除)
2 可以使用 TTL State[1]
3 如果仅考虑 OOM 风险,可以考虑使用 RocksDBStateBackend

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
Best,
Congxian


guanyq  于2020年4月11日周六 上午7:21写道:

> 您好:
>
> 1.随着程序的运行,task内存中的状态会不断增加,迟早会出现内存溢出问题,想知道一般都如何解决这个问题?
>
>


关于flink state的问题

2020-04-10 文章 guanyq
您好:

1.随着程序的运行,task内存中的状态会不断增加,迟早会出现内存溢出问题,想知道一般都如何解决这个问题?

 

Re: How long Flink state default TTL,if I don't config the state ttl config?

2020-01-05 文章 LakeShen
Ok, got it ,thank you

Zhu Zhu  于2020年1月6日周一 上午10:30写道:

> Yes. State TTL is by default disabled.
>
> Thanks,
> Zhu Zhu
>
> LakeShen  于2020年1月6日周一 上午10:09写道:
>
>> I saw the flink source code, I find the flink state ttl default is
>> never expire,is it right?
>>
>> LakeShen  于2020年1月6日周一 上午9:58写道:
>>
>>> Hi community,I have a question about flink state ttl.If I don't config
>>> the flink state ttl config,
>>> How long the flink state retain?Is it forever retain in hdfs?
>>> Thanks your replay.
>>>
>>


Re: How long Flink state default TTL,if I don't config the state ttl config?

2020-01-05 文章 Zhu Zhu
Yes. State TTL is by default disabled.

Thanks,
Zhu Zhu

LakeShen  于2020年1月6日周一 上午10:09写道:

> I saw the flink source code, I find the flink state ttl default is
> never expire,is it right?
>
> LakeShen  于2020年1月6日周一 上午9:58写道:
>
>> Hi community,I have a question about flink state ttl.If I don't config
>> the flink state ttl config,
>> How long the flink state retain?Is it forever retain in hdfs?
>> Thanks your replay.
>>
>


Re: How long Flink state default TTL,if I don't config the state ttl config?

2020-01-05 文章 LakeShen
I saw the flink source code, I find the flink state ttl default is
never expire,is it right?

LakeShen  于2020年1月6日周一 上午9:58写道:

> Hi community,I have a question about flink state ttl.If I don't config the
> flink state ttl config,
> How long the flink state retain?Is it forever retain in hdfs?
> Thanks your replay.
>


How long Flink state default TTL,if I don't config the state ttl config?

2020-01-05 文章 LakeShen
Hi community,I have a question about flink state ttl.If I don't config the
flink state ttl config,
How long the flink state retain?Is it forever retain in hdfs?
Thanks your replay.


Re: Flink State 过期清除 TTL 问题

2019-12-11 文章 陈帅
我们也遇到过类似的问题,有可能是进来的数据量带来的状态增长速度大于状态过期清理速度。另外想问一下有没有metrics监控到每次清理过期状态的大小和时间?

Yun Tang  于2019年12月10日周二 下午8:30写道:

> Hi 王磊
>
> Savepoint目录中的数据的时间戳不会在恢复的时候再更新为当前时间,仍然为之前的时间,从代码上看如果你配置了cleanupFullSnapshot就会生效的,另外配置
> cleanupInRocksdbCompactFilter
> 能让过期清理检查在后台执行,据我所知这个功能是可靠的,有尝试过长时间观察么,另外你们的新增数据量是恒定的么?
>
> 祝好
> 唐云
>
> On 12/10/19, 10:16 AM, "wangl...@geekplus.com.cn" <
> wangl...@geekplus.com.cn> wrote:
>
> Hi 唐云,
>
> 我的集群已经升到了 1.8.2,  cleanupFullSnapshot 和 cleanupInRocksdbCompactFilter
> 都试验了下。
> 但 cancel -s 停止后, 生成的 savepoint 目录还是没有变小。过程是这样的:
>
> cancel -s 停止,savepoint 目录大小为 100M
> 代码变更,把原来的 setUpdateType 变为 cleanupFullSnapshot
> 新的代码从 1 的 savepoint 目录恢复
> 新的代码运行一天左右,再 cancel -s, 新的 savepoint 目录变大
>
> 会不会是 每次 flink run -s ,从已有 savepoint 目录中恢复的数据所有的 updateTime 都变成了当前时间?
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Yun Tang
> Send Time: 2019-11-01 01:38
> Receiver: user-zh@flink.apache.org
> Subject: Re: Flink State 过期清除 TTL 问题
> Hi 王磊
>
> 从你的配置以及使用Flink-1.7版本看,并没有启用主动清理过期数据的功能 [1],建议对StateTtlConfig 配置
> cleanupFullSnapshot,这样你在执行full
> snapshot(也就是Savepoint)的时候,会把过期的数据不保存在你的Savepoint内容中。如果你不启用主动清理,只有当原先的数据被读取时,才会进行清理,这可能就导致你的Savepoint内容不断增长的原因。
>
> 另外,建议升级到Flink-1.8+,对state TTL的特性支持更好,可以参考中文文章 [2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
> [2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv
>
> 祝好
> 唐云
>
>
> On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn" <
> wangl...@geekplus.com.cn> wrote:
>
> flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig
>
> .newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
> ValueStateDescriptor descriptor = new
> ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
> descriptor.enableTimeToLive(ttlConfig);
>
> 程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从
> savepoint 目录恢复。
> 我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint
> 目录不断变大。是过期清除策略没生效吗?
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
>
>
>


Re: Flink State 过期清除 TTL 问题

2019-12-10 文章 Yun Tang
Hi 王磊

Savepoint目录中的数据的时间戳不会在恢复的时候再更新为当前时间,仍然为之前的时间,从代码上看如果你配置了cleanupFullSnapshot就会生效的,另外配置
 cleanupInRocksdbCompactFilter 
能让过期清理检查在后台执行,据我所知这个功能是可靠的,有尝试过长时间观察么,另外你们的新增数据量是恒定的么?

祝好
唐云

On 12/10/19, 10:16 AM, "wangl...@geekplus.com.cn"  
wrote:

Hi 唐云,

我的集群已经升到了 1.8.2,  cleanupFullSnapshot 和 cleanupInRocksdbCompactFilter 都试验了下。
但 cancel -s 停止后, 生成的 savepoint 目录还是没有变小。过程是这样的:

cancel -s 停止,savepoint 目录大小为 100M
代码变更,把原来的 setUpdateType 变为 cleanupFullSnapshot 
新的代码从 1 的 savepoint 目录恢复
新的代码运行一天左右,再 cancel -s, 新的 savepoint 目录变大

会不会是 每次 flink run -s ,从已有 savepoint 目录中恢复的数据所有的 updateTime 都变成了当前时间? 

谢谢,
王磊



wangl...@geekplus.com.cn
 
Sender: Yun Tang
Send Time: 2019-11-01 01:38
Receiver: user-zh@flink.apache.org
Subject: Re: Flink State 过期清除 TTL 问题
Hi 王磊
 
从你的配置以及使用Flink-1.7版本看,并没有启用主动清理过期数据的功能 [1],建议对StateTtlConfig 配置 
cleanupFullSnapshot,这样你在执行full 
snapshot(也就是Savepoint)的时候,会把过期的数据不保存在你的Savepoint内容中。如果你不启用主动清理,只有当原先的数据被读取时,才会进行清理,这可能就导致你的Savepoint内容不断增长的原因。
 
另外,建议升级到Flink-1.8+,对state TTL的特性支持更好,可以参考中文文章 [2]
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
[2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv
 
祝好
唐云
 
 
On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn" 
 wrote:
 
flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 
目录恢复。
我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 
目录不断变大。是过期清除策略没生效吗?

谢谢,
王磊



wangl...@geekplus.com.cn

 




Re: Re: Flink State 过期清除 TTL 问题

2019-12-09 文章 wangl...@geekplus.com.cn
Hi 唐云,

我的集群已经升到了 1.8.2,  cleanupFullSnapshot 和 cleanupInRocksdbCompactFilter 都试验了下。
但 cancel -s 停止后, 生成的 savepoint 目录还是没有变小。过程是这样的:

cancel -s 停止,savepoint 目录大小为 100M
代码变更,把原来的 setUpdateType 变为 cleanupFullSnapshot 
新的代码从 1 的 savepoint 目录恢复
新的代码运行一天左右,再 cancel -s, 新的 savepoint 目录变大

会不会是 每次 flink run -s ,从已有 savepoint 目录中恢复的数据所有的 updateTime 都变成了当前时间? 

谢谢,
王磊



wangl...@geekplus.com.cn
 
Sender: Yun Tang
Send Time: 2019-11-01 01:38
Receiver: user-zh@flink.apache.org
Subject: Re: Flink State 过期清除 TTL 问题
Hi 王磊
 
从你的配置以及使用Flink-1.7版本看,并没有启用主动清理过期数据的功能 [1],建议对StateTtlConfig 配置 
cleanupFullSnapshot,这样你在执行full 
snapshot(也就是Savepoint)的时候,会把过期的数据不保存在你的Savepoint内容中。如果你不启用主动清理,只有当原先的数据被读取时,才会进行清理,这可能就导致你的Savepoint内容不断增长的原因。
 
另外,建议升级到Flink-1.8+,对state TTL的特性支持更好,可以参考中文文章 [2]
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
[2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv
 
祝好
唐云
 
 
On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn"  
wrote:
 
flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 目录恢复。
我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 目录不断变大。是过期清除策略没生效吗?

谢谢,
王磊



wangl...@geekplus.com.cn

 


Re: How to estimate the memory size of flink state

2019-11-20 文章 刘建刚
  Thank you. Your suggestion is good and I benefit a lot. For my case, I 
want to know the state memory size for other reasons. 
  When the the gc pressure is bigger, I need to limit the source or discard 
some data from the source to ensure job’s running. If the state size is bigger, 
I need to discard data. If the state size is not bigger, I need to limit the 
source.  The state size shows the resident memory. For event time, discarding 
data can reduce memory usage.
  Could you please give me some suggestions? 

> 在 2019年11月20日,下午3:16,sysukelee  写道:
> 
> Hi Liu,
> We monitor the jvm used/max heap memory to determine whether to rescale the 
> job.
> To avoid problems caused by oom, you don't need to know exactly how much 
> memory exactly used by state. 
> Focusing on jvm memory use is more reasonable.
>  
> 
> On 11/20/2019 15:08,刘建刚 
>  wrote: 
> We are using flink 1.6.2. For filesystem backend, we want to monitor
> the state size in memory. Once the state size becomes bigger, we can get
> noticed and take measures such as rescaling the job, or the job may fail
> because of the memory.
> We have tried to get the memory usage for the jvm, like gc throughput.
> For our case, state can vary greatly at the peak. So maybe I can refer to
> the state memory size.
> I checked the metrics and code, but didn't find any information about
> the state memory size. I can get the checkpoint size, but they are
> serialized result that can not reflect the running state in memory.  Can
> anyone give me some suggestions? Thank you very much.



How to estimate the memory size of flink state

2019-11-19 文章 刘建刚
  We are using flink 1.6.2. For filesystem backend, we want to monitor
the state size in memory. Once the state size becomes bigger, we can get
noticed and take measures such as rescaling the job, or the job may fail
because of the memory.
  We have tried to get the memory usage for the jvm, like gc throughput.
For our case, state can vary greatly at the peak. So maybe I can refer to
the state memory size.
  I checked the metrics and code, but didn't find any information about
the state memory size. I can get the checkpoint size, but they are
serialized result that can not reflect the running state in memory.  Can
anyone give me some suggestions? Thank you very much.


Re: Re: Flink State 过期清除 TTL 问题

2019-10-31 文章 wangl...@geekplus.com.cn
谢谢,了解了。

王磊



wangl...@geekplus.com.cn
 
Sender: Yun Tang
Send Time: 2019-11-01 01:38
Receiver: user-zh@flink.apache.org
Subject: Re: Flink State 过期清除 TTL 问题
Hi 王磊
 
从你的配置以及使用Flink-1.7版本看,并没有启用主动清理过期数据的功能 [1],建议对StateTtlConfig 配置 
cleanupFullSnapshot,这样你在执行full 
snapshot(也就是Savepoint)的时候,会把过期的数据不保存在你的Savepoint内容中。如果你不启用主动清理,只有当原先的数据被读取时,才会进行清理,这可能就导致你的Savepoint内容不断增长的原因。
 
另外,建议升级到Flink-1.8+,对state TTL的特性支持更好,可以参考中文文章 [2]
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
[2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv
 
祝好
唐云
 
 
On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn"  
wrote:
 
flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 目录恢复。
我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 目录不断变大。是过期清除策略没生效吗?

谢谢,
王磊



wangl...@geekplus.com.cn

 


Re: Flink State 过期清除 TTL 问题

2019-10-31 文章 Yun Tang
Hi 王磊

从你的配置以及使用Flink-1.7版本看,并没有启用主动清理过期数据的功能 [1],建议对StateTtlConfig 配置 
cleanupFullSnapshot,这样你在执行full 
snapshot(也就是Savepoint)的时候,会把过期的数据不保存在你的Savepoint内容中。如果你不启用主动清理,只有当原先的数据被读取时,才会进行清理,这可能就导致你的Savepoint内容不断增长的原因。

另外,建议升级到Flink-1.8+,对state TTL的特性支持更好,可以参考中文文章 [2]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
[2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv

祝好
唐云


On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn"  
wrote:

flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 目录恢复。
我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 目录不断变大。是过期清除策略没生效吗?

谢谢,
王磊



wangl...@geekplus.com.cn




Flink State 过期清除 TTL 问题

2019-10-30 文章 wangl...@geekplus.com.cn
flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 目录恢复。
我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 目录不断变大。是过期清除策略没生效吗?

谢谢,
王磊



wangl...@geekplus.com.cn