Hi, 在 initializeState 里初始化 State 是 OK 的,但是尽量不要在 initializeState 和
snapshotState 里访问 KeyedState,最好是在实际的 Function 比如这里的 FlatMap 里;
原因是 KeyedState 的访问是和 Current Key 绑定的,实际的 Function 在 Process 之前会隐式的 set
Current key ,因此是会保证每次 KeyedState 的操作是对确定的 KV 进行的;
而 initializeState 和 snapshotState 里是没有框架隐性 set 的,相当于会对某一个不确定的 key 去update
value了,如果一定要在这里做就需要拿到 KeyContext 自己去 set,不过不建议这么做;

On Fri, May 5, 2023 at 10:58 PM sjf0115 <si_ji_f...@163.com> wrote:

> CheckpointedFunction 接口的 initializeState 方法提供了访问
> FunctionInitializationContext 的能力,而 FunctionInitializationContext 不仅提供了访问
> OperatorStateStore 的能力,也提供了 KeyedStateStore 的能力。一般常见的是通过
> CheckpointedFunction 来实现操作 OperatorState,但是也可以通过如下代码来获取 KeyedState:
> ```java
> context.getKeyedStateStore().getState(stateDescriptor);
> ```
> 想问一下通过 CheckpointedFunction 来实现操作 KeyedState,如下代码所示。建议这样实现吗?会有什么问题吗?
> ```java
> public static class TemperatureAlertFlatMapFunction extends
> RichFlatMapFunction<Tuple2<String, Double>, Tuple3<String, Double, Double>>
> implements CheckpointedFunction  {
>     // 温度差报警阈值
>     private double threshold;
>     // 上一次温度
>     private ValueState<Double> lastTemperatureState;
>     private Double lastTemperature;
>     public TemperatureAlertFlatMapFunction(double threshold) {
>         this.threshold = threshold;
>     }
>
>
>     @Override
>     public void flatMap(Tuple2<String, Double> sensor,
> Collector<Tuple3<String, Double, Double>> out) throws Exception {
>         String sensorId = sensor.f0;
>         // 当前温度
>         double temperature = sensor.f1;
>         // 保存当前温度
>         lastTemperature = temperature;
>         // 是否是第一次上报的温度
>         if (Objects.equals(lastTemperature, null)) {
>             return;
>         }
>         double diff = Math.abs(temperature - lastTemperature);
>         if (diff > threshold) {
>             // 温度变化超过阈值则输出
>             out.collect(Tuple3.of(sensorId, temperature, diff));
>         }
>     }
>
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
>         // 获取最新的温度之后更新保存上一次温度的状态
>         if (!Objects.equals(lastTemperature, null)) {
>             lastTemperatureState.update(lastTemperature);
>         }
>     }
>
>
>     @Override
>     public void initializeState(FunctionInitializationContext context)
> throws Exception {
>         ValueStateDescriptor<Double> stateDescriptor = new
> ValueStateDescriptor<>("lastTemperature", Double.class);
>         lastTemperatureState =
> context.getKeyedStateStore().getState(stateDescriptor);
>         if (context.isRestored()) {
>             lastTemperature = lastTemperatureState.value();
>         }
>     }
> }
> ```
>
>

-- 
Best,
Hangxiang.

回复