Hi,

The state that is being loaded can very well be partitioned by key. Assuming 
this scenario, and that you know the keys range from 0 to N, is there 
some way to load and partition the initial data in the open() function?


Dr. Radu Tudoran
Research Engineer
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173



-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Tuesday, December 08, 2015 4:20 PM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Ah, I see what the problem is. Operator state is scoped to the key of the 
incoming element. In the open() method no element has been received yet, so 
the key is effectively null, and open() initializes the state for that null 
key. In flatMap() there is an incoming element with a key, so we access the 
state for that specific key, which still has the default value 0 (from the 
getKeyValueState() call).
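
The effect can be reproduced with a toy model in plain Java, with no Flink 
dependency. This is only a sketch of the scoping rule described above, not 
how Flink implements state; KeyedValue and setCurrentKey are hypothetical 
names made up for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of key-scoped state. NOT Flink code; all names are hypothetical. */
public class KeyedStateSketch {

    /** A value that is stored separately for each "current key". */
    static final class KeyedValue<T> {
        // HashMap permits a null key, which stands in for "no element seen yet".
        private final Map<Object, T> perKey = new HashMap<>();
        private final T defaultValue;
        private Object currentKey; // stays null until a key is set

        KeyedValue(T defaultValue) {
            this.defaultValue = defaultValue;
        }

        /** In this model the runtime would call this before each element. */
        void setCurrentKey(Object key) {
            this.currentKey = key;
        }

        /** Returns the value for the current key, or the default if none is stored. */
        T value() {
            return perKey.getOrDefault(currentKey, defaultValue);
        }

        /** Stores a value for the current key only. */
        void update(T newValue) {
            perKey.put(currentKey, newValue);
        }
    }

    public static void main(String[] args) {
        KeyedValue<Integer> dataset = new KeyedValue<>(0);

        // Stand-in for open(): no element has arrived, so the key is null.
        for (int i = 0; i < 10; i++) {
            dataset.update(dataset.value() + 1);
        }
        System.out.println("open(), key=null: " + dataset.value());     // prints 10

        // Stand-in for flatMap(): the key of the incoming element is set first.
        dataset.setCurrentKey("hello");
        System.out.println("flatMap(), key=hello: " + dataset.value()); // prints 0
    }
}
```

The ten updates land under the null key, so a lookup under any real key 
falls back to the default, which matches what the flatMap() in the quoted 
program observes.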

OperatorState is only useful if the state needs to be partitioned by key, but 
here it seems that the state is valid for all elements?
> On 08 Dec 2015, at 15:30, Radu Tudoran <radu.tudo...@huawei.com> wrote:
> 
> final StreamExecutionEnvironment env = StreamExecutionEnvironment
>         .getExecutionEnvironment();
> 
> DataStream<String> stream = env
>         .socketTextStream("localhost", 16333, '\n')
>         .map(new MapFunction<String, Tuple1<String>>() {
>             @Override
>             public Tuple1<String> map(String arg0) throws Exception {
>                 return new Tuple1<String>(arg0);
>             }
>         }).keyBy(0)
>         .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
> 
>             private OperatorState<Integer> dataset;
> 
>             @Override
>             public void flatMap(Tuple1<String> arg0,
>                     Collector<String> arg1) throws Exception {
> 
>                 if (dataset.value() > 0)
>                     arg1.collect("Test OK " + arg0);
>             }
> 
>             @Override
>             public void open(Configuration parameters) throws Exception {
> 
>                 dataset = getRuntimeContext().getKeyValueState(
>                         "loadeddata", Integer.class, 0);
> 
>                 /*
>                  * Simulate loading data.
>                  * It looks like if this part is commented out and the
>                  * dataset is initialized with 1, for example, then the
>                  * non-zero value is available in the flatMap function.
>                  */
>                 for (int i = 0; i < 10; i++) {
>                     dataset.update(dataset.value() + 1);
>                 }
> 
>                 //System.out.println("dataset value " + dataset.value());
>             }
>         });
> 
> stream.print();
> 
> env.execute("test open function");
