Hi Robert,

On 9 July 2016 at 00:25, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Yukun,
>
> can you also post the code showing how you invoke the GenericFlatMapper on
> the mailing list?
>

Here is the code defining the topology:

DataStream<String> stream = ...;
stream
        .keyBy(new KeySelector<String, Integer>() {
            @Override
            public Integer getKey(String x) throws Exception {
                return x.hashCode() % 10;
            }
        })
        .timeWindow(Time.seconds(10))
        .fold(new TreeMap<String, Long>(),
              new FoldFunction<String, SortedMap<String, Long>>() {
            @Override
            public SortedMap<String, Long> fold(SortedMap<String, Long> map,
                                                String x) {
                Long current = map.get(x);
                Long updated = current != null ? current + 1 : 1;
                map.put(x, updated);
                return map;
            }
        })
        .flatMap(new GenericFlatMapper<String>())
        // Commenting out the returns(...) call below causes an InvalidTypesException.
        .returns(new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo())
        .print();



>
> The Java compiler usually drops the generic types during compilation
> ("type erasure"), which is why we cannot infer the types.
>
>
The error message implies that type extraction should be possible when "all
variables in the return type can be deduced from the input type(s)". That
holds for void flatMap(Tuple2<Long, T>, Collector<Tuple2<T, String>>), but
when the signature is changed to void flatMap(SortedMap<T, Long>,
Collector<Tuple2<T, Long>>), type inference fails, even though T still
appears in the input type.
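
For reference, the erasure issue discussed here can be reproduced in plain
Java without Flink. The sketch below is only an illustration under stated
assumptions: ErasureDemo, Mapper and GenericMapper are made-up names, and the
code does not reproduce Flink's type extractor. It shows that an instance
created as new GenericMapper<String>() carries no record of String, while an
anonymous subclass does.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ErasureDemo {

    /** Hypothetical stand-in for a function interface such as FlatMapFunction. */
    public interface Mapper<IN, OUT> {
        OUT map(IN value);
    }

    /** Generic implementation: its class metadata only records the variable T. */
    public static class GenericMapper<T> implements Mapper<T, String> {
        @Override
        public String map(T value) {
            return String.valueOf(value);
        }
    }

    /** Returns the name of the first type argument of a parameterized type. */
    public static String firstTypeArgument(Type type) {
        if (type instanceof ParameterizedType) {
            Type[] args = ((ParameterizedType) type).getActualTypeArguments();
            return args[0].getTypeName();
        }
        return "<raw>";
    }

    public static void main(String[] args) {
        // Plain instance: <String> exists only at the call site and is erased.
        Mapper<String, String> plain = new GenericMapper<String>();
        System.out.println(
                firstTypeArgument(plain.getClass().getGenericInterfaces()[0]));
        // prints "T"

        // Anonymous subclass: the compiler records GenericMapper<String>
        // as the generic superclass, so the argument survives compilation.
        Mapper<String, String> anonymous = new GenericMapper<String>() {};
        System.out.println(
                firstTypeArgument(anonymous.getClass().getGenericSuperclass()));
        // prints "java.lang.String"
    }
}
```

The anonymous-subclass trick in the second case is the same idea that lets
new TypeHint<Tuple2<String, Long>>(){} in the topology above carry a concrete
type.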


>
> On Fri, Jul 8, 2016 at 12:27 PM, Yukun Guo <gyk....@gmail.com> wrote:
>
>> Hi,
>> When I run code implementing a generic FlatMapFunction, Flink
>> complains with an InvalidTypesException:
>>
>> public class GenericFlatMapper<T>
>>         implements FlatMapFunction<SortedMap<T, Long>, Tuple2<T, Long>> {
>>     @Override
>>     public void flatMap(SortedMap<T, Long> m, Collector<Tuple2<T, Long>> out)
>>             throws Exception {
>>         for (Map.Entry<T, Long> entry : m.entrySet()) {
>>             out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
>>         }
>>     }
>> }
>>
>>
>> *Exception in thread "main"
>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>> type of function could not be determined automatically, due to type
>> erasure. You can give type information hints by using the returns(...)
>> method on the result of the transformation call, or by letting your
>> function implement the 'ResultTypeQueryable' interface.*
>>
>> *...*
>> *Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
>> Type of TypeVariable 'T' in 'class GenericFlatMapper' could not be
>> determined. This is most likely a type erasure problem. The type extraction
>> currently supports types with generic variables only in cases where all
>> variables in the return type can be deduced from the input type(s).*
>>
>> This puzzles me, as Flink should be able to infer the type from the
>> arguments. I know that returns(...) and other workarounds can supply a
>> type hint, but they are rather verbose. Any suggestions?
>>
>>
>
