????????????????????????????????????quable 
state????????????????????????????????checkpoint??????????????????????????????????????????????????api????statetable????????kv????

---????????---
??????:&quot;Yun Tang&quot;<[email protected]&gt;
????????:2021??4??10?? ?????? ????9:50
??????:&quot;user-zh&quot;<[email protected]&gt;
????:Re: CheckpointedFunction#snapshotState????????????????
Hi
&nbsp;&nbsp;snapshotState????????operator&nbsp;state????????????????keyed&nbsp;state&nbsp;??????????????currentKey????????currentKey????????????????record??key????snapshotState????????????????????????????snapshotState??????????????????????????record????
&nbsp;&nbsp;????????????????keyed&nbsp;state??????????&nbsp;KeyedStateBackend#getKeys(String&nbsp;state,&nbsp;N&nbsp;namespace)&nbsp;??????????????????????keyed&nbsp;state??????HBase??????Flink??????????????per&nbsp;record??????????????????????????????????????????????
????
????
________________________________
From:&nbsp;cs&nbsp;<[email protected]&gt;
Sent:&nbsp;Tuesday,&nbsp;April&nbsp;6,&nbsp;2021&nbsp;21:52
To:&nbsp;user-zh&nbsp;<[email protected]&gt;
Subject:&nbsp;CheckpointedFunction#snapshotState????????????????
class&nbsp;A&nbsp;extends&nbsp;KeyedProcessFunction<String,&nbsp;Object,&nbsp;String&amp;gt;&nbsp;implements&nbsp;CheckpointedFunction&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;MapState<String,&nbsp;LiveBean&amp;gt;&nbsp;liveBeanState;
&amp;nbsp;&nbsp;&nbsp;@Override
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;public&nbsp;void&nbsp;initializeState(FunctionInitializationContext&nbsp;context)&nbsp;throws&nbsp;Exception&nbsp;{
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;MapStateDescriptor<String,&nbsp;LiveBean&amp;gt;&nbsp;descriptor&nbsp;=&nbsp;new&nbsp;MapStateDescriptor<&amp;gt;(&quot;liveState&quot;,&nbsp;BasicTypeInfo.STRING_TYPE_INFO,&nbsp;TypeInformation.of(LiveBean.class));
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;this.liveBeanState&nbsp;=&nbsp;context.getKeyedStateStore().getMapState(descriptor);
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;}
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;@Override
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;public&nbsp;void&nbsp;snapshotState(FunctionSnapshotContext&nbsp;context)&nbsp;throws&nbsp;Exception&nbsp;{
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;Iterator<Map.Entry<String,&nbsp;LiveBean&amp;gt;&amp;gt;&nbsp;iter&nbsp;=&nbsp;liveBeanState.iterator();&nbsp;//&nbsp;-&amp;gt;&nbsp;Exception&nbsp;here
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;while&nbsp;(iter.hasNext())&nbsp;{
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;Map.Entry<String,&nbsp;LiveBean&amp;gt;&nbsp;e&nbsp;=&nbsp;iter.next();
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;hbaseDao.singlePut(&quot;table&quot;,&nbsp;StringUtils.reverse(e.getKey()),&nbsp;&quot;cf&quot;,&nbsp;&quot;info&quot;,&nbsp;JSON.toJSONString(e.getValue()));
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;}
&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;}
}
????????????
java.lang.NullPointerException:&nbsp;No&nbsp;key&nbsp;set.&nbsp;This&nbsp;method&nbsp;should&nbsp;not&nbsp;be&nbsp;called&nbsp;outside&nbsp;of&nbsp;a&nbsp;keyed&nbsp;context.
??????checkpoint????????????????????hbase??????????????

回复