Re: Custom Class for state checkpointing

2015-08-18 Thread Rico Bergmann
Hi Marton. I think this is more of a class loader issue. My custom class implements Serializable and so do all contained member classes. Greets. Rico. On 18.08.2015 at 11:45, Márton Balassi balassi.mar...@gmail.com wrote: Hey Rico, Currently the Checkpointed interface has the

Re: Reply: How to understand slot?

2015-08-18 Thread Fabian Hueske
A TM reserves a certain amount of memory for each slot, but CPU and IO can be shared across slots. Hence, there might be some imbalance among TMs, but this imbalance is limited by the concept of slots which gives an upper bound of the number of tasks that a TM can process. Also random assignment
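To make the memory vs. CPU/IO distinction above concrete, here is a toy model, not Flink code: a TaskManager's managed memory is split evenly across its slots, while the slot count only caps how many tasks it accepts. The class, its methods and the 4096 MB / 4 slot figures are hypothetical.

```java
// Toy model of a TaskManager's slots; NOT Flink's implementation.
// Class name, method names and the numbers in main() are hypothetical.
public class SlotModel {

    // Memory is the one resource that is partitioned: each slot gets an equal share.
    static long memoryPerSlotMb(long managedMemoryMb, int numberOfSlots) {
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("need at least one slot");
        }
        return managedMemoryMb / numberOfSlots;
    }

    // CPU and IO are shared, so the slot count is only an upper bound on
    // how many tasks (or shared-slot pipelines) this TaskManager accepts.
    static boolean canAccept(int occupiedSlots, int numberOfSlots) {
        return occupiedSlots < numberOfSlots;
    }

    public static void main(String[] args) {
        System.out.println("memory per slot: " + memoryPerSlotMb(4096, 4) + " MB"); // 1024 MB
        System.out.println("free slot left: " + canAccept(3, 4));                  // true
    }
}
```

The model deliberately leaves CPU unaccounted for, which is where the imbalance between TaskManagers mentioned above can come from.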

Re: Java 8 and type erasure

2015-08-18 Thread Aljoscha Krettek
Hi Kristoffer, I'm afraid not, but maybe Timo has some further information. In this extended example we can see the problem: https://gist.github.com/aljoscha/84cc363d13cf1dfe9364. The output is: Type is: class org.apache.flink.examples.java8.wordcount.TypeTest$Thing class
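The problem can be reproduced with plain JDK reflection, independent of Flink. In this minimal sketch (class and method names are made up for illustration), an anonymous class keeps its generic interface signature, while the class the JDK generates at runtime for a lambda implements only the raw interface, so a type extractor has nothing to read.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class LambdaErasureDemo {

    // Generic view of the first interface implemented by the function object's class.
    static Type firstGenericInterface(Object fn) {
        return fn.getClass().getGenericInterfaces()[0];
    }

    // True if that interface still carries its type arguments (e.g. <String, Integer>).
    static boolean isParameterized(Object fn) {
        return firstGenericInterface(fn) instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        Function<String, Integer> anonymous = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        Function<String, Integer> lambda = s -> s.length();

        // The anonymous class records Function<String, Integer> in its class file.
        System.out.println(firstGenericInterface(anonymous) + " parameterized=" + isParameterized(anonymous));
        // The runtime-generated lambda class implements the raw Function interface.
        System.out.println(firstGenericInterface(lambda) + " parameterized=" + isParameterized(lambda));
    }
}
```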

Re: Custom Class for state checkpointing

2015-08-18 Thread Márton Balassi
Hey Rico, Currently the Checkpointed interface has the limitation that the return type of the snapshotState method (the generic parameter of Checkpointed) has to be Java Serializable. I suspect that is the problem here. This is a limitation that we plan to lift soon. Marton On Tue, Aug 18, 2015 at

Re: How to understand slot?

2015-08-18 Thread Stephan Ewen
Hi! There is a little bit of documentation about the scheduling and the slots - In the config reference: https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-taskmanager-processing-slots - In the internals docs:

Re: Java 8 and type erasure

2015-08-18 Thread Stephan Ewen
Wow, that looks super interesting. Will try that out later. Thanks for sharing :-) On Tue, Aug 18, 2015 at 11:01 AM, Kristoffer Sjögren sto...@gmail.com wrote: Hi, potential fix for writing Flink jobs using lambdas without Eclipse JDT?

Re: Java 8 and type erasure

2015-08-18 Thread Kristoffer Sjögren
:-) On Tue, Aug 18, 2015 at 11:03 AM, Stephan Ewen se...@apache.org wrote: Wow, that looks super interesting. Will try that out later. Thanks for sharing :-) On Tue, Aug 18, 2015 at 11:01 AM, Kristoffer Sjögren sto...@gmail.com wrote: Hi, potential fix for writing Flink jobs using

Java 8 and type erasure

2015-08-18 Thread Kristoffer Sjögren
Hi, potential fix for writing Flink jobs using lambdas without Eclipse JDT? https://gist.github.com/aslakhellesoy/3678beba60c109eacbe5 Cheers, -Kristoffer

Custom Class for state checkpointing

2015-08-18 Thread Rico Bergmann
Hi! Is it possible to use your own class? I'm using the file state handler at the Jobmanager and implemented the Checkpointed interface. I tried this and got an exception: Error: java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state. at

Re: Java 8 and type erasure

2015-08-18 Thread Kristoffer Sjögren
How about https://github.com/jhalterman/typetools? On Tue, Aug 18, 2015 at 11:16 AM, Aljoscha Krettek aljos...@apache.org wrote: Hi Kristoffer, I'm afraid not, but maybe Timo has some further information. In this extended example we can see the problem:

Re: Custom Class for state checkpointing

2015-08-18 Thread Robert Metzger
Hi Rico, I'm pretty sure that this is a valid bug you've found, since this case is not yet tested (afaik). We'll fix the issue ASAP. Until then, are you able to encapsulate your state in something that is available in Flink, for example a TupleX, or just serialize it yourself into a byte[]? On

How to understand slot?

2015-08-18 Thread Zhangrucong
While reading the scheduling code in the JobManager, I came up with the following questions: 1. How is it decided that a job vertex is deployed in a shared slot? What is the benefit of deploying vertices in a shared slot? 2. How is it decided how many slots a TaskManager has? 3. If there are many TaskManagers, when allocating a

Reply: How to understand slot?

2015-08-18 Thread Zhangrucong
Hi Stephan, Thanks a lot for answering. 3) For sources, Flink picks a random TaskManager (splits are then assigned to the sources in a locality-aware way). For all tasks after sources, Flink tries to co-locate them with their input(s), unless they have so many inputs that co-location makes no
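The placement rules described in this reply can be modelled as a toy sketch. This is not Flink's scheduler; the class, its methods, the "host most of the inputs" tie-breaking rule and the `maxInputsForColocation` threshold are all hypothetical assumptions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Toy model of the placement rules quoted above; NOT Flink's scheduler code.
// All names and the co-location threshold are hypothetical.
public class PlacementSketch {

    // Sources go to a randomly chosen TaskManager (index 0..numTaskManagers-1).
    static int placeSource(Random rnd, int numTaskManagers) {
        return rnd.nextInt(numTaskManagers);
    }

    // Downstream tasks try to sit on the TaskManager hosting most of their inputs,
    // unless they have so many inputs that co-location is pointless.
    static int placeConsumer(List<Integer> inputLocations, int maxInputsForColocation,
                             Random rnd, int numTaskManagers) {
        if (!inputLocations.isEmpty() && inputLocations.size() <= maxInputsForColocation) {
            Map<Integer, Integer> counts = new HashMap<>();
            int best = inputLocations.get(0);
            for (int tm : inputLocations) {
                int c = counts.merge(tm, 1, Integer::sum);
                if (c > counts.get(best)) {
                    best = tm;
                }
            }
            return best;
        }
        // Too many inputs (or none): no meaningful locality, pick any TaskManager.
        return rnd.nextInt(numTaskManagers);
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);
        int numTaskManagers = 3;
        int source = placeSource(rnd, numTaskManagers);
        int mapper = placeConsumer(Arrays.asList(source), 4, rnd, numTaskManagers);
        System.out.println("source on TM " + source + ", mapper on TM " + mapper);
    }
}
```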

Re: Custom Class for state checkpointing

2015-08-18 Thread Stephan Ewen
Yep, that is a valid bug! State is apparently not resolved with the correct classloader. As a workaround, you can checkpoint byte arrays and serialize/deserialize the state into byte arrays yourself. You can use the Apache Commons SerializationUtils class, or Flink's InstantiationUtil class for
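A minimal sketch of the byte-array workaround, using only java.io rather than the Commons or Flink helpers mentioned above. Assumptions: the operator's snapshot method returns `toBytes(state)` and its restore method calls `fromBytes(...)` with the user-code class loader; `Thread.currentThread().getContextClassLoader()` is only a stand-in for that loader here, and the class name `StateBytes` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Hashtable;

public class StateBytes {

    // Serialize the whole state object (e.g. a Hashtable) into a byte[] snapshot.
    static byte[] toBytes(Serializable state) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Deserialize, resolving classes through the given (user-code) class loader first,
    // so custom classes are found even if the default loader cannot see them.
    static Object fromBytes(byte[] bytes, ClassLoader userCodeLoader) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                try {
                    return Class.forName(desc.getName(), false, userCodeLoader);
                } catch (ClassNotFoundException e) {
                    return super.resolveClass(desc); // e.g. primitive types such as "int"
                }
            }
        }) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of the state not found", e);
        }
    }

    public static void main(String[] args) {
        Hashtable<String, Long> state = new Hashtable<>();
        state.put("clicks", 42L);
        byte[] snapshot = toBytes(state);
        Object restored = fromBytes(snapshot, Thread.currentThread().getContextClassLoader());
        System.out.println(restored.equals(state)); // true
    }
}
```

Overriding `resolveClass` is what addresses the class loader problem: the default `ObjectInputStream` resolves classes via the latest user-defined loader on the call stack, which may not be the one that loaded the job's classes.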

Re: Java 8 and type erasure

2015-08-18 Thread Stephan Ewen
Hi Kristoffer! I looked through the code as well. In fact, Flink currently uses the trick mentioned for Serializable Lambdas in the gist you sent me. This works well for lambdas that return simple types (primitives or classes without generics). The information for the generic parametrization is
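The Serializable-lambda trick can be illustrated with plain JDK code. This is a sketch of the general technique, not Flink's implementation; `SerFunction` and `serialized` are hypothetical names. A serializable lambda's generated class has a `writeReplace` method returning a `java.lang.invoke.SerializedLambda`, whose implementation-method descriptor names the concrete parameter and return classes. JVM descriptors are always erased, though, so a lambda returning `List<String>` only reveals `java/util/List`, which is the limitation described above.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class SerializedLambdaDemo {

    // A functional interface that is also Serializable, so the lambda class gets writeReplace.
    public interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    // Invoke the generated writeReplace reflectively to obtain the SerializedLambda.
    static SerializedLambda serialized(Serializable lambda) {
        try {
            Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return (SerializedLambda) writeReplace.invoke(lambda);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("not a serializable lambda", e);
        }
    }

    public static void main(String[] args) {
        SerFunction<String, Integer> length = s -> s.length();
        SerFunction<String, List<String>> split = s -> Arrays.asList(s.split(" "));

        // Descriptor of the compiled lambda body: concrete classes, but no type arguments.
        System.out.println(serialized(length).getImplMethodSignature());
        System.out.println(serialized(split).getImplMethodSignature());
        // The interface method itself is fully erased to Object -> Object.
        System.out.println(serialized(length).getFunctionalInterfaceMethodSignature());
    }
}
```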

Re: Java 8 and type erasure

2015-08-18 Thread Stephan Ewen
Would have been great. I had high hopes when I saw the trick with the constant pool, but this is only to make what Flink does already applicable to non-serializable lambdas. If you want to help us with this, I'll ping you for some support on the OpenJDK mailing list ;-) On Tue, Aug 18, 2015 at

Re: Java 8 and type erasure

2015-08-18 Thread Stephan Ewen
Timo should still have the patch! If you want to revive the thread, that'd be great. I'd be happy to support it. On Tue, Aug 18, 2015 at 2:51 PM, Kristoffer Sjögren sto...@gmail.com wrote: Do you have a link to these patches? Reading through the thread, I get the feeling they didn't

Re: Java 8 and type erasure

2015-08-18 Thread Kristoffer Sjögren
I suspected that you already had looked into this, but it was worth a try. It would make everything so much easier. Thanks for the explanation :-) On Tue, Aug 18, 2015 at 1:50 PM, Stephan Ewen se...@apache.org wrote: Hi Kristoffer! I looked through the code as well. In fact, Flink currently

Re: Java 8 and type erasure

2015-08-18 Thread Kristoffer Sjögren
Yeah, I think I found the thread already... by Timo Walther? On Tue, Aug 18, 2015 at 2:01 PM, Stephan Ewen se...@apache.org wrote: Would have been great. I had high hopes when I saw the trick with the constant pool, but this is only to make what Flink does already applicable to

Re: Custom Class for state checkpointing

2015-08-18 Thread Robert Metzger
I created a JIRA for the issue: https://issues.apache.org/jira/browse/FLINK-2543 Once I'm done with the Kafka pull request, I'll take a look into this. On Tue, Aug 18, 2015 at 1:56 PM, Stephan Ewen se...@apache.org wrote: Yep, that is a valid bug! State is apparently not resolved with the

Re: Java 8 and type erasure

2015-08-18 Thread Kristoffer Sjögren
Do you have a link to these patches? Reading through the thread, I get the feeling they didn't reject the idea completely. Considering there are also other projects (Crunch, Spark, Storm, etc.) that would benefit from this, maybe we can convince them together? On Tue, Aug 18, 2015 at 2:27 PM,

Re: Custom Class for state checkpointing

2015-08-18 Thread Rico Bergmann
Hi! Using TupleX is not possible since the state is very big (a Hashtable). How would I serialize it into a byte array? Greets. Rico. On 18.08.2015 at 11:44, Robert Metzger rmetz...@apache.org wrote: Hi Rico, I'm pretty sure that this is a valid bug you've found, since

Re: Self-join with filter

2015-08-18 Thread Stephan Ewen
Hi! I am not 100% sure that I understand your question completely, but I'll give it my best shot. If you want to push IDs into the connector, I assume you mean that you use some form of connector that can filter by ID directly in the low level data access paths, in order to read as little data

Re: Custom Class for state checkpointing

2015-08-18 Thread Robert Metzger
I'm still working on writing a test case for reproducing the issue. Which Flink version are you using? If you are using 0.10-SNAPSHOT, which exact commit? On Tue, Aug 18, 2015 at 2:09 PM, Robert Metzger rmetz...@apache.org wrote: I created a JIRA for the issue: