CheckpointedFunction 接口的 initializeState 方法提供了访问 FunctionInitializationContext 的能力,而 FunctionInitializationContext 不仅提供了访问 OperatorStateStore 的能力,也提供了 KeyedStateStore 的能力。一般常见的是通过 CheckpointedFunction 来实现操作 OperatorState,但是也可以通过如下代码来获取 KeyedState: ```java context.getKeyedStateStore().getState(stateDescriptor); ``` 想问一下通过 CheckpointedFunction 来实现操作 KeyedState,如下代码所示。建议这样实现吗?会有什么问题吗? ```java public static class TemperatureAlertFlatMapFunction extends RichFlatMapFunction<Tuple2<String, Double>, Tuple3<String, Double, Double>> implements CheckpointedFunction { // 温度差报警阈值 private double threshold; // 上一次温度 private ValueState<Double> lastTemperatureState; private Double lastTemperature; public TemperatureAlertFlatMapFunction(double threshold) { this.threshold = threshold; }
@Override public void flatMap(Tuple2<String, Double> sensor, Collector<Tuple3<String, Double, Double>> out) throws Exception { String sensorId = sensor.f0; // 当前温度 double temperature = sensor.f1; // 保存当前温度 lastTemperature = temperature; // 是否是第一次上报的温度 if (Objects.equals(lastTemperature, null)) { return; } double diff = Math.abs(temperature - lastTemperature); if (diff > threshold) { // 温度变化超过阈值则输出 out.collect(Tuple3.of(sensorId, temperature, diff)); } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { // 获取最新的温度之后更新保存上一次温度的状态 if (!Objects.equals(lastTemperature, null)) { lastTemperatureState.update(lastTemperature); } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { ValueStateDescriptor<Double> stateDescriptor = new ValueStateDescriptor<>("lastTemperature", Double.class); lastTemperatureState = context.getKeyedStateStore().getState(stateDescriptor); if (context.isRestored()) { lastTemperature = lastTemperatureState.value(); } } } ```