Hi Jacopo, Robert, 

         Very sorry for missing the previous email and not response in time. I 
think exactly as Robert has pointed out with the example: using inline 
anonymous subclass of KeyedBroadcastProcessFunction should not cause the 
problem. As far as I know, the possible reason that cause the attached 
exception might be that the parameter types of Either get erased due to the way 
to create KeyedBroadcastProcessFunction object. For example, if you first 
implement a generic subclass of KeyedBroadcastProcessFunction like:

public class MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType> extends 
KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, 
Either<MyLeftType, MyRightType>> { ... }

     and create a function object directly when constructing the DataStream job:

stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>());

     Then MyLeftType and MyRightType will be erased and will cause the attached 
exception when Flink tries to inference the output type. 

     And I totally agree with Robert that attaching the corresponding codes 
would help debugging the problem.

  Yours,
    Yun



------------------------------------------------------------------
From:Robert Metzger <rmetz...@apache.org>
Send Time:2020 Feb. 21 (Fri.) 19:47
To:jacopo.gobbi <jacopo.go...@ubs.com>
Cc:yungao.gy <yungao...@aliyun.com>; user <user@flink.apache.org>
Subject:Re: Flink's Either type information

Hey Jacopo,
can you post an example to reproduce the issue? I've tried it, but it worked in 
this artificial example:

MapStateDescriptor<String, String> state = new MapStateDescriptor<>("test", 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
DataStream<Either<Integer, String>> result = input
      .map((MapFunction<String, Tuple2<Integer, String>>) value -> Tuple2.of(0, 
value)).returns(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, 
String.class))
      .keyBy(0).connect(input.broadcast(state))
      .process(new KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, 
String>, String, Either<Integer, String>>() {
         @Override
         public void processElement(Tuple2<Integer, String> value, 
ReadOnlyContext ctx, Collector<Either<Integer, String>> out) throws Exception {
            out.collect(Either.Left(111));
         }
         @Override
         public void processBroadcastElement(String value, Context ctx, 
Collector<Either<Integer, String>> out) throws Exception { }
      });
result.print();
On Wed, Feb 19, 2020 at 6:07 PM <jacopo.go...@ubs.com> wrote:

Yes, I create it the way you mentioned.
 
From: Yun Gao [mailto:yungao...@aliyun.com] 
Sent: Dienstag, 18. Februar 2020 10:12
To: Gobbi, Jacopo-XT; user
Subject: [External] Re: Flink's Either type information
 
      Hi Jacopo,
 
          Could you also provide how the KeyedBroadcastProcessFunction is 
created when constructing datastream API ? For example, are you using something 
like 
 
          new KeyedBroadcastProcessFunction<Integer, Integer, Integer, 
Either<MyLeft, MyRight>() { 
                       // Function implementation
             }
 
             or something else?
 
     Best, 
      Yun
 
 
------------------------------------------------------------------
From:jacopo.gobbi <jacopo.go...@ubs.com>
Send Time:2020 Feb. 17 (Mon.) 18:31
To:user <user@flink.apache.org>
Subject:Flink's Either type information
 
Hi all,
 
How can an Either value be returned by a KeyedBroadcastProcessFunction?
We keep getting "InvalidTypesException: Type extraction is not possible on 
Either type as it does not contain information about the 'left' type." when 
doing: out.collect(Either.<MyLeftType, MyRightType>Right(myObject));
 
Thanks,
 
Jacopo Gobbi
 
 

Reply via email to