你好,我有一个keyed 
state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。


class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {

    /**
     * The ValueState handle. The first field is the count, the second field a 
running sum.
     */
    private transient MapState<String, Long> stat_value;

    @Override
    public void flatMap(ObjectNode input, Collector<JSONObject> out) throws 
Exception {

        // access the state value

    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<String, Long>(
                        "stat_value",String.class, Long.class); // default 
value of the state, if nothing was set
        stat_value = getRuntimeContext().getMapState(descriptor);
    }
}

回复