Hi

  snapshotState主要是给operator state用的,异常原因是keyed state 
访问时需要设置currentKey的,但是currentKey是当前正在处理的record的key,与snapshotState的执行时候的语义不一样,执行snapshotState方法的时候,是可以没有当前record的。

  如果想要访问整个keyed state,可以通过 KeyedStateBackend#getKeys(String state, N namespace) 
来访问,但还是不建议将keyed state写入到HBase,因为Flink更希望你是按照per record的访问,而不是全局访问,后者效率和性能都不好。


祝好

唐云
________________________________
From: cs <58683...@qq.com>
Sent: Tuesday, April 6, 2021 21:52
To: user-zh <user-zh@flink.apache.org>
Subject: CheckpointedFunction#snapshotState访问键控状态报错

class A extends KeyedProcessFunction<String, Object, String&gt; implements 
CheckpointedFunction {


        private MapState<String, LiveBean&gt; liveBeanState;

&nbsp;  @Override
&nbsp; &nbsp; public void initializeState(FunctionInitializationContext 
context) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; MapStateDescriptor<String, LiveBean&gt; descriptor 
= new MapStateDescriptor<&gt;("liveState", BasicTypeInfo.STRING_TYPE_INFO, 
TypeInformation.of(LiveBean.class));
&nbsp; &nbsp; &nbsp; &nbsp; this.liveBeanState = 
context.getKeyedStateStore().getMapState(descriptor);
&nbsp; &nbsp; }


&nbsp; &nbsp; &nbsp;@Override
&nbsp; &nbsp; public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
&nbsp; &nbsp; &nbsp; &nbsp; Iterator<Map.Entry<String, LiveBean&gt;&gt; iter = 
liveBeanState.iterator(); // -&gt; Exception here
&nbsp; &nbsp; &nbsp; &nbsp; while (iter.hasNext()) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Map.Entry<String, LiveBean&gt; e = 
iter.next();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; hbaseDao.singlePut("table", 
StringUtils.reverse(e.getKey()), "cf", "info", JSON.toJSONString(e.getValue()));
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }


}



异常信息如下
java.lang.NullPointerException: No key set. This method should not be called 
outside of a keyed context.
需要在checkpoint的时候将键控状态存入hbase,该如何处理呢

回复