你应该没有正确理解 state 的使用

我们一般在程序里面是用的是 KeyedState , 也就是和 key 伴随的。

基于上面,所以 open() 里面只能对 state 进行初始化, 但是没有办法设置 state 的 value,因为这时候没有 key ;
另外一方面,也不会在 map() 的时候去 new state (可以认为 state 是一个大的 Map,你 map 的时候只是操作其中的一个
key)。

回到你的需求,你应该在 open() 的时候保存相关信息到类变量里面,当 map() 的时候再去 update state。






Zhiwen Sun



On Fri, Aug 26, 2022 at 1:55 PM 曲洋 <quyanghao...@126.com> wrote:

>
> 对的,是后者,statAccumulator.value()是null,就是map方法中取值就成null了,但是open中命名初始化了,这个是因为map太快了吗,state没初始化完就开始拿了吗.
> 嗯嗯,我现在改成先进行判断
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-08-26 11:22:43,"Hangxiang Yu" <master...@gmail.com> 写道:
> >open确实是初始化的时候就会调用的;
>
> >第一次调用是null是说statAccumulator是null还是statAccumulator.value()是null,后者的话是正常可能会出现的;
> >这里的写法看起来有点问题,一般用value方法取出来可以先判断下,然后对value state的更新用update方法;
> >
> >On Fri, Aug 26, 2022 at 10:25 AM 曲洋 <quyanghao...@126.com> wrote:
> >
> >> 各位好,
> >>
> >>
> 我想请教一个问题,我的Flink应用中会在state里边存储一些指标,比如一年的总数,然后当任务各种原因断掉的时候,我希望可以通过入参的方式直接调节这个state,但是遇到了一个问题,如下:
> >> 我重写了RichMapFunction,yearTotal
> >>
> 这个指标是通过命令行传进来的,然后我希望初始化到state里边,但是我发现,open方法第一次调用的时候state都是null,然后这个参数就进不来
> >> 所以我想问下这个场景怎么办,还有open方法的生命周期是怎么样的,我本以为是map第一次打开的时候就会调用,结果好像不是
> >> public static class AccumulateAmounts extends RichMapFunction<v2bean,
> >> BlueAccumulaterInitState> {
> >>         private transient ValueState<BlueAccumulaterInitState>
> >> statAccumulator;
> >>
> >>
> >>         @Override
> >>         public BlueAccumulaterInitState map(v2bean currentAccumulator)
> >> throws Exception {
> >>
> >>
> >>             BlueAccumulaterInitState stat = (statAccumulator.value() !=
> >> null) ? statAccumulator.value() : new BlueAccumulaterInitState();
> >>             Long yearIncrement = year.equals(stat.getYear()) ?
> >> stat.getYearMetric() + 1L : 1L;
> >>             stat.setYearMetric(yearIncrement);
> >>
> >>
> >>             statAccumulator.update(stat);
> >>             return stat;
> >>         }
> >>
> >>
> >>         @Override
> >>         public void open(Configuration config) {
> >>             ValueStateDescriptor<BlueAccumulaterInitState> descriptor =
> >>                     new ValueStateDescriptor<>(
> >>                             "total",
> >>                             TypeInformation.of(new
> >> TypeHint<BlueAccumulaterInitState>() {
> >>                             }));
> >>             statAccumulator = getRuntimeContext().getState(descriptor);
> >>             ExecutionConfig.GlobalJobParameters globalParams =
> >> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> >>             Configuration globConf = (Configuration) globalParams;
> >>             long yearTotal =
> >> globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
> >> statAccumulator.value().setYearMetric(yearTotal);
> >>
> >>
> >>
> >>         }
> >>     }
> >
> >
> >
> >--
> >Best,
> >Hangxiang.
>

回复