你好,我有一个keyed
state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {
/**
* The ValueState handle. The first field is the count, the second field a
running sum.
*/
private transient MapState<String, Long> stat_value;
@Override
public void flatMap(ObjectNode input, Collector<JSONObject> out) throws
Exception {
// access the state value
}
@Override
public void open(Configuration config) {
MapStateDescriptor<String, Long> descriptor =
new MapStateDescriptor<String, Long>(
"stat_value",String.class, Long.class); // default
value of the state, if nothing was set
stat_value = getRuntimeContext().getMapState(descriptor);
}
}