Hi John, You could implement your own n-ary Either type. It's a bit of work because you'd need also a custom TypeInfo & Serializer but rather straightforward if you follow the implementation of Either.
Best, Fabian Am Mi., 17. Juli 2019 um 16:28 Uhr schrieb John Tipper < john_tip...@hotmail.com>: > Hi Chesnay, > > Yes, but the actual use case needs to support more than 2 streams, so if I > go down the Either route then I have arbitrarily sized nested Eithers, i.e. > Either<Either<A, B>, C> etc, which gets pretty messy very quickly. > > Many thanks, > > John > > Sent from my iPhone > > On 17 Jul 2019, at 13:29, Chesnay Schepler <ches...@apache.org> wrote: > > Have you looked at org.apache.flink.types.Either? If you'd wrap all > elements in both streams before the union you should be able to join them > properly. > > On 17/07/2019 14:18, John Tipper wrote: > > Hi All, > > Can I union/join 2 streams containing generic classes, where each stream > has a different parameterised type? I'd like to process the combined stream > of values as a single raw type, casting to a specific type for detailed > processing, based on some information in the type that will allow me to > safely cast to the specific type. > > I can't share my exact code, but the below example shows the sort of thing > I want to do. > > So, as an example, given the following generic type: > > class MyGenericContainer<IN> extends Tuple3<String, IN, SomeOtherClass> { > ... > private final String myString; > private final IN value; > private final Class<IN> clazz; // created by constructor > private SomeOtherClass someOtherClass; > ... > } > > and 2 streams, I'd like to be able to do something like: > > DataStream<MyGenericContainer<String>> stream1 = ... > DataStream<MyGenericContainer<Integer>> stream2 = ... > DataStream<...> merged = stream1.union(stream2).process(new > MyProcessFunction()); > // within an operator, such as a MyProcessFunction: > MyGenericContainer container = raw generic container passed to function; > Object rawValue = container.getValue(); > performProcessing((container.getClazz())rawValue); // safely cast rawValue > > However, I get an error when I do this: > > Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type > of TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be > determined. This is most likely a type erasure problem. The type extraction > currently supports types with generic variables only in cases where all > variables in the return type can be deduced from the input type(s). Otherwise > the type has to be specified explicitly using type information. > at > org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853) > at > org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803) > at > org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587) > at > org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633) > > If I try to add a returns() to the code, like this: > > DataStream<...> merged = stream1.union(stream2) > .process(...) > .returns(new TypeHint<MyGenericContainer>() {}) > > then I get a different exception: > > Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The > TypeHint is using a generic variable.This is not supported, generic types > must be fully specified for the TypeHint. > > Is this sort of thing supported or is there another way of joining > multiple streams into a single stream, where each stream object will have a > specific type of a common generic type? > > > Many thanks, > > John > > >