这种情况我需要改flink源码吗,还是自己实现一个自定义的state类就好了,还有在这个state类中怎么能获取到key呢

> 在 2019年4月17日,上午11:24,wenlong.lwl <[email protected]> 写道:
> 
> 可以封装一下state 的访问,从state get不到数据的时候,去数据库里取下,更新到state里
> 
> On Tue, 16 Apr 2019 at 20:53, zhang yue <[email protected]> wrote:
> 
>> 是的,我希望从mysql加载初始的状态,因为我的kafka消息是从某个时间点开始的,在这个时间点之前的数据需要先加载到flink state
>> 那现在对于这种场景有什么替代方案吗
>> 
>>> 在 2019年4月16日,下午8:33,Congxian Qiu <[email protected]> 写道:
>>> 
>>> Hi
>>> 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做 Savepoint Reader/Writer
>> 相关的事情,到时候就可以了
>>> 
>>> Best, Congxian
>>> On Apr 16, 2019, 20:27 +0800, zhang yue <[email protected]>, wrote:
>>>> 你好,我有一个keyed
>> state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
>>>> 
>>>> 
>>>> class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {
>>>> 
>>>> /**
>>>> * The ValueState handle. The first field is the count, the second field
>> a running sum.
>>>> */
>>>> private transient MapState<String, Long> stat_value;
>>>> 
>>>> @Override
>>>> public void flatMap(ObjectNode input, Collector<JSONObject> out) throws
>> Exception {
>>>> 
>>>> // access the state value
>>>> 
>>>> }
>>>> 
>>>> @Override
>>>> public void open(Configuration config) {
>>>> MapStateDescriptor<String, Long> descriptor =
>>>> new MapStateDescriptor<String, Long>(
>>>> "stat_value",String.class, Long.class); // default value of the state,
>> if nothing was set
>>>> stat_value = getRuntimeContext().getMapState(descriptor);
>>>> }
>>>> }
>>>> 
>> 
>> 

回复