Luke Hutchison created FLINK-6026:
-------------------------------------

             Summary: Cannot name flatMap operations
                 Key: FLINK-6026
                 URL: https://issues.apache.org/jira/browse/FLINK-6026
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.2.0
            Reporter: Luke Hutchison
            Priority: Minor


I get an error if I try naming a flatMap operation:

DataSet<Tuple2<String, Integer>> y = x.flatMap((t, out) -> 
out.collect(t)).name("op");

Type mismatch: cannot convert from 
FlatMapOperator<Tuple2<String,Integer>,Object> to 
DataSet<Tuple2<String,Integer>>

If I try to do it as two steps, I get the error that DataSet does not have a 
.name(String) method:

DataSet<Tuple2<String, Integer>> y = x.flatMap((t, out) -> out.collect(t));
y.name("op");

If I let Eclipse infer the type of the result of x.flatMap(...), it shows me 
that the output type is not correctly inferred:

FlatMapOperator<Tuple2<String, Integer>, Object> y = x.flatMap((t, out) -> 
out.collect(t));
y.name("op");   // This now works, but "Object" is not the output type

However, these steps still cannot be chained -- the following still gives an 
error:

FlatMapOperator<Tuple2<String, Integer>, Object> y = x.flatMap((t, out) -> 
out.collect(t)).name("op");

i.e. you first have to assign the result to a local variable, so that the type 
is fully specified; only then can you name the operation.

And the weird thing is that you can give the correct, more specific type for 
the local variable, without a type narrowing error:

FlatMapOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> y = 
x.flatMap((t, out) -> out.collect(t));
y.name("op");   // This works, although chaining these two lines still does not work

If the types of the lambda args are specified, then everything works:

DataSet<Tuple2<String, Integer>> y = x.flatMap((Tuple2<String, Integer> t, 
Collector<Tuple2<String, Integer>> out) -> out.collect(t)).name("op");

So, at least two things are going on here:

(1) type inference is not working correctly for the lambda parameters

(2) this breaks type inference for intermediate expressions, unless the type 
can be resolved using a local variable definition

Is this a bug in the type signature of flatMap? (Or a compiler bug or 
limitation, or a fundamental limitation of Java 8 type inference?)

It seems odd that the type of a local variable definition can make the result 
of the flatMap operator *more* specific, taking the type from 

FlatMapOperator<Tuple2<String, Integer>, Object>

to 

FlatMapOperator<Tuple2<String, Integer>, Tuple2<String, Integer>>

i.e. if the output type is provided in the local variable definition, it is 
properly unified with the type of the parameter t of collect(t); however, that 
type is not propagated out of the flatMap call on its own.
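
The behaviour above can be reproduced without Flink. The following is a
minimal sketch, not Flink code: DataSet, FlatMapOperator, FlatMapFunction and
Collector here are hypothetical stand-ins, and it assumes flatMap is declared
roughly as <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R>). It uses
String elements instead of Tuple2<String, Integer> to stay self-contained. In
Java, the receiver of a chained call has no target type, so R falls back to
Object there; an explicit type argument on the call is one more way to supply
it, alongside the two variants already shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins that only mimic the shape of the Flink API.
class FlatMapInferenceSketch {

    interface Collector<T> { void collect(T record); }

    interface FlatMapFunction<T, O> { void flatMap(T value, Collector<O> out); }

    static class DataSet<T> {
        final List<T> items;
        DataSet(List<T> items) { this.items = items; }

        // Assumed signature: R appears only in the lambda's Collector parameter.
        <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> f) {
            List<R> result = new ArrayList<>();
            for (T item : items) {
                f.flatMap(item, result::add);
            }
            return new FlatMapOperator<>(result);
        }
    }

    static class FlatMapOperator<IN, OUT> extends DataSet<OUT> {
        String operatorName = "unnamed";
        FlatMapOperator(List<OUT> items) { super(items); }
        FlatMapOperator<IN, OUT> name(String n) { operatorName = n; return this; }
    }

    // Does not compile: as a receiver, x.flatMap(...) has no target type,
    // so R is inferred as Object.
    // DataSet<String> bad = x.flatMap((t, out) -> out.collect(t)).name("op");

    // Variant 1: explicit type argument on the call, chaining works.
    static FlatMapOperator<String, String> viaTypeWitness(DataSet<String> x) {
        return x.<String>flatMap((t, out) -> out.collect(t)).name("op");
    }

    // Variant 2: explicitly typed lambda parameters, chaining works.
    static FlatMapOperator<String, String> viaTypedLambda(DataSet<String> x) {
        return x.flatMap((String t, Collector<String> out) -> out.collect(t)).name("op");
    }

    // Variant 3: the local variable's declared type supplies R, then name() is called.
    static FlatMapOperator<String, String> viaLocalVariable(DataSet<String> x) {
        FlatMapOperator<String, String> y = x.flatMap((t, out) -> out.collect(t));
        y.name("op");
        return y;
    }

    public static void main(String[] args) {
        DataSet<String> x = new DataSet<>(Arrays.asList("a", "b"));
        System.out.println(viaTypeWitness(x).operatorName);
        System.out.println(viaTypedLambda(x).items);
        System.out.println(viaLocalVariable(x).items);
    }
}
```

If the real flatMap has this shape, the type-argument form should carry over
as x.<Tuple2<String, Integer>>flatMap((t, out) -> out.collect(t)).name("op"),
but I have not checked that against Flink itself.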

Can anything be done about this in Flink? I have hit this problem a few times.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
