Ah, I see what the problem is. The operator state returned by
getKeyValueState() is scoped to the key of the incoming element. In the open()
method, no element has been received yet, so the current key is effectively
null, and open() initializes the state for the null key. In flatMap() each
incoming element has an actual key, so the state is accessed for that specific
key, which still has the default value 0 (the third argument of the
getKeyValueState() call).
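
To make the scoping concrete, here is a small toy model in plain Java. It is
not Flink code and does not reflect how Flink implements state internally; the
class KeyScopedStateDemo, the KeyedValue type and the helper methods are
hypothetical names that only mimic the behaviour described above, under the
assumption that state is looked up by "current key" and that the key is null
while open() runs.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of key-scoped state. NOT Flink code: every name in this class is
 * hypothetical and exists only to illustrate the scoping described above.
 */
public class KeyScopedStateDemo {

    /** A value that is stored separately for each "current key". */
    static class KeyedValue<T> {
        // HashMap permits a null key, which models "no element seen yet".
        private final Map<Object, T> perKey = new HashMap<>();
        private final T defaultValue;
        private Object currentKey; // stays null until an element arrives

        KeyedValue(T defaultValue) {
            this.defaultValue = defaultValue;
        }

        void setCurrentKey(Object key) {
            currentKey = key;
        }

        /** Returns the value for the current key, or the default if none is stored. */
        T value() {
            return perKey.getOrDefault(currentKey, defaultValue);
        }

        /** Stores a value for the current key only. */
        void update(T newValue) {
            perKey.put(currentKey, newValue);
        }
    }

    /** Mimics open(): no element has been received, so the current key is null. */
    static KeyedValue<Integer> open() {
        KeyedValue<Integer> dataset = new KeyedValue<>(0);
        for (int i = 0; i < 10; i++) {
            dataset.update(dataset.value() + 1); // all ten updates go to the null key
        }
        return dataset;
    }

    /** Mimics flatMap(): the key of the incoming element is set before state is read. */
    static int valueSeenInFlatMap(KeyedValue<Integer> dataset, String key) {
        dataset.setCurrentKey(key);
        return dataset.value();
    }

    public static void main(String[] args) {
        KeyedValue<Integer> dataset = open();
        System.out.println("after open(), null key: " + dataset.value());
        System.out.println("in flatMap(), key \"a\": " + valueSeenInFlatMap(dataset, "a"));
    }
}
```

In this model the first lookup returns 10 and the second returns 0: the
updates made in open() were stored under the null key, and the key "a" has
never been written, so it falls back to the default.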

Key-value OperatorState is only useful if the state needs to be partitioned by
key. Here, though, it looks like the state is meant to be valid for all
elements regardless of key. Is that right?

> On 08 Dec 2015, at 15:30, Radu Tudoran <radu.tudo...@huawei.com> wrote:
> 
> final StreamExecutionEnvironment env = StreamExecutionEnvironment
>         .getExecutionEnvironment();
> 
> DataStream<String> stream = env
>         .socketTextStream("localhost", 16333, '\n')
>         .map(new MapFunction<String, Tuple1<String>>() {
>             @Override
>             public Tuple1<String> map(String arg0) throws Exception {
>                 return new Tuple1<String>(arg0);
>             }
>         }).keyBy(0)
>         .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
> 
>             private OperatorState<Integer> dataset;
> 
>             @Override
>             public void flatMap(Tuple1<String> arg0,
>                     Collector<String> arg1) throws Exception {
> 
>                 if (dataset.value() > 0)
>                     arg1.collect("Test OK " + arg0);
>             }
> 
>             @Override
>             public void open(Configuration parameters) throws Exception {
> 
>                 dataset = getRuntimeContext().getKeyValueState(
>                         "loadeddata", Integer.class, 0);
> 
>                 /*
>                  * Simulate loading data.
>                  * Looks like if this part is commented out and the dataset is
>                  * initialized with 1, for example, then the non-zero value is
>                  * available in the flatMap function.
>                  */
>                 for (int i = 0; i < 10; i++) {
>                     dataset.update(dataset.value() + 1);
>                 }
> 
>                 // System.out.println("dataset value " + dataset.value());
>             }
>         });
> 
> stream.print();
> 
> env.execute("test open function");
