你应该没有正确理解 state 的使用 我们一般在程序里面是用的是 KeyedState , 也就是和 key 伴随的。
基于上面,所以 open() 里面只能对 state 进行初始化, 但是没有办法设置 state 的 value,因为这时候没有 key ; 另外一方面,也不会在 map() 的时候去 new state (可以认为 state 是一个大的 Map,你 map 的时候只是操作其中的一个 key)。 回到你的需求,你应该在 open() 的时候保存相关信息到类变量里面,当 map() 的时候再去 update state。 Zhiwen Sun On Fri, Aug 26, 2022 at 1:55 PM 曲洋 <quyanghao...@126.com> wrote: > > 对的,是后者,statAccumulator.value()是null,就是map方法中取值就成null了,但是open中命名初始化了,这个是因为map太快了吗,state没初始化完就开始拿了吗. > 嗯嗯,我现在改成先进行判断 > > > > > > > > > > > > > > > > > > 在 2022-08-26 11:22:43,"Hangxiang Yu" <master...@gmail.com> 写道: > >open确实是初始化的时候就会调用的; > > >第一次调用是null是说statAccumulator是null还是statAccumulator.value()是null,后者的话是正常可能会出现的; > >这里的写法看起来有点问题,一般用value方法取出来可以先判断下,然后对value state的更新用update方法; > > > >On Fri, Aug 26, 2022 at 10:25 AM 曲洋 <quyanghao...@126.com> wrote: > > > >> 各位好, > >> > >> > 我想请教一个问题,我的Flink应用中会在state里边存储一些指标,比如一年的总数,然后当任务各种原因断掉的时候,我希望可以通过入参的方式直接调节这个state,但是遇到了一个问题,如下: > >> 我重写了RichMapFunction,yearTotal > >> > 这个指标是通过命令行传进来的,然后我希望初始化到state里边,但是我发现,open方法第一次调用的时候state都是null,然后这个参数就进不来 > >> 所以我想问下这个场景怎么办,还有open方法的生命周期是怎么样的,我本以为是map第一次打开的时候就会调用,结果好像不是 > >> public static class AccumulateAmounts extends RichMapFunction<v2bean, > >> BlueAccumulaterInitState> { > >> private transient ValueState<BlueAccumulaterInitState> > >> statAccumulator; > >> > >> > >> @Override > >> public BlueAccumulaterInitState map(v2bean currentAccumulator) > >> throws Exception { > >> > >> > >> BlueAccumulaterInitState stat = (statAccumulator.value() != > >> null) ? statAccumulator.value() : new BlueAccumulaterInitState(); > >> Long yearIncrement = year.equals(stat.getYear()) ? > >> stat.getYearMetric() + 1L : 1L; > >> stat.setYearMetric(yearIncrement); > >> > >> > >> statAccumulator.update(stat); > >> return stat; > >> } > >> > >> > >> @Override > >> public void open(Configuration config) { > >> ValueStateDescriptor<BlueAccumulaterInitState> descriptor = > >> new ValueStateDescriptor<>( > >> "total", > >> TypeInformation.of(new > >> TypeHint<BlueAccumulaterInitState>() { > >> })); > >> statAccumulator = getRuntimeContext().getState(descriptor); > >> ExecutionConfig.GlobalJobParameters globalParams = > >> getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); > >> Configuration globConf = (Configuration) globalParams; > >> long yearTotal = > >> globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue()); > >> statAccumulator.value().setYearMetric(yearTotal); > >> > >> > >> > >> } > >> } > > > > > > > >-- > >Best, > >Hangxiang. >