RE: Question about DataStream serialization

2015-12-08 Thread Radu Tudoran
Hi,

Thanks for the answer - it is helpful.
The issue that remains is why the open function is not executed before the
flatMap, to load the data into the OperatorState.

I used something like the sketch below, and I observe that the dataset is not
initialized when it is used in the flatMap function:

env.socketTextStream(...)
    .map(...)   // to transform the data to a Tuple1<String>
    .keyBy(0)   // to enable the usage of the OperatorState, which I saw requires a keyed stream
    .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {

        private OperatorState<Integer> dataset;

        @Override
        public void flatMap(Tuple1<String> value, Collector<String> out) {
            // use dataset ... it is empty here
        }

        @Override
        public void open(Configuration parameters) {
            // load the dataset
        }
    });



Dr. Radu Tudoran
Research Engineer
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Matthias J. Sax [mailto:mj...@apache.org] 
Sent: Tuesday, December 08, 2015 8:42 AM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Hi Radu,

you are right. The open() method is called for each parallel instance of a rich
function. Thus, if all instances use the same code, you might read the same
data multiple times.

The easiest way to distinguish different instances within open() is to use the
RuntimeContext. It offers two methods, "int getNumberOfParallelSubtasks()" and
"int getIndexOfThisSubtask()", that you can use to compute your own partitioning
within open().

For example (just a sketch):

@Override
public void open(Configuration parameters) throws Exception {
  RuntimeContext context = super.getRuntimeContext();
  int dop = context.getNumberOfParallelSubtasks();
  int idx = context.getIndexOfThisSubtask();

  // open file
  // get size of file in bytes

  // seek to partition #idx:
  long seek = fileSize * idx / dop;

  // read "fileSize/dop" bytes
}
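
Filled in with actual file I/O, the sketch might look like this (the path and
the record handling are illustrative):

@Override
public void open(Configuration parameters) throws Exception {
  RuntimeContext context = super.getRuntimeContext();
  int dop = context.getNumberOfParallelSubtasks();
  int idx = context.getIndexOfThisSubtask();

  // every parallel instance opens the same file but reads a different range
  java.io.RandomAccessFile file =
      new java.io.RandomAccessFile("/path/to/data", "r");
  long fileSize = file.length();

  // this instance is responsible for the byte range [start, end)
  long start = fileSize * idx / dop;
  long end = fileSize * (idx + 1) / dop;

  file.seek(start);
  byte[] buffer = new byte[(int) (end - start)];
  file.readFully(buffer);
  file.close();

  // for line-oriented data, additionally skip forward to the next newline
  // after 'start' and read beyond 'end' to complete the last record
}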

Hope this helps.

-Matthias


On 12/08/2015 04:28 AM, Radu Tudoran wrote:
> Hi,
> 
>  
> 
> Taking the example you mentioned of using a RichFlatMapFunction and
> reading a file in the open() method:
> 
> Would this open function be executed on each node where the
> RichFlatMapFunction gets executed? (I have done some tests and I get
> the feeling it does, but I wanted to double-check.)
> 
> If so, would this mean that the same data will be loaded multiple
> times on each parallel instance? Is there any way this can be
> prevented, so that the data is hashed and partitioned somehow across nodes?
> 
>  
> 
> Would using the operator state help? For example:
> 
> “
> 
> OperatorState<MyList> dataset;
> 
> ”
> 
> In this case I would be curious how the open function could look in order
> to initialize the data for this operator state:
> 
>  
> 
>  
> 
> I have tried to just read a file and write it into the dataset, but I
> encountered a strange behavior: it looks like the flatMap function gets
> executed before the open function, which leads to using an empty dataset
> in the flatMap function, while the dataset only gets loaded once flatMap
> finishes executing. Is this an error or am I doing something wrong?
> 
>  
> 
>  
> 
>  
> 
> Dr. Radu Tudoran
> 
> Research Engineer
> 
> IT R&D Division
> 
>  
> 
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> 
> European Research Center
> 
> Riesstrasse 25, 80992 München
> 
>  
> 
> E-mail: radu.tudoran@huawei.com
> 
> Mobile: +49 15209084330
> 
> Telephone: +49 891588344173
> 
>  
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> 
> This e-mail and its attachments contain confidential information from 
> HUAWEI, which is intended only for the person or entity whose address 
> is listed above. Any use of the information contained herein in any 
> way (including, but not limited to, total 

Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Till Rohrmann
Hi Nick,

at the moment Flink uses Java serialization to ship the UDFs to the
cluster. Therefore, the closures must only contain Serializable objects.
The serializer registration only applies to the data which is processed by
the Flink job. Thus, for the moment I would try to get rid of the ColumnInfo
object in your closure.
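
For example (just a sketch; the ColumnInfo accessor and the format() helper
are illustrative): extract the serializable pieces you need before building
the lambda, so that the closure captures plain Strings instead of ColumnInfo
objects.

// capture a plain, serializable list of Strings in the closure
final java.util.List<String> columnNames = new java.util.ArrayList<>();
for (ColumnInfo info : columnInfos) {
    columnNames.add(info.getColumnName()); // illustrative accessor
}

// the lambda now only closes over 'columnNames', which is serializable
DataStream<String> mapped = stream.map(value -> format(value, columnNames));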

Cheers,
Till

On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk  wrote:

> Hello,
>
> I've implemented a (streaming) flow using the Java API and Java8 Lambdas
> for various map functions. When I try to run the flow, job submission fails
> because of an unserializable type. This is not a type of data used within
> the flow, but rather a small collection of objects captured in the closure
> context over one of my Lambdas. I've implemented and registered a Kryo
> Serializer for this type with this environment, however, it's apparently
> not used when serializing the lambdas. Seems like the same serialization
> configuration and tools of the environment should be used when preparing
> the job for submission. Am I missing something?
>
> Thanks,
> Nick
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
> at
> org.apache.flink.client.program.Client.runBlocking(Client.java:252)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object
> ImportFlow$$Lambda$11/1615389290@44286963 not serializable
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
> at
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
> at
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
> at ImportFlow.assembleImportFlow(ImportFlow.java:111)
> at ImportFlow.main(ImportFlow.java:178)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
> ... 6 more
> Caused by: java.io.NotSerializableException:
> org.apache.phoenix.util.ColumnInfo
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at java.util.ArrayList.writeObject(ArrayList.java:762)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
> ... 17 more
>


RE: Question about DataStream serialization

2015-12-08 Thread Radu Tudoran
Hi,

I attached below a function that shows the issue: the OperatorState does not
have the value initialized in the open function before flatMap is called. You
can see this because the print will not show anything. If, on the other hand,
you remove the initialization loop and use a non-zero default value for the
dataset flag, then the print will work.



public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment();

    DataStream<String> stream = env
            .socketTextStream("localhost", 16333, '\n')
            .map(new MapFunction<String, Tuple1<String>>() {
                @Override
                public Tuple1<String> map(String arg0) throws Exception {
                    return new Tuple1<String>(arg0);
                }
            }).keyBy(0)
            .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {

                private OperatorState<Integer> dataset;

                @Override
                public void flatMap(Tuple1<String> arg0,
                        Collector<String> arg1) throws Exception {

                    if (dataset.value() > 0)
                        arg1.collect("Test OK " + arg0);
                }

                @Override
                public void open(Configuration parameters) throws Exception {

                    dataset = getRuntimeContext().getKeyValueState(
                            "loadeddata", Integer.class, 0);

                    /*
                     * Simulate loading data.
                     * It looks like if this part is commented out and the dataset
                     * is initialized with 1, for example, then the non-zero value
                     * is available in the flatMap function.
                     */
                    for (int i = 0; i < 10; i++) {
                        dataset.update(dataset.value() + 1);
                    }

                    // System.out.println("dataset value " + dataset.value());
                }
            });

    stream.print();

    env.execute("test open function");
}
 

Dr. Radu Tudoran
Research Engineer
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Tuesday, December 08, 2015 12:14 PM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Hi,
if the open() method is indeed not called before the first flatMap() call then 
this would be a bug. Could you please verify that this is the case and maybe 
provide an example where this is observable?

Cheers,
Aljoscha
> On 08 Dec 2015, at 10:41, Matthias J. Sax  wrote:
> 
> Hi,
> 
> I think (but please someone verify) that an OperatorState is actually 
> not required -- I think that "open()" is called after a failure and 
> 

Re: Strange behaviour of windows

2015-12-08 Thread Dawid Wysakowicz
Thanks for the explanation. That was a really stupid mistake on my side. By
the way, I really like the whole idea and the API. Really good job!

Regards
Dawid

2015-12-08 12:30 GMT+01:00 Aljoscha Krettek :

> Hi,
> an important concept of the Flink API is that transformations do not
> modify the original stream (or dataset) but return a new stream that
> incorporates the modifications. In your example the result of the
> extractTimestamps() call should be used for further processing. I attached
> your source code with the required modifications.
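>
> In code the fix looks roughly like this (just a sketch; the extractor class
> is illustrative and the method name follows the description above):
>
> // WRONG: the return value is ignored, so 'stream' is NOT modified in place
> // stream.extractTimestamps(new MyTimestampExtractor());
>
> // RIGHT: continue working on the stream returned by the transformation
> DataStream<Event> withTimestamps =
>     stream.extractTimestamps(new MyTimestampExtractor());
> // build the windowed sum on 'withTimestamps', not on 'stream'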
>
> Other than that, I think you understood the watermarks quite well. :D
>
> Let us know if you need more information.
>
> Cheers,
> Aljoscha
>
>
> > On 07 Dec 2015, at 20:34, Dawid Wysakowicz 
> wrote:
> >
> > Forgot to mention. I've checked it both on 0.10 and current master.
> >
> > 2015-12-07 20:32 GMT+01:00 Dawid Wysakowicz  >:
> > Hi,
> >
> > I have recently experimented a bit with the windowing and event-time
> mechanism in Flink, and either I do not understand how it should work or
> there is some kind of a bug.
> >
> > I have prepared two SourceFunctions: one that emits watermarks itself
> and one that does not, but I have prepared a TimestampExtractor that should
> produce the same results as the previous SourceFunction, at least from my
> point of view.
> >
> > Afterwards I've prepared a simple summing over an EventTimeTriggered
> Sliding Window.
> >
> > What I expected is a sum of 3*(t_sum), where t_sum is a property of Event,
> regardless of the sleep time in the SourceFunction. That is how the
> EventTimeSourceFunction works, but for the SourceFunction the result depends
> on the sleep and does not equal 3*(t_sum).
> >
> > I have done some debugging, and for the SourceFunction the output of
> ExtractTimestampsOperator does not chain to the aggregator operator (the
> property output.allOutputs is empty).
> >
> > Do I understand the mechanism correctly, and should my code work as I
> described? If not, could you please explain a little bit? The code is
> attached to this email.
> >
> > I would be grateful.
> >
> > Regards
> > Dawid Wysakowicz
> >
> >
> >
>
>
>


RE: Question about DataStream serialization

2015-12-08 Thread Radu Tudoran
Hi,

The state that is being loaded can very well be partitioned by keys. Assuming
this scenario, and that you know that the keys go from 0 to N, is there
some possibility to load and partition the initial data in the open function?


Dr. Radu Tudoran
Research Engineer
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Tuesday, December 08, 2015 4:20 PM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Ah, I see what’s the problem. Operator state is scoped to the key of the 
incoming element. In the open() method, no element has been received yet, so 
the key of the incoming element is basically NULL. So the open() method 
initializes state for key NULL. In flatMap() we actually have a key of incoming 
elements so we access state for a specific key, which has default value “0” 
(from the getKeyValueState() call).

OperatorState is only useful if the state needs to be partitioned by key, but 
here it seems that the state is valid for all elements?
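
If the data really is per key, one workaround is to load it lazily in
flatMap(), where the key of the current element is in scope (just a sketch;
the loadDataForKey() helper is illustrative):

@Override
public void flatMap(Tuple1<String> arg0, Collector<String> arg1)
        throws Exception {
    // here the state is scoped to the key of arg0, so the default value "0"
    // can double as a "not loaded yet" marker for this key
    if (dataset.value() == 0) {
        dataset.update(loadDataForKey(arg0.f0)); // illustrative loader
    }
    if (dataset.value() > 0) {
        arg1.collect("Test OK " + arg0);
    }
}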
> On 08 Dec 2015, at 15:30, Radu Tudoran  wrote:
> 
> final StreamExecutionEnvironment env = StreamExecutionEnvironment
>         .getExecutionEnvironment();
> 
> DataStream<String> stream = env
>         .socketTextStream("localhost", 16333, '\n')
>         .map(new MapFunction<String, Tuple1<String>>() {
>             @Override
>             public Tuple1<String> map(String arg0) throws Exception {
>                 return new Tuple1<String>(arg0);
>             }
>         }).keyBy(0)
>         .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
> 
>             private OperatorState<Integer> dataset;
> 
>             @Override
>             public void flatMap(Tuple1<String> arg0,
>                     Collector<String> arg1) throws Exception {
> 
>                 if (dataset.value() > 0)
>                     arg1.collect("Test OK " + arg0);
>             }
> 
>             @Override
>             public void open(Configuration parameters) throws Exception {
> 
>                 dataset = getRuntimeContext().getKeyValueState(
>                         "loadeddata", Integer.class, 0);
> 
>                 /*
>                  * Simulate loading data.
>                  * It looks like if this part is commented out and the dataset
>                  * is initialized with 1, for example, then the non-zero value
>                  * is available in the flatMap function.
>                  */
>                 for (int i = 0; i < 10; i++) {
>                     dataset.update(dataset.value() + 1);
>                 }
> 
>                 // System.out.println("dataset value " + dataset.value());
>             }
>         });
> 
> stream.print();
> 
> env.execute("test

Re: Question about DataStream serialization

2015-12-08 Thread Aljoscha Krettek
Hi,
it is not possible in an officially supported way. There is however a trick 
that you could use: You can cast the OperatorState to a KvState. This has a 
method setCurrentKey() that sets the key to be used when calling value() and 
update(). In this way you can trick the OperatorState into thinking that it has 
the key of an input element.

This is an internal API, however, and could change in the future, thereby 
breaking your program.
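
Roughly like this (just a sketch; KvState is internal to flink-runtime, so
the exact class name and signature may differ between versions):

@Override
public void open(Configuration parameters) throws Exception {
    dataset = getRuntimeContext().getKeyValueState(
            "loadeddata", Integer.class, 0);

    // WARNING: internal API; cast the state to the runtime KvState so that
    // the key can be set manually before calling value()/update()
    @SuppressWarnings({ "unchecked", "rawtypes" })
    KvState kvState = (KvState) dataset;

    int maxKey = 100; // illustrative; assumes the keys 0..N are known up front
    for (int key = 0; key <= maxKey; key++) {
        kvState.setCurrentKey(new Tuple1<String>(String.valueOf(key)));
        dataset.update(loadValueForKey(key)); // illustrative loader
    }
}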

Cheers,
Aljoscha
> On 08 Dec 2015, at 16:31, Radu Tudoran  wrote:
> 
> Hi,
> 
> The state that is being loaded can very well be partitioned by keys. Assuming
> this scenario, and that you know that the keys go from 0 to N, is there
> some possibility to load and partition the initial data in the open
> function?
> 
> 
> Dr. Radu Tudoran
> Research Engineer
> IT R&D Division
> 
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
> 
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> This e-mail and its attachments contain confidential information from HUAWEI, 
> which is intended only for the person or entity whose address is listed 
> above. Any use of the information contained herein in any way (including, but 
> not limited to, total or partial disclosure, reproduction, or dissemination) 
> by persons other than the intended recipient(s) is prohibited. If you receive 
> this e-mail in error, please notify the sender by phone or email immediately 
> and delete it!
> 
> 
> -Original Message-
> From: Aljoscha Krettek [mailto:aljos...@apache.org] 
> Sent: Tuesday, December 08, 2015 4:20 PM
> To: user@flink.apache.org
> Subject: Re: Question about DataStream serialization
> 
> Ah, I see what’s the problem. Operator state is scoped to the key of the 
> incoming element. In the open() method, no element has been received yet, so 
> the key of the incoming element is basically NULL. So the open() method 
> initializes state for key NULL. In flatMap() we actually have a key of 
> incoming elements so we access state for a specific key, which has default 
> value “0” (from the getKeyValueState() call).
> 
> OperatorState is only useful if the state needs to be partitioned by key, but 
> here it seems that the state is valid for all elements?
>> On 08 Dec 2015, at 15:30, Radu Tudoran  wrote:
>> 
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment
>>         .getExecutionEnvironment();
>> 
>> DataStream<String> stream = env
>>         .socketTextStream("localhost", 16333, '\n')
>>         .map(new MapFunction<String, Tuple1<String>>() {
>>             @Override
>>             public Tuple1<String> map(String arg0) throws Exception {
>>                 return new Tuple1<String>(arg0);
>>             }
>>         }).keyBy(0)
>>         .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
>> 
>>             private OperatorState<Integer> dataset;
>> 
>>             @Override
>>             public void flatMap(Tuple1<String> arg0,
>>                     Collector<String> arg1) throws Exception {
>> 
>>                 if (dataset.value() > 0)
>>                     arg1.collect("Test OK " + arg0);
>>             }
>> 
>>             @Override
>>             public void open(Configuration parameters) throws Exception {
>> 
>>                 dataset = getRuntimeContext().getKeyValueState(
>>                         "loadeddata", Integer.class, 0);
>> 
>>                 /*
>>                  * Simulate loading data.
>>                  * It looks like if this part is commented out and the dataset
>>                  * is initialized with 1, for example, then the non-zero value
>>                  * is available in the flatMap function
>> 

Using memory logging in Flink

2015-12-08 Thread Filip Łęczycki
Hi,

I am trying to enable logging of memory usage on flink 0.10.0 by adding:

taskmanager.debug.memory.startLogThread: true

to config.yaml and setting the log4j level to DEBUG; however, in the logs
after running the job I cannot see any info regarding memory usage. My job
lasted 30s, so it should catch a few intervals. Should I change something
else in the configuration?


Best regards/Pozdrawiam,

Filip Łęczycki


Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Nick Dimiduk
The point is to provide a means for users to work around nonconforming APIs.
Kryo at least is extensible in that you can register additional serializers.

On Tue, Dec 8, 2015 at 11:40 AM, Stephan Ewen  wrote:

> Actually, this should be independent of Java 8 lambdas vs Java 7 anonymous
> classes.
> I have been using Java 8 lambdas quite a bit with Flink.
>
> The important thing is that no non-serializable objects are in the closure.
>
> As Fabian mentioned, lazy initialization helps. Serializability is also
> discussed here:
> http://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
>
> Adding another serialization framework may help for cases where simply the
> java.io.Serializable interface is missing in an object. However, not
> everything is magically serializable with Kryo.
> There are classes that you can serialize with Java serialization, but not
> out of the box with Kryo (especially when immutable collections are
> involved). Also, classes that have no default constructors but have checks
> on invariants, etc., can fail with Kryo arbitrarily.
>
>
>
> On Tue, Dec 8, 2015 at 8:28 PM, Nick Dimiduk  wrote:
>
>> Ah, very good. I've closed my issue as a duplicate. Thanks for the
>> reference.
>>
>> On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske  wrote:
>>
>>> Hi Nick,
>>>
>>> thanks for pushing this and opening the JIRA issue.
>>>
>>> The issue came up a couple of times and is a known limitation (see
>>> FLINK-1256).
>>> So far the workaround of marking member variables as transient and
>>> initializing them in the open() method of a RichFunction has been good
>>> enough for all cases I am aware of. That's probably why the issue hasn't
>>> been addressed yet.
>>>
>>> Of course this is not a satisfying solution, if you would like to use
>>> Java 8 lambda functions.
>>>
>>> Best, Fabian
>>>
>>> 2015-12-08 19:38 GMT+01:00 Nick Dimiduk :
>>>
 That's what I feared. IMO this is very limiting when mixing in other
 projects where a user does not have control over those projects' APIs. At
 least falling back to an extensible serialization mechanism (like Kryo)
 allows users to register serializers external to the types they're
 consuming.

 I opened https://issues.apache.org/jira/browse/FLINK-3148 for this
 issue.

 -n

 On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann 
 wrote:

> Hi Nick,
>
> at the moment Flink uses Java serialization to ship the UDFs to the
> cluster. Therefore, the closures must only contain Serializable
> objects. The serializer registration only applies to the data which is
> processed by the Flink job. Thus, for the moment I would try to get rid of
> the ColumnInfo object in your closure.
>
> Cheers,
> Till
>
> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk 
> wrote:
>
>> Hello,
>>
>> I've implemented a (streaming) flow using the Java API and Java8
>> Lambdas for various map functions. When I try to run the flow, job
>> submission fails because of an unserializable type. This is not a type of
>> data used within the flow, but rather a small collection of objects
>> captured in the closure context over one of my Lambdas. I've implemented
>> and registered a Kryo Serializer for this type with this environment,
>> however, it's apparently not used when serializing the lambdas. Seems 
>> like
>> the same serialization configuration and tools of the environment should 
>> be
>> used when preparing the job for submission. Am I missing something?
>>
>> Thanks,
>> Nick
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>> at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>> at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>> at
>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>> at
>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>> Caused by: org.apache.flink.api.common.InvalidProgramException:
>> Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>> at
>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>> at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>> at
>> 

Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Nick Dimiduk
That's what I feared. IMO this is very limiting when mixing in other
projects where a user does not have control over those projects' APIs. At
least falling back to an extensible serialization mechanism (like Kryo)
allows users to register serializers external to the types they're
consuming.

I opened https://issues.apache.org/jira/browse/FLINK-3148 for this issue.

-n

On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann  wrote:

> Hi Nick,
>
> at the moment Flink uses Java serialization to ship the UDFs to the
> cluster. Therefore, the closures must only contain Serializable objects.
> The serializer registration only applies to the data which is processed by
> the Flink job. Thus, for the moment I would try to get rid of the
> ColumnInfo object in your closure.
>
> Cheers,
> Till
>
> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk  wrote:
>
>> Hello,
>>
>> I've implemented a (streaming) flow using the Java API and Java8 Lambdas
>> for various map functions. When I try to run the flow, job submission fails
>> because of an unserializable type. This is not a type of data used within
>> the flow, but rather a small collection of objects captured in the closure
>> context over one of my Lambdas. I've implemented and registered a Kryo
>> Serializer for this type with this environment, however, it's apparently
>> not used when serializing the lambdas. Seems like the same serialization
>> configuration and tools of the environment should be used when preparing
>> the job for submission. Am I missing something?
>>
>> Thanks,
>> Nick
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>> at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>> at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object
>> ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>> at
>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>> at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>> at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>> at ImportFlow.main(ImportFlow.java:178)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>> ... 6 more
>> Caused by: java.io.NotSerializableException:
>> org.apache.phoenix.util.ColumnInfo
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at java.util.ArrayList.writeObject(ArrayList.java:762)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> at
>> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>> at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at
>> 

Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Fabian Hueske
Hi Nick,

thanks for pushing this and opening the JIRA issue.

The issue came up a couple of times and is a known limitation (see FLINK-1256).
So far the workaround of marking member variables as transient and
initializing them in the open() method of a RichFunction has been good
enough for all cases I am aware of. That's probably why the issue hasn't
been addressed yet.
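
In code, that workaround looks roughly like this (just a sketch; the loading
code is illustrative):

public class MyMapper extends RichMapFunction<String, String> {

    // transient: not shipped with the serialized function, but rebuilt
    // locally on every task manager instead
    private transient List<ColumnInfo> columns;

    @Override
    public void open(Configuration parameters) throws Exception {
        // recreate the non-serializable objects here instead of
        // capturing them in the closure
        columns = loadColumnInfos(); // illustrative loader
    }

    @Override
    public String map(String value) throws Exception {
        // use 'columns' here
        return value;
    }
}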

Of course this is not a satisfying solution, if you would like to use Java
8 lambda functions.

Best, Fabian

2015-12-08 19:38 GMT+01:00 Nick Dimiduk :

> That's what I feared. IMO this is very limiting when mixing in other
> projects where a user does not have control over those projects' APIs. At
> least falling back to an extensible serialization mechanism (like Kryo)
> allows users to register serializers external to the types they're
> consuming.
>
> I opened https://issues.apache.org/jira/browse/FLINK-3148 for this issue.
>
> -n
>
> On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann 
> wrote:
>
>> Hi Nick,
>>
>> at the moment Flink uses Java serialization to ship the UDFs to the
>> cluster. Therefore, the closures must only contain Serializable objects.
>> The serializer registration only applies to the data which is processed by
>> the Flink job. Thus, for the moment I would try to get rid of the
>> ColumnInfo object in your closure.
>>
>> Cheers,
>> Till
>>
>> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk  wrote:
>>
>>> Hello,
>>>
>>> I've implemented a (streaming) flow using the Java API and Java8 Lambdas
>>> for various map functions. When I try to run the flow, job submission fails
>>> because of an unserializable type. This is not a type of data used within
>>> the flow, but rather a small collection of objects captured in the closure
>>> context over one of my Lambdas. I've implemented and registered a Kryo
>>> Serializer for this type with this environment, however, it's apparently
>>> not used when serializing the lambdas. Seems like the same serialization
>>> configuration and tools of the environment should be used when preparing
>>> the job for submission. Am I missing something?
>>>
>>> Thanks,
>>> Nick
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>> at
>>> org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>>> at
>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object
>>> ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>>> at
>>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>> at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>> at
>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>>> at
>>> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>>> at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>>> at ImportFlow.main(ImportFlow.java:178)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>> ... 6 more
>>> Caused by: java.io.NotSerializableException:
>>> org.apache.phoenix.util.ColumnInfo
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>> at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> at java.util.ArrayList.writeObject(ArrayList.java:762)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> at
>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>> at
>>> 

Re: Using memory logging in Flink

2015-12-08 Thread Stephan Ewen
Hi!

That is exactly the right way to do it. Logging has to be at least INFO and
the parameter "taskmanager.debug.memory.startLogThread" set to true.
The log output should be under
"org.apache.flink.runtime.taskmanager.TaskManager".

Do you see other outputs for that class in the log?

Make sure you restarted the TaskManager processes after you changed the
config file.

Greetings,
Stephan



On Tue, Dec 8, 2015 at 6:56 PM, Filip Łęczycki 
wrote:

> Hi,
>
> I am trying to enable logging of memory usage on flink 0.10.0 by adding:
>
> taskmanager.debug.memory.startLogThread: true
>
> to config.yaml and setting the log4j level to DEBUG; however, in the logs after 
> running the job I cannot see any info regarding memory usage. My job lasted 
> 30s, so it should catch a few intervals. Should I change something else in the 
> configuration?
>
>
> Best regards/Pozdrawiam,
>
> Filip Łęczycki
>


Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Stephan Ewen
That is true.

If you want to look into this, there are probably two places that need
adjustment:

1) The UserCodeObjectWrapper would need to be adjusted to hold a serialized
object (a byte[]) for shipping and serialize that object differently (say
trying Java, then falling back to Kryo).

2) The ClosureCleaner, which also checks serializability, should then no
longer check eagerly for serializability.

After that, we could think about exposing a way to register custom
serializers with Kryo for the UserCodeObjectWrapper.

Greetings,
Stephan


On Tue, Dec 8, 2015 at 8:44 PM, Nick Dimiduk  wrote:

> The point is to provide a means for users to work around nonconforming
> APIs. Kryo at least is extensible in that you can register additional
> serializers.
>
> On Tue, Dec 8, 2015 at 11:40 AM, Stephan Ewen  wrote:
>
>> Actually, this should be independent of Java 8 lambdas vs Java 7
>> anonymous classes.
>> I have been using Java 8 lambdas quite a bit with Flink.
>>
>> The important thing is that no non-serializable objects are in the
>> closure.
>>
>> As Fabian mentioned, lazy initialization helps. Serializability is also
>> discussed here:
>> http://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
>>
>> Adding another serialization framework may help for cases where simply
>> the java.io.Serializable interface is missing in an object. However, not
>> everything is magically serializable with Kryo.
>> There are classes that you can serialize with Java serialization, but not
>> out of the box with Kryo (especially when immutable collections are
>> involved). Also, classes that have no default constructors but have checks
>> on invariants, etc., can fail with Kryo arbitrarily.
>>
>>
>>
>> On Tue, Dec 8, 2015 at 8:28 PM, Nick Dimiduk  wrote:
>>
>>> Ah, very good. I've closed my issue as a duplicate. Thanks for the
>>> reference.
>>>
>>> On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske 
>>> wrote:
>>>
 Hi Nick,

 thanks for pushing this and opening the JIRA issue.

 The issue came up a couple of times and is a known limitation (see
 FLINK-1256).
 So far the workaround of marking member variables as transient and
 initializing them in the open() method of a RichFunction has been good
 enough for all cases I am aware of. That's probably why the issue hasn't
 been addressed yet.

 Of course this is not a satisfying solution, if you would like to use
 Java 8 lambda functions.

 Best, Fabian

 2015-12-08 19:38 GMT+01:00 Nick Dimiduk :

> That's what I feared. IMO this is very limiting when mixing in other
> projects where a user does not have control over those projects' APIs. At
> least falling back to an extensible serialization mechanism (like Kryo)
> allows users to register serializers external to the types they're
> consuming.
>
> I opened https://issues.apache.org/jira/browse/FLINK-3148 for this
> issue.
>
> -n
>
> On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann 
> wrote:
>
>> Hi Nick,
>>
>> at the moment Flink uses Java serialization to ship the UDFs to the
>> cluster. Therefore, the closures must only contain Serializable
>> objects. The serializer registration only applies to the data which is
>> processed by the Flink job. Thus, for the moment I would try to get rid 
>> of
>> the ColumnInfo object in your closure.
>>
>> Cheers,
>> Till
>>
>> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk 
>> wrote:
>>
>>> Hello,
>>>
>>> I've implemented a (streaming) flow using the Java API and Java8
>>> Lambdas for various map functions. When I try to run the flow, job
>>> submission fails because of an unserializable type. This is not a type 
>>> of
>>> data used within the flow, but rather a small collection of objects
>>> captured in the closure context over one of my Lambdas. I've implemented
>>> and registered a Kryo Serializer for this type with this environment,
>>> however, it's apparently not used when serializing the lambdas. Seems 
>>> like
>>> the same serialization configuration and tools of the environment 
>>> should be
>>> used when preparing the job for submission. Am I missing something?
>>>
>>> Thanks,
>>> Nick
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>> at
>>> org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>> at

Re: Using memory logging in Flink

2015-12-08 Thread Filip Łęczycki
Hi,

Thank you for your reply!

I have made sure I restarted the TaskManager after changing the config, but it
didn't resolve the issue. The config is loaded, as I can see the following
line in the log:
09:12:2015 00:00:21,894 DEBUG org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.debug.memory.startLogThread, true

I am running a job on a local standalone Flink instance and my
log4j.properties looks like this:
log4j.rootLogger=DEBUG, file

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{dd:MM:YYY HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

Maybe I can set some property in the job's code so that it would force such
verbose logging? I need those logs to run some Flink performance tests, but
maybe I can extract the benchmark results without them. Do you know any other
way to monitor a Flink job's memory usage and GC time, other than looking at
the web interface?

Best regards,
Filip Łęczycki

Pozdrawiam,
Filip Łęczycki

2015-12-08 20:48 GMT+01:00 Stephan Ewen :

> Hi!
>
> That is exactly the right way to do it. Logging has to be at least INFO
> and the parameter "taskmanager.debug.memory.startLogThread" set to true.
> The log output should be under
> "org.apache.flink.runtime.taskmanager.TaskManager".
>
> Do you see other outputs for that class in the log?
>
> Make sure you restarted the TaskManager processes after you changed the
> config file.
>
> Greetings,
> Stephan
>
>
>
> On Tue, Dec 8, 2015 at 6:56 PM, Filip Łęczycki 
> wrote:
>
>> Hi,
>>
>> I am trying to enable logging of memory usage on flink 0.10.0 by adding:
>>
>> taskmanager.debug.memory.startLogThread: true
>>
>> to config.yaml and setting the log4j level to DEBUG; however, in the logs after 
>> running the job I cannot see any info regarding memory usage. My job lasted 
>> 30s, so it should catch a few intervals. Should I change something else in the 
>> configuration?
>>
>>
>> Best regards/Pozdrawiam,
>>
>> Filip Łęczycki
>>
>
>


RE: Question about DataStream serialization

2015-12-08 Thread Radu Tudoran
Hi,

Is the partitioning function used by ".keyBy(Object)" of the form:

Object.hash % getNumberOfParallelSubtasks()

?



Dr. Radu Tudoran
Research Engineer
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Tuesday, December 08, 2015 5:00 PM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Hi,
it is not possible in an officially supported way. There is however a trick 
that you could use: You can cast the OperatorState to a KvState. This has a 
method setCurrentKey() that sets the key to be used when calling value() and 
update(). In this way you can trick the OperatorState into thinking that it has 
the key of an input element.

This is an internal API, however, and could change in the future, thereby 
breaking your program.

Cheers,
Aljoscha
> On 08 Dec 2015, at 16:31, Radu Tudoran  wrote:
> 
> Hi,
> 
> The state that is being loaded can very well be partitioned by keys. Assuming 
> this scenario, and that you know that the keys go from 0 to N, is there 
> some possibility to load and partition the initial data in the open 
> function?
> 
> 
> Dr. Radu Tudoran
> Research Engineer
> IT R&D Division
> 
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
> 
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> This e-mail and its attachments contain confidential information from HUAWEI, which is 
> intended only for the person or entity whose address is listed above. Any use 
> of the information contained herein in any way (including, but not limited 
> to, total or partial disclosure, reproduction, or dissemination) by persons 
> other than the intended recipient(s) is prohibited. If you receive this 
> e-mail in error, please notify the sender by phone or email immediately and 
> delete it!
> 
> 
> -Original Message-
> From: Aljoscha Krettek [mailto:aljos...@apache.org]
> Sent: Tuesday, December 08, 2015 4:20 PM
> To: user@flink.apache.org
> Subject: Re: Question about DataStream serialization
> 
> Ah, I see what’s the problem. Operator state is scoped to the key of the 
> incoming element. In the open() method, no element has been received yet, so 
> the key of the incoming element is basically NULL. So the open() method 
> initializes state for key NULL. In flatMap() we actually have a key of 
> incoming elements so we access state for a specific key, which has default 
> value “0” (from the getKeyValueState() call).
> 
> OperatorState is only useful if the state needs to be partitioned by key, but 
> here it seems that the state is valid for all elements?
>> On 08 Dec 2015, at 15:30, Radu Tudoran  wrote:
>> 
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment
>>         .getExecutionEnvironment();
>> 
>> DataStream<String> stream = env
>>         .socketTextStream("localhost", 16333, '\n')
>>         .map(new MapFunction<String, Tuple1<String>>() {
>>             @Override
>>             public Tuple1<String> map(String arg0) throws Exception {
>>                 return new Tuple1<String>(arg0);
>>             }
>>         }).keyBy(0)
>>         .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
>> 
>>             private OperatorState<Integer> dataset;
>> 
>>