Eclipse JDT, Java 8, lambdas
Hello,

I am trying to use Java 8 lambdas in my project and hit the following error:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. It seems that your compiler has not stored them into the .class file. Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. See the documentation for more information about how to compile jobs containing lambda expressions.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:779)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:765)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:135)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:78)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:160)
    at eu.euranova.flink.Axa.main(Axa.java:62)

My very simple code is the following:

    File directory = new File("PATH TO A DIRECTORY WITH CSV FILES");
    DataSet<Tuple3<Integer, Double, Double>> set = env.fromElements(new Tuple3<>(0, 0.0, 0.0));
    for (File file : directory.listFiles()) {
        int pathID = 0;
        String filePath = "file://" + file.getAbsolutePath();
        DataSet<Tuple2<Double, Double>> set2 = env.readCsvFile(filePath)
                .ignoreFirstLine()
                .includeFields("11")
                .types(Double.class, Double.class);
        DataSet<Tuple3<Integer, Double, Double>> set3 =
                set2.map(tuple -> new Tuple3<>(pathID, tuple.f0, tuple.f1));
        set = set.union(set3);
    }

I followed the steps in the Java 8 documentation section (http://flink.apache.org/docs/0.8/java8_programming_guide.html#compiler-limitations) and applied the following to the pom.xml created using the Flink archetype:

- Changed the Java 1.6 reference to 1.8
- Uncommented the section related to Java 8 lambdas
- Installed the Eclipse Java development tools (JDT)
- Installed the m2e-jdt connector

The pom.xml does not show any error and builds fine. Am I missing something?
Do I need to explicitly set up Eclipse JDT? The only installed environment shown in my preferences is the /usr/java/jdk-1.8.0_31 from Oracle.

Thanks and best regards,
Tran Nam-Luc
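As background on why the compiler matters here: the generic signature of a lambda's synthetic class is not recorded in the .class file the way an anonymous inner class's is, which is exactly what the TypeExtractor complains about. The difference can be observed with plain Java reflection, independently of Flink; this is only an illustrative sketch, and the class and method names below are made up:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class GenericsDemo {

    // Describes what generic interface information reflection can recover
    // from the class implementing Function.
    static String describe(Function<?, ?> f) {
        for (Type t : f.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return "parameterized: " + t; // type arguments are available
            }
        }
        return "no generic parameter info"; // only the raw interface is recorded
    }

    public static void main(String[] args) {
        // A lambda: the runtime-generated class carries no generic signature.
        Function<Integer, String> lambda = i -> "v" + i;

        // An anonymous class: the compiler stores Function<Integer, String>
        // in the class file, so reflection can recover the type arguments.
        Function<Integer, String> anon = new Function<Integer, String>() {
            @Override
            public String apply(Integer i) {
                return "v" + i;
            }
        };

        System.out.println(describe(lambda)); // no generic parameter info
        System.out.println(describe(anon));   // parameterized: java.util.function.Function<...>
    }
}
```

This is also why one common workaround, regardless of compiler, is to replace the lambda with an anonymous function class (or give Flink an explicit type hint).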
Re: Planning Release 0.8.1
I have a fix for this user-discovered bug: https://issues.apache.org/jira/browse/FLINK-1463?jql=project%20%3D%20FLINK%20AND%20assignee%20%3D%20currentUser()%20AND%20resolution%20%3D%20Unresolved in this PR: https://github.com/apache/flink/pull/353

This should probably also be back-ported to 0.8.1.

On Thu, Feb 5, 2015 at 3:54 PM, Aljoscha Krettek aljos...@apache.org wrote:
> I have the PR ready. :D

On Thu, Feb 5, 2015 at 2:43 PM, Stephan Ewen se...@apache.org wrote:
> I would like to add it to 0.8.1. People have asked for it...

On Thu, Feb 5, 2015 at 2:36 PM, Aljoscha Krettek aljos...@apache.org wrote:
> Do we want to have HadoopInputFormat support for Scala in 0.8.1?

On Thu, Feb 5, 2015 at 2:29 PM, Stephan Ewen se...@apache.org wrote:
> I think we need to make a pass through the recent 0.9 commits and cherry-pick some more into 0.8.1. There were quite a few bug fixes. Also, this one is rather critical and pending: https://github.com/apache/flink/pull/318

On Thu, Feb 5, 2015 at 2:27 PM, Robert Metzger rmetz...@apache.org wrote:
> Hi guys,
> I would like to bundle a minor bugfix release for Flink soon. Some users were complaining about incomplete Kryo support, in particular for Avro. Also, we fixed some other issues which are easy to port to 0.8.1 (some of them are already in the branch).
> I would like to start the vote on the 0.8.1 release in roughly 26 hours. So please merge bugfixes you would like to have in 0.8.1 into the release-0.8 branch in the next 24 hours.
> There are currently 2 open JIRAs assigned to 0.8.1, one with a pending pull request: https://issues.apache.org/jira/browse/FLINK-1422?jql=project%20%3D%20FLINK%20AND%20status%20%3D%20Open%20AND%20fixVersion%20%3D%200.8.1%20ORDER%20BY%20status%20DESC%2C%20priority%20DESC
> Please chime in if you are suggesting a different plan for the 0.8.1 release.
> - Robert
Re: Eclipse JDT, Java 8, lambdas
Hi,

Looking at your code, it seems that you are creating a DataSet for each file in the directory. Flink can also read entire directories.

Now regarding the actual problem: how are you starting the Flink job? Out of your IDE, or using the ./bin/flink run tool?

Best,
Robert

On Fri, Feb 6, 2015 at 12:27 PM, Nam-Luc Tran namluc.t...@euranova.eu wrote:
> [original message quoted in full above]
Re: Eclipse JDT, Java 8, lambdas
Sorry, I didn't see the pathID which is added in the map() method. Then your approach looks good for using Flink locally.

On Fri, Feb 6, 2015 at 3:03 PM, Nam-Luc Tran namluc.t...@euranova.eu wrote:
> Thank you for your replies.
>
> @Stephan: Updating to 0.9-SNAPSHOT and using the return statement did the trick. I will try the 4.5 M4 release and give feedback on how it went.
>
> @Robert: I launch the job right from the Eclipse IDE. Also, each file in the folder contains data for a different trajectory. By iterating this way I create a different dataset for each trajectory and add a tag identifying data of the same trajectory. I then union all these datasets together into a single dataset. Do you see a more streamlined way to achieve this?
>
> Best regards,
> Tran Nam-Luc
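For what it's worth, the tag-and-union pattern discussed here (one numeric ID per trajectory file, header lines skipped) can be expressed independently of Flink's API. The sketch below shows the same logic in plain Java; the helper name, file names, and CSV layout are invented for illustration:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class TagAndMerge {

    // Read every CSV file in a directory, skip each file's header line, and
    // prefix every row with a numeric ID identifying the file (the trajectory).
    static List<String> tagAndMerge(Path dir) throws IOException {
        List<String> out = new ArrayList<>();
        int pathID = 0;
        try (Stream<Path> files = Files.list(dir)) {
            for (Path file : files.sorted().toArray(Path[]::new)) {
                List<String> lines = Files.readAllLines(file);
                for (String row : lines.subList(1, lines.size())) { // ignore first line
                    out.add(pathID + "," + row);
                }
                pathID++;
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway directory with two tiny trajectory files.
        Path dir = Files.createTempDirectory("trajectories");
        Files.write(dir.resolve("a.csv"), Arrays.asList("x,y", "1.0,2.0"));
        Files.write(dir.resolve("b.csv"), Arrays.asList("x,y", "3.0,4.0"));
        tagAndMerge(dir).forEach(System.out::println); // 0,1.0,2.0 then 1,3.0,4.0
    }
}
```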
Re: Planning Release 0.8.1
@Aljoscha, can you merge the backported fix to the release-0.8 branch when it's ready?

On Fri, Feb 6, 2015 at 11:39 AM, Aljoscha Krettek aljos...@apache.org wrote:
> I have a fix for this user-discovered bug: https://issues.apache.org/jira/browse/FLINK-1463?jql=project%20%3D%20FLINK%20AND%20assignee%20%3D%20currentUser()%20AND%20resolution%20%3D%20Unresolved in this PR: https://github.com/apache/flink/pull/353
> This should probably also be back-ported to 0.8.1.
> [earlier thread quoted above]
Streaming fault tolerance with event sending
Hey,

We've been implementing a simple Storm-like fault tolerance system that persists source records, keeps track of all the records (whether they've been processed), and replays them if they fail. The straightforward way to do this was to create a special AbstractJobVertex (and AbstractInvokable) that deals with replaying and keeps track of the records. So there is a so-called FTLayerVertex that needs two-way communication with the tasks:

1. It needs to be notified about a processed record or a newly created record.
2. It needs to resend failed records to the appropriate tasks.

We've decided to use events for 1. and regular message sending (i.e. BufferReader/Writer) for 2. So we need to send events backward on a BufferReader.

The problem is that backward events are only sent and received once the channel is initialized (in the case of one channel, for now) and there is a living connection (i.e. it's not UNKNOWN; please correct me if I'm wrong). But sometimes the connection is not yet initialized when we're sending these events from the tasks to the FTLayerVertex.

We've tried a workaround: sending an initializer forward event from the FT layer to all the tasks, and waiting for this event at the beginning of every task's invoke, so that the backward events are only sent when the channel is surely up. That did not work out, because in order to receive an event we need to read that input, and we only want to read and process that input after we have made sure we can use the channel.

Another workaround was sending an initializer record instead of an event. That did not work out either, because we wanted to read that input (from the FTLayerVertex) as a regular input, so the given BufferReader was also passed to a UnionBufferReader, and reading the given BufferReader separately (wrapped in a RecordReader) resulted in the UnionBufferReader getting stuck. (It seemed as if the UnionBufferReader put the given BufferReader in its queue because of this initializer message, but the message had already been read from the BufferReader separately, so it got stuck waiting for a new record from the BufferReader.)

These workarounds don't seem natural either, because we're pulling this synchronization of connection initialization a layer up. It would be nice if we had some kind of events that are guaranteed to reach their destination. Is that possible? Would that be useful for other purposes too? Does anyone know a workaround that might work? Are we missing something? (Please correct me if I see something wrong.) Should we use something else instead of events? (I think we cannot avoid this two-way communication between vertices.)

All suggestions are welcome!

Best regards,
Gabor
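For readers unfamiliar with the Storm-style approach being described, the record-tracking side of the design (separate from the channel/event delivery problem) can be sketched in a few lines of plain Java. This is only an illustration of the idea, not Flink internals; the class and method names below are invented:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Minimal sketch of the FT layer's bookkeeping: the source side persists each
// record until a task acknowledges it, and unacknowledged records can be
// replayed after a failure.
public class FTLayerSketch {
    private final Map<Long, String> pending = new LinkedHashMap<>();
    private long nextId = 0;

    // Called when a source emits a record: remember it until it is acked.
    public long persist(String record) {
        long id = nextId++;
        pending.put(id, record);
        return id;
    }

    // Called by a task once a record is fully processed (direction 1. above).
    public void ack(long id) {
        pending.remove(id);
    }

    // Called on failure: resend every record that was never acked (direction 2. above).
    public void replayFailed(Consumer<String> resend) {
        pending.values().forEach(resend);
    }

    public static void main(String[] args) {
        FTLayerSketch ft = new FTLayerSketch();
        long a = ft.persist("record-a");
        ft.persist("record-b");
        ft.ack(a); // record-a completed; record-b is still in flight
        ft.replayFailed(r -> System.out.println("replaying " + r)); // replaying record-b
    }
}
```

The hard part discussed in this thread is, of course, making the ack and resend messages themselves reliable across uninitialized channels, which this sketch deliberately leaves out.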