Eclipse JDT, Java 8, lambdas

2015-02-06 Thread Nam-Luc Tran
Hello,

I am trying to use Java 8 lambdas in my project and hit the following
error:

Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: The
generic type parameters of 'Tuple2' are missing. 
It seems that your compiler has not stored them into the .class
file. 
Currently, only the Eclipse JDT compiler preserves the type
information necessary to use the lambdas feature type-safely. 
See the documentation for more information about how to compile jobs
containing lambda expressions.
at
org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:779)
at
org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:765)
at
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:135)
at
org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:78)
at org.apache.flink.api.java.DataSet.map(DataSet.java:160)
at eu.euranova.flink.Axa.main(Axa.java:62)

My very simple code is the following:

File directory = new File("PATH TO A DIRECTORY WITH CSV FILES");
DataSet<Tuple3<Integer, Double, Double>> set =
    env.fromElements(new Tuple3<Integer, Double, Double>(0, 0.0, 0.0));
for (File file : directory.listFiles()) {
    int pathID = 0;
    String filePath = "file://" + file.getAbsolutePath();
    DataSet<Tuple2<Double, Double>> set2 = env.readCsvFile(filePath)
        .ignoreFirstLine().includeFields("11").types(Double.class, Double.class);
    DataSet<Tuple3<Integer, Double, Double>> set3 =
        set2.map(tuple -> new Tuple3<Integer, Double, Double>(pathID, tuple.f0, tuple.f1));
    set = set.union(set3);
}
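
For completeness, the same map written without a lambda avoids the compiler
issue, since an anonymous class keeps its generic parameters in the class
file. A minimal sketch along those lines, using the tuple types implied
above (it assumes imports of org.apache.flink.api.common.functions.MapFunction
and the Tuple2/Tuple3 classes):

// pathID comes from the enclosing loop and is effectively final
DataSet<Tuple3<Integer, Double, Double>> set3 = set2.map(
    new MapFunction<Tuple2<Double, Double>, Tuple3<Integer, Double, Double>>() {
        @Override
        public Tuple3<Integer, Double, Double> map(Tuple2<Double, Double> tuple) {
            // the generic types are declared explicitly, so the TypeExtractor
            // can read them no matter which compiler produced the class file
            return new Tuple3<Integer, Double, Double>(pathID, tuple.f0, tuple.f1);
        }
    });

The anonymous class is more verbose, but it works with any compiler.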

I followed the steps in the Java 8 documentation section
(http://flink.apache.org/docs/0.8/java8_programming_guide.html#compiler-limitations)
and have applied the following to the pom.xml file created using the
Flink archetype:
- Modified the Java 1.6 reference to 1.8
- Uncommented the section related to Java 8 lambdas
- Installed Eclipse Java developer tools (JDT)
- Installed m2e-jdt connector

The pom.xml does not show any error and builds fine.

Am I missing something? Do I need to explicitly set up Eclipse JDT? The
only installed environment shown in my preferences is
/usr/java/jdk-1.8.0_31 from Oracle.

Thanks and best regards,

Tran Nam-Luc


Re: Planning Release 0.8.1

2015-02-06 Thread Aljoscha Krettek
I have a fix for this user-discovered bug:
https://issues.apache.org/jira/browse/FLINK-1463?jql=project%20%3D%20FLINK%20AND%20assignee%20%3D%20currentUser()%20AND%20resolution%20%3D%20Unresolved

in this PR: https://github.com/apache/flink/pull/353

This should probably also be back-ported to 0.8.1

On Thu, Feb 5, 2015 at 3:54 PM, Aljoscha Krettek aljos...@apache.org wrote:
 I have the PR ready. :D


 On Thu, Feb 5, 2015 at 2:43 PM, Stephan Ewen se...@apache.org wrote:
 I would like to add it to 0.8.1. People have asked for it...

 On Thu, Feb 5, 2015 at 2:36 PM, Aljoscha Krettek aljos...@apache.org
 wrote:

 Do we want to have HadoopInputFormat support for Scala in 0.8.1?

 On Thu, Feb 5, 2015 at 2:29 PM, Stephan Ewen se...@apache.org wrote:
  I think we need to make a pass through the recent 0.9 commits and cherry
  pick some more into 0.8.1. There were quite a few bug fixes.
 
  Also, this one is rather critical and pending:
  https://github.com/apache/flink/pull/318
 
  On Thu, Feb 5, 2015 at 2:27 PM, Robert Metzger rmetz...@apache.org
 wrote:
 
  Hi guys,
 
  I would like to bundle a minor bugfix release for Flink soon.
  Some users were complaining about incomplete Kryo support, in particular
  for Avro.
 
  Also, we fixed some other issues which are easy to port to 0.8.1
 (some
  of them are already in the branch).
 
  I would like to start the vote on the 0.8.1 release in roughly 26 hours.
  So please merge bugfixes you would like to have in 0.8.1 into the
  release-0.8 branch in the next 24 hours.
 
  There are currently 2 open JIRAs assigned to 0.8.1, one with a pending
 pull
  request:
 
 
 https://issues.apache.org/jira/browse/FLINK-1422?jql=project%20%3D%20FLINK%20AND%20status%20%3D%20Open%20AND%20fixVersion%20%3D%200.8.1%20ORDER%20BY%20status%20DESC%2C%20priority%20DESC
 
 
  Please chime in if you are suggesting a different plan for the 0.8.1
  release.
 
 
  - Robert
 



Re: Eclipse JDT, Java 8, lambdas

2015-02-06 Thread Robert Metzger
Hi,

Looking at your code, it seems that you are creating a separate DataSet for each
file in the directory. Flink can also read entire directories.
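
For example, something along these lines would read all files in one go (a
rough sketch; the path is illustrative and it assumes all files share the
same two-column layout):

DataSet<Tuple2<Double, Double>> all = env
    .readCsvFile("file:///path/to/csv/directory")
    .ignoreFirstLine()
    .includeFields("11")
    .types(Double.class, Double.class);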

Now regarding the actual problem:
How are you starting the Flink job?
Out of your IDE, or using the ./bin/flink run tool?


Best,
Robert


Re: Eclipse JDT, Java 8, lambdas

2015-02-06 Thread Robert Metzger
Sorry, I didn't see the pathID, which is added in the map() method.
In that case, your approach looks good for using Flink locally.

On Fri, Feb 6, 2015 at 3:03 PM, Nam-Luc Tran namluc.t...@euranova.eu
wrote:

 Thank you for your replies.

 @Stephan
 Updating to 0.9-SNAPSHOT and using the return statement did the trick. I
 will try the 4.5 M4 release and give feedback on how it went.
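
 Concretely, the map now looks roughly like this (the types are the ones
 implied by my first mail, so take it as an illustration):

 DataSet<Tuple3<Integer, Double, Double>> set3 = set2.map(tuple -> {
     // block-bodied lambda with an explicit return statement
     return new Tuple3<Integer, Double, Double>(pathID, tuple.f0, tuple.f1);
 });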

 @Robert
 I launch the job right from the Eclipse IDE.
 Also, each file in the folder contains data for a different trajectory. By
 iterating this way I create a different dataset for each trajectory and add
 a tag identifying data of the same trajectory. I then union all these
 datasets into a single dataset. Do you see a more streamlined way to
 achieve this?

 Best regards,

 Tran Nam-Luc







Re: Planning Release 0.8.1

2015-02-06 Thread Robert Metzger
@Aljoscha, can you merge the backported fix to the release-0.8 branch
when it's ready?


Streaming fault tolerance with event sending

2015-02-06 Thread Hermann Gábor
Hey,

We've been implementing a simple Storm-like fault tolerance system that
persists source records, keeps track of all the records (whether they have
been processed), and replays them if they fail. The straightforward way to
do this was to create a special AbstractJobVertex (and AbstractInvokable)
that deals with replaying and keeps track of the records.

So there is a so-called FTLayerVertex that needs two-way
communication with the tasks:

   1. it needs to be notified about a processed record or a newly created
   record
   2. it needs to resend failed records to the appropriate tasks

We've decided to use events for (1) and regular message sending (i.e.
BufferReader/Writer) for (2). So we need to send events backward on a
BufferReader.
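
To make the intended protocol a bit more concrete, here is a purely
illustrative sketch; none of these names are actual Flink runtime API:

// Hypothetical summary of the two-way communication described above.
public interface FTLayer {
    // (1) tasks notify the FT layer about newly created / processed records
    void notifyRecordCreated(long recordId);
    void notifyRecordProcessed(long recordId);
    // (2) the FT layer resends a failed record to the appropriate task
    void replayRecord(long recordId, int targetTaskIndex);
}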

The problem is that backward events are only sent and received when the
channel is initialized (in the single-channel case for now), i.e. when
there is a live connection (it's not UNKNOWN; please correct me if I'm
wrong). But sometimes the connection is not yet initialized when we send
these events from the tasks to the FTLayerVertex.

We've tried a workaround: sending an initializer forward event from the FT
layer to all the tasks and waiting for this event at the beginning of every
task's invoke, so that the backward events are only sent once the channel
is definitely up. That did not work out, because in order to receive an
event we need to read that input, and we only want to read and process
that input after we have made sure we can use the channel.

Another workaround was sending an initializer record instead of an event.
That did not work out either, because we wanted to read that input (from
the FTLayerVertex) as a regular input, so the given BufferReader was also
passed to a UnionBufferReader, and reading the given BufferReader
separately (wrapped in a RecordReader) resulted in the UnionBufferReader
getting stuck. (It seemed as if the UnionBufferReader queued the given
BufferReader because of this initializer message, but the message had
already been read from the BufferReader separately, so it got stuck
waiting for a new record from the BufferReader.)

These workarounds don't seem natural either, because they pull the
synchronization of connection initialization a layer up. It would be nice
if we had some kind of event that is guaranteed to reach its destination.
Is that possible? Would that be useful for other purposes too?

Does anyone know a workaround that might work? Are we missing
something? (Please correct me if I'm seeing something wrong.) Should we
use something else instead of events? (I think we cannot avoid this
two-way communication between vertices.) All suggestions are welcome!

Best regards,
Gabor