Hello, I am utilizing the code snippet in: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html and particularly ‘open’ function in my code: @Override public void open(Configuration config) { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", // the state name TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information Tuple2.of(0L, 0L)); // default value of the state, if nothing was set sum = getRuntimeContext().getState(descriptor); }
When I run, I get the following error: Caused by: java.lang.RuntimeException: Error while getting state at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120) at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state. at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118) ... 8 more Where do I define the key & value serializer for state? Thanks, Buvana