Hi,

The state that is being loaded can very well be partitioned by keys. Assuming this scenario, and given that the keys are known to range from 0 to N, is there some way to load and partition the initial data in the open() function?
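The Java below is one possible approach, sketched under stated assumptions. open() runs once per parallel subtask and has no current key, so the initial data would go into a plain (non-keyed) map, and each subtask would load only the keys it expects to receive. In a rich function the two inputs could come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(). The class name PartitionedInitialLoad, the assignment `key % parallelism` and the loader `loadValue` are hypothetical. The sketch only helps if that assignment matches how keyBy() actually routes records, and it does not verify this.

```java
import java.util.HashMap;
import java.util.Map;

class PartitionedInitialLoad {

    // Hypothetical key-to-subtask assignment. It must match how keyBy()
    // really routes records; this sketch simply assumes key % parallelism.
    static int subtaskFor(int key, int parallelism) {
        return key % parallelism;
    }

    // Loads the initial values for the keys 0..n-1 that this subtask owns
    // under the assumed assignment, into a plain (non-keyed) local map.
    static Map<Integer, Integer> loadForSubtask(int n, int subtaskIndex, int parallelism) {
        Map<Integer, Integer> local = new HashMap<>();
        for (int key = 0; key < n; key++) {
            if (subtaskFor(key, parallelism) == subtaskIndex) {
                local.put(key, loadValue(key));
            }
        }
        return local;
    }

    // Hypothetical loader standing in for a file or database read.
    static int loadValue(int key) {
        return key * 10; // placeholder data for the sketch
    }
}
```

For example, with keys 0 to 9 and a parallelism of 3, subtask 1 would load keys 1, 4 and 7. The three subtasks together cover all ten keys exactly once.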
Dr. Radu Tudoran
Research Engineer
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Tuesday, December 08, 2015 4:20 PM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Ah, I see what the problem is. Operator state is scoped to the key of the incoming element. In the open() method no element has been received yet, so the current key is effectively null, and the state that open() initializes belongs to key null. In flatMap() every incoming element has a key, so the state is accessed for that specific key, which still holds the default value 0 (from the getKeyValueState() call).

OperatorState is only useful if the state needs to be partitioned by key, but here it seems that the state is valid for all elements?
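The behaviour described above can be reproduced with a toy model in plain Java. This is not Flink's implementation. KeyScopedValueState and OpenVsFlatMapDemo are hypothetical names, and the model only mirrors the rule that every read and write goes to the slot of the current key, which is still null while open() runs.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (NOT Flink's implementation) of key-scoped value state:
// value() and update() always act on the slot of the current key.
class KeyScopedValueState<K> {
    private final Map<K, Integer> values = new HashMap<>();
    private final int defaultValue;
    private K currentKey; // null until the first element arrives

    KeyScopedValueState(int defaultValue) {
        this.defaultValue = defaultValue;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    int value() {
        return values.getOrDefault(currentKey, defaultValue);
    }

    void update(int v) {
        values.put(currentKey, v);
    }
}

class OpenVsFlatMapDemo {
    // Returns {value seen in "open()", value seen in "flatMap()"}.
    static int[] run() {
        KeyScopedValueState<String> dataset = new KeyScopedValueState<>(0);

        // "open()": no element yet, so every update lands under key null.
        for (int i = 0; i < 10; i++) {
            dataset.update(dataset.value() + 1);
        }
        int seenInOpen = dataset.value();

        // "flatMap()": the runtime switches to the key of the incoming element,
        // whose slot was never written and so yields the default value.
        dataset.setCurrentKey("hello");
        int seenInFlatMap = dataset.value();

        return new int[] {seenInOpen, seenInFlatMap};
    }
}
```

Here run() yields 10 for the read in "open()" and 0 for the read in "flatMap()". By contrast, a plain instance field assigned in open() is visible in every flatMap() call of the same parallel instance, which fits state that is valid for all elements.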
> On 08 Dec 2015, at 15:30, Radu Tudoran <radu.tudo...@huawei.com> wrote:
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment
>         .getExecutionEnvironment();
>
> DataStream<String> stream = env
>         .socketTextStream("localhost", 16333, '\n')
>         .map(new MapFunction<String, Tuple1<String>>() {
>             @Override
>             public Tuple1<String> map(String arg0) throws Exception {
>                 return new Tuple1<String>(arg0);
>             }
>         }).keyBy(0)
>         .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
>
>             private OperatorState<Integer> dataset;
>
>             @Override
>             public void flatMap(Tuple1<String> arg0, Collector<String> arg1)
>                     throws Exception {
>                 if (dataset.value() > 0)
>                     arg1.collect("Test OK " + arg0);
>             }
>
>             @Override
>             public void open(Configuration parameters) throws Exception {
>                 dataset = getRuntimeContext().getKeyValueState(
>                         "loadeddata", Integer.class, 0);
>
>                 /*
>                  * Simulate loading data.
>                  * It looks like if this part is commented out and the dataset
>                  * is initialized with 1, for example, then the non-zero value
>                  * is available in the flatMap function.
>                  */
>                 for (int i = 0; i < 10; i++) {
>                     dataset.update(dataset.value() + 1);
>                 }
>
>                 // System.out.println("dataset value " + dataset.value());
>             }
>         });
>
> stream.print();
>
> env.execute("test open function");