好的,感谢
在 2023-05-15 15:49:12,"Hangxiang Yu" <master...@gmail.com> 写道:
>Hi, 可以参考这个 Ticket ,就是讨论要给 Broadcast State 加 TTL 的,当时应该没有继续深入讨论:
>https://issues.apache.org/jira/browse/FLINK-13721
>方便的话你可以在 Ticket 下面也分享下你的使用场景、观察到的现象吗?也可以在 Ticket 下 Vote for this issue.
>我这边也会帮忙一起看下
>
>On Mon, May 15, 2023 at 1:41 PM lxk <lxk7...@163.com> wrote:
>
>> 这么看来,广播流好像不适合在生产中使用,状态会无限止的增长。这块官方有计划增加ttl功能吗。
>> 或者使用广播流的时候有没有什么能够手动清理状态的方法?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-05-15 11:28:54,"Hangxiang Yu" <master...@gmail.com> 写道:
>> >Hi, 目前像 Broadcast state 这种 Operator State 应该是不支持 TTL 设置的,可以参考这里
>> ><
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
>> >对
>> >State TTL 的描述;
>> >
>> >On Mon, May 15, 2023 at 11:05 AM lxk <lxk7...@163.com> wrote:
>> >
>> >> flink版本:1.14
>> >> 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。
>> >> 在主程序中,我设置了状态过期策略:
>> >> SingleOutputStreamOperator<AdvertiseClick> baiduStream =
>> >> env.addSource(adBaiduClick).map(data -> JSON.parseObject(data,
>> >> AdvertiseClick.class)).name("BaiDuAdClick");
>> >> MapStateDescriptor<String, AdvertiseClick> baiduInfoMap = new
>> >> MapStateDescriptor<>("advertiseInfo", String.class,
>> AdvertiseClick.class);
>> >> StateTtlConfig ttlConfig = StateTtlConfig
>> >> .newBuilder(Time.days(7))
>> >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> >> .cleanupFullSnapshot()
>> >> .cleanupIncrementally(200, true)
>> >> .build();
>> >> baiduInfoMap.enableTimeToLive(ttlConfig);
>> >> 在BroadcastProcessFunction中,我也设置了状态清除策略:
>> >> public void open(Configuration parameters) throws Exception {
>> >> jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
>> >> baiduInfoDesc = new MapStateDescriptor<String,
>> >> AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class);
>> >> StateTtlConfig ttlConfig = StateTtlConfig
>> >> .newBuilder(Time.days(7))
>> >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> >> .cleanupFullSnapshot()
>> >> .cleanupIncrementally(200, true)
>> >> .build();
>> >> baiduInfoDesc.enableTimeToLive(ttlConfig);
>> >>
>> >> }
>> >> 但是,从目前的checkpoint大小来看,状态清理策略似乎没有生效,程序已经运行了14天,但是整体的checkpoint还是一直在增长。
>> >>
>> >>
>> >> https://pic2.imgdb.cn/item/64619fef0d2dde577774d4c6.jpg
>> >>
>> >>
>> >>
>> >>
>> >> 我是用其他状态的过期策略都是生效的,不知道为啥在广播流这里看起来没生效,还是我的使用方式有问题。希望大家能帮忙看看。
>> >
>> >
>> >
>> >--
>> >Best,
>> >Hangxiang.
>>
>
>
>--
>Best,
>Hangxiang.