Hi John,

You could implement your own n-ary Either type.
It's a bit of work because you'd also need a custom TypeInformation and
TypeSerializer, but it is fairly straightforward if you follow the
implementation of Either.
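
As a rough illustration of the idea, here is a minimal sketch of a 3-ary
variant in plain Java. The names OneOf3, first/second/third and fold are
hypothetical and not part of Flink. The sketch covers only the container
itself; the Flink-specific TypeInformation and TypeSerializer are not shown
and would still have to be written, presumably along the lines of Flink's
EitherTypeInfo and EitherSerializer.

```java
import java.util.Objects;
import java.util.function.Function;

/**
 * Hypothetical 3-ary counterpart to Either: holds exactly one value of type
 * A, B or C, plus an index recording which alternative is populated.
 */
final class OneOf3<A, B, C> {
    private final int index;    // 0 = first, 1 = second, 2 = third
    private final Object value; // payload of the populated alternative

    private OneOf3(int index, Object value) {
        this.index = index;
        this.value = Objects.requireNonNull(value, "value");
    }

    static <A, B, C> OneOf3<A, B, C> first(A a)  { return new OneOf3<>(0, a); }
    static <A, B, C> OneOf3<A, B, C> second(B b) { return new OneOf3<>(1, b); }
    static <A, B, C> OneOf3<A, B, C> third(C c)  { return new OneOf3<>(2, c); }

    /** Index of the populated alternative (0, 1 or 2). */
    int index() { return index; }

    /** Applies the handler matching the populated alternative. */
    @SuppressWarnings("unchecked") // safe: the factories tie index to the payload type
    <R> R fold(Function<? super A, ? extends R> onFirst,
               Function<? super B, ? extends R> onSecond,
               Function<? super C, ? extends R> onThird) {
        switch (index) {
            case 0:  return onFirst.apply((A) value);
            case 1:  return onSecond.apply((B) value);
            default: return onThird.apply((C) value);
        }
    }
}
```

Each input stream would be mapped into the matching alternative before the
union, and the downstream function would call fold (or switch on index())
instead of casting. The same pattern extends to more alternatives by adding
factories and handlers.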

Best,
Fabian

On Wed, 17 Jul 2019 at 16:28, John Tipper <john_tip...@hotmail.com> wrote:

> Hi Chesnay,
>
> Yes, but the actual use case needs to support more than 2 streams, so if I
> go down the Either route then I have arbitrarily sized nested Eithers, i.e.
> Either<Either<A, B>, C> etc, which gets pretty messy very quickly.
>
> Many thanks,
>
> John
>
> On 17 Jul 2019, at 13:29, Chesnay Schepler <ches...@apache.org> wrote:
>
> Have you looked at org.apache.flink.types.Either? If you wrap all
> elements in both streams before the union, you should be able to join them
> properly.
>
> On 17/07/2019 14:18, John Tipper wrote:
>
> Hi All,
>
> Can I union/join 2 streams containing generic classes, where each stream
> has a different parameterised type? I'd like to process the combined stream
> of values as a single raw type, casting to a specific type for detailed
> processing, based on some information in the type that will allow me to
> safely cast to the specific type.
>
> I can't share my exact code, but the below example shows the sort of thing
> I want to do.
>
> So, as an example, given the following generic type:
>
> class MyGenericContainer<IN> extends Tuple3<String, IN, SomeOtherClass> {
>     ...
>     private final String myString;
>     private final IN value;
>     private final Class<IN> clazz; // created by constructor
>     private SomeOtherClass someOtherClass;
>     ...
> }
>
> and 2 streams, I'd like to be able to do something like:
>
> DataStream<MyGenericContainer<String>> stream1 = ...
> DataStream<MyGenericContainer<Integer>> stream2 = ...
> DataStream<...> merged = stream1.union(stream2).process(new MyProcessFunction());
> // within an operator, such as a MyProcessFunction:
> MyGenericContainer container = raw generic container passed to function;
> Object rawValue = container.getValue();
> performProcessing(container.getClazz().cast(rawValue)); // safely cast rawValue
>
> However, I get an error when I do this:
>
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type 
> of TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be 
> determined. This is most likely a type erasure problem. The type extraction 
> currently supports types with generic variables only in cases where all 
> variables in the return type can be deduced from the input type(s). Otherwise 
> the type has to be specified explicitly using type information.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
>     at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)
>
> If I try to add a returns() to the code, like this:
>
> DataStream<...> merged = stream1.union(stream2)
>     .process(...)
>     .returns(new TypeHint<MyGenericContainer>() {})
>
> then I get a different exception:
>
> Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The 
> TypeHint is using a generic variable.This is not supported, generic types 
> must be fully specified for the TypeHint.
>
> Is this sort of thing supported or is there another way of joining
> multiple streams into a single stream, where each stream object will have a
> specific type of a common generic type?
>
>
> Many thanks,
>
> John
>
>
>
