????????????????????????????????????quable state????????????????????????????????checkpoint??????????????????????????????????????????????????api????statetable????????kv????
---????????--- ??????:"Yun Tang"<[email protected]> ????????:2021??4??10?? ?????? ????9:50 ??????:"user-zh"<[email protected]> ????:Re: CheckpointedFunction#snapshotState???????????????? Hi snapshotState????????operator state????????????????keyed state ??????????????currentKey????????currentKey????????????????record??key????snapshotState????????????????????????????snapshotState??????????????????????????record???? ????????????????keyed state?????????? KeyedStateBackend#getKeys(String state, N namespace) ??????????????????????keyed state??????HBase??????Flink??????????????per record?????????????????????????????????????????????? ???? ???? ________________________________ From: cs <[email protected]> Sent: Tuesday, April 6, 2021 21:52 To: user-zh <[email protected]> Subject: CheckpointedFunction#snapshotState???????????????? class A extends KeyedProcessFunction<String, Object, String&gt; implements CheckpointedFunction { private MapState<String, LiveBean&gt; liveBeanState; &nbsp; @Override &nbsp; &nbsp; public void initializeState(FunctionInitializationContext context) throws Exception { &nbsp; &nbsp; &nbsp; &nbsp; MapStateDescriptor<String, LiveBean&gt; descriptor = new MapStateDescriptor<&gt;("liveState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(LiveBean.class)); &nbsp; &nbsp; &nbsp; &nbsp; this.liveBeanState = context.getKeyedStateStore().getMapState(descriptor); &nbsp; &nbsp; } &nbsp; &nbsp; &nbsp;@Override &nbsp; &nbsp; public void snapshotState(FunctionSnapshotContext context) throws Exception { &nbsp; &nbsp; &nbsp; &nbsp; Iterator<Map.Entry<String, LiveBean&gt;&gt; iter = liveBeanState.iterator(); // -&gt; Exception here &nbsp; &nbsp; &nbsp; &nbsp; while (iter.hasNext()) { &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Map.Entry<String, LiveBean&gt; e = iter.next(); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; hbaseDao.singlePut("table", StringUtils.reverse(e.getKey()), "cf", "info", JSON.toJSONString(e.getValue())); &nbsp; &nbsp; &nbsp; &nbsp; } &nbsp; &nbsp; } } ???????????? java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context. ??????checkpoint????????????????????hbase??????????????
