We should adjust the error message so that it says a keyed stream is required.

On 23.06.2016 10:11, Till Rohrmann wrote:
Hi Jacob,

the `ListState` abstraction is a state which we call partitioned/key-value state. As such, it is only possible to use it with a keyed stream. This means that you have to call `keyBy` after the `connect` API call.
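
To illustrate why a key is needed, here is a toy model in plain Java. It is only a sketch of the idea, not Flink code: the class `KeyedListStateSketch`, its methods and the `"bag1:5"` element format are all made up for this example. It keeps one list per key and refuses to create the state when no key selector is configured, which is roughly the situation when `keyBy` is missing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy model of key-partitioned list state. Hypothetical; NOT Flink's implementation. */
class KeyedListStateSketch<K, T> {
    // Stand-in for the key information that keyBy would configure.
    private final Function<T, K> keySelector;
    // One independent list per key.
    private final Map<K, List<T>> statePerKey = new HashMap<>();

    KeyedListStateSketch(Function<T, K> keySelector) {
        if (keySelector == null) {
            // Without a key there is nothing to partition the state by.
            throw new IllegalStateException(
                "No key configured: this operation cannot use partitioned state.");
        }
        this.keySelector = keySelector;
    }

    /** Appends the element to the list that belongs to the element's key. */
    void add(T element) {
        K key = keySelector.apply(element);
        statePerKey.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
    }

    /** Returns the elements stored for the given key (empty if none). */
    List<T> get(K key) {
        return statePerKey.getOrDefault(key, new ArrayList<>());
    }

    public static void main(String[] args) {
        // Elements look like "key:value"; the key is the part before the colon.
        KeyedListStateSketch<String, String> state =
            new KeyedListStateSketch<>(e -> e.split(":")[0]);
        state.add("bag1:5");
        state.add("bag2:7");
        state.add("bag1:9");
        System.out.println(state.get("bag1")); // prints [bag1:5, bag1:9]
        System.out.println(state.get("bag2")); // prints [bag2:7]

        try {
            // No key selector: comparable to using the state on a non-keyed stream.
            new KeyedListStateSketch<String, String>(null);
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

In the real job the key comes from the `keyBy` call on the connected streams, so the function itself does not need to change.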

Cheers,
Till

On Wed, Jun 22, 2016 at 9:17 PM, Jacob Bay Larsen <m...@jacobbay.dk> wrote:

    Hi,

    I am trying to use a ListState in a RichCoFlatMapFunction, but when
    calling getRuntimeContext().getListState(descriptor) in the open()
    method I get a "State key serializer has not .." exception. I am
    not sure what I can do to avoid this exception. Can any of you
    help?

    Best regards
    Jacob


    private ListState<Tuple2<String, Integer>> deltaPositions;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
      // Create state variable
      ListStateDescriptor<Tuple2<String, Integer>> descriptor =
          new ListStateDescriptor<>(
              "deltaPositions", // the state name
              TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

      deltaPositions = getRuntimeContext().getListState(descriptor);
    }



    2016-06-22 20:41:38,813 INFO  org.apache.flink.runtime.taskmanager.Task       - Stream of Items with collection of meadian times (1/1) switched to FAILED with exception.
    java.lang.RuntimeException: Error while getting state
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131)
        at crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
        at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
        at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129)
        ... 8 more
    2016-06-22 20:41:38,815 INFO  org.apache.flink.runtime.taskmanager.Task       - Freeing ta

-- Jacob Bay Larsen

    Phone: +45 6133 1108
    E-mail: m...@jacobbay.dk


