各位好,
 
我想请教一个问题,我的Flink应用中会在state里边存储一些指标,比如一年的总数,然后当任务各种原因断掉的时候,我希望可以通过入参的方式直接调节这个state,但是遇到了一个问题,如下:
我重写了RichMapFunction,yearTotal 
这个指标是通过命令行传进来的,然后我希望初始化到state里边,但是我发现,open方法第一次调用的时候state都是null,然后这个参数就进不来
所以我想问下这个场景怎么办,还有open方法的生命周期是怎么样的,我本以为是map第一次打开的时候就会调用,结果好像不是
public static class AccumulateAmounts extends RichMapFunction<v2bean, 
BlueAccumulaterInitState> {
        private transient ValueState<BlueAccumulaterInitState> statAccumulator;


        @Override
        public BlueAccumulaterInitState map(v2bean currentAccumulator) throws 
Exception {


            BlueAccumulaterInitState stat = (statAccumulator.value() != null) ? 
statAccumulator.value() : new BlueAccumulaterInitState();
            Long yearIncrement = year.equals(stat.getYear()) ? 
stat.getYearMetric() + 1L : 1L;
            stat.setYearMetric(yearIncrement);


            statAccumulator.update(stat);
            return stat;
        }


        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<BlueAccumulaterInitState> descriptor =
                    new ValueStateDescriptor<>(
                            "total",
                            TypeInformation.of(new 
TypeHint<BlueAccumulaterInitState>() {
                            }));
            statAccumulator = getRuntimeContext().getState(descriptor);
            ExecutionConfig.GlobalJobParameters globalParams = 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            Configuration globConf = (Configuration) globalParams;
            long yearTotal = 
globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
statAccumulator.value().setYearMetric(yearTotal);
      


        }
    }

回复