自定义UDF 实现CheckpointedFunction 
伪代码如下 发现并没有执行initializeState






public class ClusterInfoCollectUdf   extends ScalarFunction implements 
CheckpointedFunction {
private static final Logger                            LOGGER = 
LoggerFactory.getLogger(ClusterInfoCollectUdf.class);
private transient     MapState<String,Integer >          mapState;
private               MapStateDescriptor<String,Integer> mapStateDescriptor;
   。。。。。



@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {


LOGGER.info("the snapshotState    is  started ");


}

@Override
public void initializeState(FunctionInitializationContext context) throws 
Exception {
mapStateDescriptor = new MapStateDescriptor<>(
"app-status-map",
String.class,
Integer.class);

mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
LOGGER.info("the initializeState    is  started ");




}





| |
阿华田
|
|
[email protected]
|
签名由网易邮箱大师定制

回复