Re: Streaming stateful operator with hashmap

2015-11-18 Thread Stephan Ewen
By initializing the map manually, I meant making "null" the default value
and writing the code like this:

HashMap<InputType, MicroModel> map = state.value();
if (map == null) {
  // no value stored for this key yet: create the map here
  map = new HashMap<>();
}

rather than expecting the state to always hand you a freshly cloned empty map.
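A minimal, self-contained sketch of this pattern in plain Java. To keep it runnable without Flink, `InMemoryKeyedState` is a hypothetical stand-in that only mimics the `value()`/`update()` calls of keyed state, and `String` keys and model ids replace the thread's `InputType`/`MicroModel` types. It also suggests why the null check belongs in the per-element function rather than in `open()`: keyed state is scoped to the key of the element currently being processed, so each key gets its own map the first time that key is seen.

```java
import java.util.HashMap;
import java.util.Map;

public class LazyMapStateSketch {

    /**
     * Hypothetical in-memory stand-in for keyed value state (NOT a Flink class).
     * It keeps one value per key and exposes value()/update() for the current key.
     */
    static final class InMemoryKeyedState<K, V> {
        private final Map<K, V> perKey = new HashMap<>();
        private K currentKey;

        void setCurrentKey(K key) { this.currentKey = key; }

        /** Returns the value stored for the current key, or null if there is none. */
        V value() { return perKey.get(currentKey); }

        void update(V newValue) { perKey.put(currentKey, newValue); }
    }

    // String keys and model ids stand in for the thread's InputType/MicroModel types.
    final InMemoryKeyedState<String, HashMap<String, Integer>> state = new InMemoryKeyedState<>();

    /** Roughly what a flatMap body would do for one element of the given key. */
    void processElement(String key, String modelId) {
        state.setCurrentKey(key); // a streaming runtime would do this before calling the function
        HashMap<String, Integer> map = state.value();
        if (map == null) {
            // First element seen for this key: create the map here instead of
            // relying on a cloned default value.
            map = new HashMap<>();
        }
        map.merge(modelId, 1, Integer::sum);
        state.update(map);
    }

    public static void main(String[] args) {
        LazyMapStateSketch op = new LazyMapStateSketch();
        op.processElement("a", "m1");
        op.processElement("a", "m1");
        op.processElement("b", "m2");

        op.state.setCurrentKey("a");
        System.out.println(op.state.value()); // {m1=2}
        op.state.setCurrentKey("b");
        System.out.println(op.state.value()); // {m2=1}
    }
}
```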



Re: Streaming stateful operator with hashmap

2015-11-12 Thread Aljoscha Krettek
Hi,
you can do it using the register* methods on StreamExecutionEnvironment. So, 
for example:

// set up the execution environment
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.registerType(InputType.class);
env.registerType(MicroModel.class);

If you want to have custom Kryo Serializers for those types you can also do:

env.registerTypeWithKryoSerializer(InputType.class, 
MyInputTypeSerializer.class);
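Why registration helps can be illustrated with a toy model. Roughly speaking, a serializer such as Kryo has to record which class each serialized value belongs to; for an unregistered class that means writing something like its full class name, while a registered class can be identified by a small integer ID. The `ToyRegistry` below is a hypothetical simplification of that idea, not Kryo's actual wire format, and `MicroModel` is an empty placeholder class.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ClassTagSketch {

    /**
     * Toy model of class registration (NOT Kryo's real wire format): a registered
     * class is identified by a small integer ID, an unregistered one by its name.
     */
    static final class ToyRegistry {
        private final Map<Class<?>, Integer> ids = new HashMap<>();

        void register(Class<?> type) {
            ids.putIfAbsent(type, ids.size()); // IDs are handed out in registration order
        }

        /** Bytes this toy format spends to say which class a serialized value has. */
        int tagSize(Class<?> type) {
            Integer id = ids.get(type);
            if (id != null) {
                return varIntSize(id);
            }
            return type.getName().getBytes(StandardCharsets.UTF_8).length;
        }

        /** Size of a variable-length int using 7 payload bits per byte. */
        private static int varIntSize(int value) {
            int size = 1;
            while ((value >>>= 7) != 0) {
                size++;
            }
            return size;
        }
    }

    // Hypothetical placeholder for the thread's MicroModel type.
    static final class MicroModel {}

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        int unregistered = registry.tagSize(MicroModel.class);
        registry.register(MicroModel.class);
        int registered = registry.tagSize(MicroModel.class);
        System.out.println(unregistered + " bytes before, " + registered + " byte after registration");
    }
}
```

Since the class tag is paid per serialized value, the saving grows with the number of map entries that go through the generic serializer.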

I hope this gets you on the right track. :D

Cheers,
Aljoscha




Re: Streaming stateful operator with hashmap

2015-11-11 Thread Gyula Fóra
Hey,

Yes, what you wrote should work. You can alternatively use
TypeExtractor.getForObject(modelMapInit) to extract the type information.

I also like to implement my own custom type info for HashMaps and the other
types and use that.

Cheers,
Gyula



Re: Streaming stateful operator with hashmap

2015-11-11 Thread Stephan Ewen
It should suffice to do something like

getRuntimeContext().getKeyValueState("microModelMap", new HashMap().getClass(), null);

Two more comments:

1) Making null the default value and initializing manually is probably more
efficient, because otherwise the empty map would have to be cloned each
time the default value is returned, which adds avoidable overhead.

2) The HashMap type will most likely go through Kryo, so for efficiency,
make sure you register the types "InputType" and "MicroModel" on the
execution environment.
You need to do that manually here because, as type arguments of the HashMap,
they are erased, so Flink cannot auto-register them.
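The cost described in point 1 can be modeled in a few lines of plain Java. `DefaultingState` is a hypothetical class, not Flink's state implementation: when it holds a non-null mutable default, it either copies the default on every read of a key that has no stored value (safe, but one allocation per read) or returns the shared instance (cheap, but a mutation made for one key leaks into every other key). A `null` default avoids both problems.

```java
import java.util.HashMap;
import java.util.Map;

public class DefaultValueCopySketch {

    /**
     * Hypothetical keyed state with a non-null mutable default (NOT Flink's implementation).
     * With copyDefault = true it hands out a fresh copy of the default on every miss;
     * with copyDefault = false it returns the shared default instance.
     */
    static final class DefaultingState<K> {
        private final Map<K, HashMap<String, Integer>> perKey = new HashMap<>();
        private final HashMap<String, Integer> defaultValue;
        private final boolean copyDefault;
        int copiesMade = 0;

        DefaultingState(HashMap<String, Integer> defaultValue, boolean copyDefault) {
            this.defaultValue = defaultValue;
            this.copyDefault = copyDefault;
        }

        HashMap<String, Integer> value(K key) {
            HashMap<String, Integer> stored = perKey.get(key);
            if (stored != null) {
                return stored;
            }
            if (!copyDefault) {
                return defaultValue; // shared instance: callers can corrupt it
            }
            copiesMade++; // one allocation and copy per read of a missing key
            return new HashMap<>(defaultValue);
        }

        void update(K key, HashMap<String, Integer> newValue) {
            perKey.put(key, newValue);
        }
    }

    public static void main(String[] args) {
        // Copying default: safe, but every read of a key without a stored value pays for a copy.
        DefaultingState<String> copying = new DefaultingState<>(new HashMap<>(), true);
        for (int i = 0; i < 3; i++) {
            copying.value("a");
        }
        System.out.println(copying.copiesMade); // 3

        // Shared default: no copies, but a mutation made for key "a" leaks into key "b".
        DefaultingState<String> sharing = new DefaultingState<>(new HashMap<>(), false);
        sharing.value("a").put("m1", 1);
        System.out.println(sharing.value("b")); // {m1=1}
    }
}
```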

Greetings,
Stephan





Streaming stateful operator with hashmap

2015-11-11 Thread Martin Neumann
Hej,

What is the correct way of initializing a stateful operator that uses
a HashMap? modelMapInit.getClass() does not work, and neither does
HashMap.class. Do I have to implement my own TypeInformation class, or is
there a simpler way?

cheers Martin

private OperatorState<HashMap<InputType, MicroModel>> microModelMap;

@Override
public void open(Configuration parameters) throws Exception {
    HashMap<InputType, MicroModel> modelMapInit = new HashMap<>();
    this.microModelMap =
        getRuntimeContext().getKeyValueState("microModelMap",
            modelMapInit.getClass(), modelMapInit);
}


Re: Streaming stateful operator with hashmap

2015-11-11 Thread Martin Neumann
Thanks for the help.

TypeExtractor.getForObject(modelMapInit) did the job. It's possible that it's
an IDE problem that .getClass() did not work; IntelliJ is a bit fiddly with
those things.
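One likely explanation, sketched in plain Java, is that this is a generics issue rather than an IDE quirk. Type arguments are erased at runtime, so `modelMapInit.getClass()` only has the static type `Class<? extends HashMap>` and `HashMap.class` is `Class<HashMap>`; neither can stand for `HashMap<InputType, MicroModel>`. A method that takes the object itself, as `TypeExtractor.getForObject(modelMapInit)` presumably does, lets the compiler infer the full generic type from the variable's declared type. `TypeHandle`, `forObject`, and `forClass` are hypothetical stand-ins, not Flink API.

```java
import java.util.HashMap;

public class ErasureSketch {

    /** Hypothetical type handle (NOT Flink's TypeInformation); T is a compile-time type. */
    static final class TypeHandle<T> {
        final Class<?> runtimeClass;

        TypeHandle(Class<?> runtimeClass) {
            this.runtimeClass = runtimeClass;
        }
    }

    /** T is inferred from the static type of the argument, generics included. */
    static <T> TypeHandle<T> forObject(T value) {
        return new TypeHandle<>(value.getClass());
    }

    /** T comes from a Class object, and Class objects never carry type arguments. */
    static <T> TypeHandle<T> forClass(Class<T> type) {
        return new TypeHandle<>(type);
    }

    public static void main(String[] args) {
        HashMap<String, Integer> a = new HashMap<>();
        HashMap<Long, Double> b = new HashMap<>();

        // Type arguments are erased at runtime: both maps report the same class.
        System.out.println(a.getClass() == b.getClass()); // true

        // Compiles: T is inferred as HashMap<String, Integer>.
        TypeHandle<HashMap<String, Integer>> fromObject = forObject(a);

        // Does NOT compile: a.getClass() has type Class<? extends HashMap> (raw HashMap),
        // so T cannot be HashMap<String, Integer>:
        // TypeHandle<HashMap<String, Integer>> fromClass = forClass(a.getClass());

        // HashMap.class is Class<HashMap>, so only a raw handle is possible.
        @SuppressWarnings("rawtypes")
        TypeHandle<HashMap> raw = forClass(HashMap.class);

        System.out.println(fromObject.runtimeClass == raw.runtimeClass); // true
    }
}
```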

> 1) Making null the default value and initializing manually is probably more
> efficient, because otherwise the empty map would have to be cloned each
> time the default value is returned, which adds avoidable overhead.


What do you mean by initializing manually? Can I do that directly in the open
function, or are we talking about checking for null in the FlatMap and
initializing there? In general, the program is supposed to run continuously
once deployed, so I can get away with a slightly slower setup.

> 2) The HashMap type will most likely go through Kryo, so for efficiency,
> make sure you register the types "InputType" and "MicroModel" on the
> execution environment.
> Here you need to do that manually, because they are type erased and
> Flink cannot auto-register them.


Can you point me to an example of how to do this?

cheers Martin

