Hi, 在 initializeState 里初始化 State 是 OK 的,但是尽量不要在 initializeState 和 snapshotState 里访问 KeyedState,最好是在实际的 Function 比如这里的 FlatMap 里; 原因是 KeyedState 的访问是和 Current Key 绑定的,实际的 Function 在 Process 之前会隐式的 set Current key ,因此是会保证每次 KeyedState 的操作是对确定的 KV 进行的; 而 initializeState 和 snapshotState 里是没有框架隐性 set 的,相当于会对某一个不确定的 key 去update value了,如果一定要在这里做就需要拿到 KeyContext 自己去 set,不过不建议这么做;
On Fri, May 5, 2023 at 10:58 PM sjf0115 <si_ji_f...@163.com> wrote: > CheckpointedFunction 接口的 initializeState 方法提供了访问 > FunctionInitializationContext 的能力,而 FunctionInitializationContext 不仅提供了访问 > OperatorStateStore 的能力,也提供了 KeyedStateStore 的能力。一般常见的是通过 > CheckpointedFunction 来实现操作 OperatorState,但是也可以通过如下代码来获取 KeyedState: > ```java > context.getKeyedStateStore().getState(stateDescriptor); > ``` > 想问一下通过 CheckpointedFunction 来实现操作 KeyedState,如下代码所示。建议这样实现吗?会有什么问题吗? > ```java > public static class TemperatureAlertFlatMapFunction extends > RichFlatMapFunction<Tuple2<String, Double>, Tuple3<String, Double, Double>> > implements CheckpointedFunction { > // 温度差报警阈值 > private double threshold; > // 上一次温度 > private ValueState<Double> lastTemperatureState; > private Double lastTemperature; > public TemperatureAlertFlatMapFunction(double threshold) { > this.threshold = threshold; > } > > > @Override > public void flatMap(Tuple2<String, Double> sensor, > Collector<Tuple3<String, Double, Double>> out) throws Exception { > String sensorId = sensor.f0; > // 当前温度 > double temperature = sensor.f1; > // 保存当前温度 > lastTemperature = temperature; > // 是否是第一次上报的温度 > if (Objects.equals(lastTemperature, null)) { > return; > } > double diff = Math.abs(temperature - lastTemperature); > if (diff > threshold) { > // 温度变化超过阈值则输出 > out.collect(Tuple3.of(sensorId, temperature, diff)); > } > } > > > @Override > public void snapshotState(FunctionSnapshotContext context) throws > Exception { > // 获取最新的温度之后更新保存上一次温度的状态 > if (!Objects.equals(lastTemperature, null)) { > lastTemperatureState.update(lastTemperature); > } > } > > > @Override > public void initializeState(FunctionInitializationContext context) > throws Exception { > ValueStateDescriptor<Double> stateDescriptor = new > ValueStateDescriptor<>("lastTemperature", Double.class); > lastTemperatureState = > context.getKeyedStateStore().getState(stateDescriptor); > if (context.isRestored()) { > lastTemperature = lastTemperatureState.value(); > } > } > } > ``` > > -- Best, Hangxiang.