Re: Streaming stateful operator with hashmap
For initializing the map manually, I meant making null the default value and writing the code like

    HashMap<InputType, MicroModel> map = state.value();
    if (map == null) {
        map = new HashMap<>();
    }

rather than expecting the state to always clone you a new empty map.

On Thu, Nov 12, 2015 at 11:29 AM, Aljoscha Krettek wrote:
> Hi,
> you can do it using the register* methods on StreamExecutionEnvironment.
> [...]
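To make the null-default pattern concrete outside of Flink, here is a minimal self-contained sketch. `KeyedValue` is a hypothetical stand-in for per-key operator state that returns null until something has been stored (mirroring a null default); it is not a Flink class, and the word counting is only an illustration of "modify the map, then write it back".

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the "null default, initialize on first use" pattern.
// KeyedValue is a hypothetical stand-in for per-key operator state; it is NOT a Flink class.
public class LazyMapStateDemo {

    /** Hypothetical per-key holder: value() returns null until update() is called for that key. */
    static final class KeyedValue<T> {
        private final Map<String, T> store = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) { this.currentKey = key; }
        T value() { return store.get(currentKey); }      // null default, nothing is copied
        void update(T v) { store.put(currentKey, v); }
    }

    /** Counts words per key, creating each key's map lazily on the first record for that key. */
    static void process(KeyedValue<HashMap<String, Integer>> state, String key, String word) {
        state.setCurrentKey(key);
        HashMap<String, Integer> map = state.value();
        if (map == null) {               // first record for this key: initialize manually
            map = new HashMap<>();
        }
        map.merge(word, 1, Integer::sum);
        state.update(map);               // store the (possibly new) map for this key
    }

    public static void main(String[] args) {
        KeyedValue<HashMap<String, Integer>> state = new KeyedValue<>();
        process(state, "sensor-1", "a");
        process(state, "sensor-1", "a");
        process(state, "sensor-2", "b");
        state.setCurrentKey("sensor-1");
        System.out.println(state.value());   // prints "{a=2}"
        state.setCurrentKey("sensor-2");
        System.out.println(state.value());   // prints "{b=1}"
    }
}
```

The null check is the manual initialization: a map is allocated only the first time a key is seen, and `update()` stores it so later records for that key find it.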
Re: Streaming stateful operator with hashmap
Hi,
you can do it using the register* methods on StreamExecutionEnvironment. So, for example:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // set up the execution environment
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.registerType(InputType.class);
    env.registerType(MicroModel.class);

If you want to have custom Kryo serializers for those types, you can also do:

    env.registerTypeWithKryoSerializer(InputType.class, MyInputTypeSerializer.class);

I hope this gets you on the right track. :D

Cheers,
Aljoscha

> On 11 Nov 2015, at 21:14, Martin Neumann wrote:
>
> Thanks for the help.
> [...]
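As a rough illustration of why registration pays off, the toy model below tags each serialized object with a type identifier. It is not Kryo's actual wire format; it only mirrors the general idea that a registered class can be identified by a small numeric ID, whereas an unregistered one has to be identified by its full class name. `InputType` and `MicroModel` are empty placeholder classes here.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy model of type tagging during serialization; NOT Kryo's real format.
public class TypeTagDemo {
    // Placeholder classes standing in for the user types from the thread.
    static final class InputType {}
    static final class MicroModel {}

    private static final int UNREGISTERED = 0;
    private static final int REGISTERED = 1;

    private final Map<Class<?>, Integer> ids = new HashMap<>();

    /** Assigns the next small integer ID to a class (no-op if it is already registered). */
    void register(Class<?> type) {
        ids.putIfAbsent(type, ids.size());
    }

    /** Writes a type tag for one object and returns how many bytes it took. */
    int writeTag(Class<?> type, ByteArrayOutputStream out) {
        Integer id = ids.get(type);
        if (id != null) {
            out.write(REGISTERED);
            out.write(id);                      // toy: one byte, so at most 256 registered classes
            return 2;
        }
        byte[] name = type.getName().getBytes(StandardCharsets.UTF_8);
        out.write(UNREGISTERED);
        out.write(name, 0, name.length);        // the full class name, written again on every call
        return 1 + name.length;
    }

    public static void main(String[] args) {
        TypeTagDemo tags = new TypeTagDemo();
        tags.register(InputType.class);         // MicroModel is deliberately left unregistered
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int registered = tags.writeTag(InputType.class, out);
        int unregistered = tags.writeTag(MicroModel.class, out);
        System.out.println("registered: " + registered + " bytes, unregistered: " + unregistered + " bytes");
    }
}
```

In this analogy, `env.registerType(...)` plays the role of `register(...)`: the per-record type tag shrinks from a class name to a small number.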
Re: Streaming stateful operator with hashmap
Hey,

Yes, what you wrote should work. You can alternatively use TypeExtractor.getForObject(modelMapInit) to extract the type information.

I also like to implement my own custom type info for HashMaps and the other types and use that.

Cheers,
Gyula

Martin Neumann wrote (time: Wed, Nov 11, 2015, 16:30):
> Hi,
>
> What is the correct way of initializing a stateful operator that is using a hashmap?
> [...]
Re: Streaming stateful operator with hashmap
It should suffice to do something like

    getRuntimeContext().getKeyValueState("microModelMap", new HashMap<InputType, MicroModel>().getClass(), null);

Two more comments:

1) Making null the default value and initializing manually is probably more efficient, because otherwise the empty map would have to be cloned each time the default value is returned, which adds avoidable overhead.

2) The HashMap type will most likely go through Kryo, so for efficiency, make sure you register the types "InputType" and "MicroModel" on the execution environment. Here you need to do that manually, because they are type erased and Flink cannot auto-register them.

Greetings,
Stephan

On Wed, Nov 11, 2015 at 4:32 PM, Gyula Fóra wrote:
> Hey,
>
> Yes, what you wrote should work.
> [...]
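Point 1 can be illustrated with a toy holder that, as described above, returns a fresh copy of a non-null default each time an unset state is read. `CopyingDefaultState` is hypothetical and only models the cost argument; it is not how Flink implements state.

```java
import java.util.HashMap;
import java.util.function.UnaryOperator;

// Toy model of point 1; NOT Flink's implementation. A hypothetical state holder returns a fresh
// copy of its default value every time value() is read while nothing has been stored yet.
public class DefaultCopyDemo {

    static final class CopyingDefaultState<T> {
        private final T defaultValue;
        private final UnaryOperator<T> copier;
        private T stored;
        int copies = 0;   // how many times the default had to be copied

        CopyingDefaultState(T defaultValue, UnaryOperator<T> copier) {
            this.defaultValue = defaultValue;
            this.copier = copier;
        }

        T value() {
            if (stored != null) {
                return stored;
            }
            if (defaultValue == null) {
                return null;                      // null default: nothing to copy
            }
            copies++;
            return copier.apply(defaultValue);    // each read of unset state pays for a copy
        }

        void update(T value) {
            stored = value;
        }
    }

    public static void main(String[] args) {
        UnaryOperator<HashMap<String, Integer>> copyMap = m -> new HashMap<>(m);
        CopyingDefaultState<HashMap<String, Integer>> emptyMapDefault =
                new CopyingDefaultState<>(new HashMap<String, Integer>(), copyMap);
        CopyingDefaultState<HashMap<String, Integer>> nullDefault =
                new CopyingDefaultState<>(null, copyMap);

        // Three reads before anything is stored, e.g. records that only look things up.
        for (int i = 0; i < 3; i++) {
            emptyMapDefault.value();
            nullDefault.value();
        }
        System.out.println(emptyMapDefault.copies + " copies vs " + nullDefault.copies);
        // prints "3 copies vs 0"
    }
}
```

With a null default, an allocation happens only when the operator itself decides to create a map, which is what the manual null check does.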
Streaming stateful operator with hashmap
Hi,

What is the correct way of initializing a stateful operator that is using a hashmap? Neither modelMapInit.getClass() nor HashMap.class works. Do I have to implement my own TypeInformation class, or is there a simpler way?

cheers Martin

    private OperatorState<HashMap<InputType, MicroModel>> microModelMap;

    @Override
    public void open(Configuration parameters) throws Exception {
        HashMap<InputType, MicroModel> modelMapInit = new HashMap<>();
        this.microModelMap = getRuntimeContext().getKeyValueState("microModelMap",
                modelMapInit.getClass(), modelMapInit);
    }
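The difficulty with getClass() and HashMap.class comes from Java type erasure: every `HashMap<K, V>` shares the single runtime class `HashMap`, and for a variable of type `HashMap<InputType, MicroModel>`, `getClass()` is statically typed as `Class<? extends HashMap>`. The sketch below shows both facts, plus an unchecked cast that re-labels the raw class. `withDefault` is a hypothetical method shaped like an API that takes a `Class<S>` token together with a default `S`; it is not Flink's `getKeyValueState`, and whether such a cast is an acceptable workaround there depends on how Flink derives type information from the class.

```java
import java.util.HashMap;

// Demonstrates type erasure: generic type arguments do not exist at run time,
// so a Class object cannot tell HashMap<String, Integer> apart from any other HashMap.
public class ErasureDemo {

    /** Hypothetical API shape: a Class<S> token plus a default value of type S. */
    static <S> S withDefault(Class<S> type, S defaultValue) {
        return type.cast(defaultValue);
    }

    /** True: both maps, whatever their type arguments, share the runtime class HashMap. */
    static boolean sameRuntimeClass() {
        HashMap<String, Integer> a = new HashMap<>();
        HashMap<Long, String> b = new HashMap<>();
        // a.getClass() is statically typed as Class<? extends HashMap>, not Class<HashMap<String, Integer>>
        return a.getClass() == b.getClass() && a.getClass() == HashMap.class;
    }

    /** Unchecked cast that re-labels the raw HashMap.class with the desired type arguments. */
    @SuppressWarnings("unchecked")
    static Class<HashMap<String, Integer>> mapToken() {
        return (Class<HashMap<String, Integer>>) (Class<?>) HashMap.class;
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());   // prints "true"

        // Compiles because the token's static type now matches the default value's type.
        HashMap<String, Integer> state = withDefault(mapToken(), new HashMap<>());
        state.put("a", 1);
        System.out.println(state);                // prints "{a=1}"
    }
}
```

`String` and `Integer` stand in for `InputType` and `MicroModel` so the sketch compiles on its own.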
Re: Streaming stateful operator with hashmap
Thanks for the help.

TypeExtractor.getForObject(modelMapInit) did the job. It's possible that it's an IDE problem that .getClass() did not work; IntelliJ is a bit fiddly with those things.

> 1) Making null the default value and initializing manually is probably more
> efficient, because otherwise the empty map would have to be cloned each
> time the default value is returned, which adds avoidable overhead.

What do you mean by initialize manually? Can I do that directly in the open function, or are we talking about checking for null in the FlatMap and initializing there? In general the program is supposed to run continuously once deployed, so I can get away with a slightly slower setup.

> 2) The HashMap type will most likely go through Kryo, so for efficiency,
> make sure you register the types "InputType" and "MicroModel" on the
> execution environment.
> Here you need to do that manually, because they are type erased and
> Flink cannot auto-register them.

Can you point me to an example on how to do this?

cheers Martin

On Wed, Nov 11, 2015 at 4:52 PM, Stephan Ewen wrote:
> It should suffice to do something like
> [...]