Re: Does Flink support raw generic types in a merged stream?

2019-07-23 Thread Fabian Hueske
Hi John,

You could implement your own n-ary Either type.
It's a bit of work, because you'd also need a custom TypeInformation and
TypeSerializer, but it's rather straightforward if you follow the
implementation of Either.

Best,
Fabian

On Wed, 17 Jul 2019 at 16:28, John Tipper <john_tip...@hotmail.com> wrote:

> Hi Chesnay,
>
> Yes, but the actual use case needs to support more than 2 streams, so if I
> go down the Either route then I have arbitrarily sized nested Eithers, i.e.
> Either<Either<A, B>, C> etc., which gets pretty messy very quickly.
>
> Many thanks,
>
> John
>
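A minimal plain-Java sketch of the n-ary Either idea, assuming a fixed arity of three. Either3, Codec, Either3Codec and roundTrip are hypothetical names; Codec only plays the role a Flink TypeSerializer would, so this is not a drop-in Flink type (the matching TypeInformation and the rest of the serializer contract are left out). What it illustrates is the encoding: write a tag that identifies the alternative, then delegate to the codec for that alternative's type.

```java
import java.io.*;
import java.util.function.Function;

class Either3Demo {

    /** Hypothetical per-type codec, standing in for Flink's TypeSerializer. */
    interface Codec<T> {
        void write(T value, DataOutput out) throws IOException;
        T read(DataInput in) throws IOException;
    }

    /** Hypothetical three-way Either: tag 0 holds an A, tag 1 a B, tag 2 a C. */
    static final class Either3<A, B, C> {
        private final int tag;
        private final Object value;
        private Either3(int tag, Object value) { this.tag = tag; this.value = value; }

        static <A, B, C> Either3<A, B, C> first(A a) { return new Either3<>(0, a); }
        static <A, B, C> Either3<A, B, C> second(B b) { return new Either3<>(1, b); }
        static <A, B, C> Either3<A, B, C> third(C c) { return new Either3<>(2, c); }

        /** Applies the function matching the stored alternative. */
        @SuppressWarnings("unchecked")
        <R> R fold(Function<A, R> onA, Function<B, R> onB, Function<C, R> onC) {
            if (tag == 0) return onA.apply((A) value);
            if (tag == 1) return onB.apply((B) value);
            return onC.apply((C) value);
        }
    }

    /** Writes a tag byte, then delegates to the codec of the stored alternative. */
    static final class Either3Codec<A, B, C> implements Codec<Either3<A, B, C>> {
        private final Codec<A> codecA;
        private final Codec<B> codecB;
        private final Codec<C> codecC;

        Either3Codec(Codec<A> codecA, Codec<B> codecB, Codec<C> codecC) {
            this.codecA = codecA; this.codecB = codecB; this.codecC = codecC;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void write(Either3<A, B, C> e, DataOutput out) throws IOException {
            out.writeByte(e.tag);
            if (e.tag == 0) codecA.write((A) e.value, out);
            else if (e.tag == 1) codecB.write((B) e.value, out);
            else codecC.write((C) e.value, out);
        }

        @Override
        public Either3<A, B, C> read(DataInput in) throws IOException {
            byte tag = in.readByte();
            if (tag == 0) return Either3.first(codecA.read(in));
            if (tag == 1) return Either3.second(codecB.read(in));
            if (tag == 2) return Either3.third(codecC.read(in));
            throw new IOException("Unknown tag " + tag);
        }
    }

    static final Codec<String> STRINGS = new Codec<String>() {
        public void write(String v, DataOutput out) throws IOException { out.writeUTF(v); }
        public String read(DataInput in) throws IOException { return in.readUTF(); }
    };
    static final Codec<Integer> INTS = new Codec<Integer>() {
        public void write(Integer v, DataOutput out) throws IOException { out.writeInt(v); }
        public Integer read(DataInput in) throws IOException { return in.readInt(); }
    };
    static final Codec<Double> DOUBLES = new Codec<Double>() {
        public void write(Double v, DataOutput out) throws IOException { out.writeDouble(v); }
        public Double read(DataInput in) throws IOException { return in.readDouble(); }
    };

    /** Serializes a value, reads it back, and describes the alternative that came out. */
    static String roundTrip(Either3<String, Integer, Double> original) {
        Either3Codec<String, Integer, Double> codec = new Either3Codec<>(STRINGS, INTS, DOUBLES);
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            codec.write(original, new DataOutputStream(bytes));
            Either3<String, Integer, Double> copy =
                    codec.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy.fold(s -> "string " + s, i -> "int " + i, d -> "double " + d);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Either3.second(42))); // int 42
        System.out.println(roundTrip(Either3.third(2.5))); // double 2.5
    }
}
```

Adding a fourth alternative means one more tag value and one more codec, rather than another level of nesting.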


Re: Does Flink support raw generic types in a merged stream?

2019-07-17 Thread John Tipper
Hi Chesnay,

Yes, but the actual use case needs to support more than 2 streams, so if I go 
down the Either route then I have arbitrarily sized nested Eithers, i.e. 
Either<Either<A, B>, C> etc., which gets pretty messy very quickly.

Many thanks,

John


On 17 Jul 2019, at 13:29, Chesnay Schepler <ches...@apache.org> wrote:


Have you looked at org.apache.flink.types.Either? If you'd wrap all elements in 
both streams before the union you should be able to join them properly.
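To illustrate the nesting problem, here is a plain-Java sketch with three element types and a hypothetical two-way Either2 class (a stand-in for illustration, not Flink's Either). Lists stand in for streams; all names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal two-way Either; a hypothetical stand-in, not Flink's class. */
final class Either2<L, R> {
    private final L left;
    private final R right;
    private final boolean isLeft;

    private Either2(L left, R right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    static <L, R> Either2<L, R> ofLeft(L value) { return new Either2<>(value, null, true); }
    static <L, R> Either2<L, R> ofRight(R value) { return new Either2<>(null, value, false); }

    boolean isLeft() { return isLeft; }
    L getLeft() { return left; }
    R getRight() { return right; }
}

class NestedEitherDemo {
    // Three input types already need two levels of nesting; a fourth would need three.
    static String describe(Either2<Either2<String, Integer>, Double> e) {
        if (!e.isLeft()) {
            return "double " + e.getRight();
        }
        Either2<String, Integer> inner = e.getLeft(); // second level of unwrapping
        return inner.isLeft() ? "string " + inner.getLeft() : "int " + inner.getRight();
    }

    static List<String> describeAll() {
        Either2<String, Integer> fromStreamA = Either2.ofLeft("a");
        Either2<String, Integer> fromStreamB = Either2.ofRight(1);
        List<Either2<Either2<String, Integer>, Double>> merged = new ArrayList<>();
        merged.add(Either2.ofLeft(fromStreamA));
        merged.add(Either2.ofLeft(fromStreamB));
        merged.add(Either2.ofRight(2.5));

        List<String> out = new ArrayList<>();
        for (Either2<Either2<String, Integer>, Double> e : merged) {
            out.add(describe(e));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(describeAll()); // [string a, int 1, double 2.5]
    }
}
```

Each additional stream adds another wrapper level at construction time and another branch when unwrapping.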





Re: Does Flink support raw generic types in a merged stream?

2019-07-17 Thread Chesnay Schepler
Have you looked at org.apache.flink.types.Either? If you wrap all 
elements of both streams in an Either before the union, you should be 
able to join them properly.



On 17/07/2019 14:18, John Tipper wrote:

Hi All,

Can I union/join 2 streams containing generic classes, where each 
stream has a different parameterised type? I'd like to process the 
combined stream of values as a single raw type, casting to a specific 
type for detailed processing, based on some information in the type 
that will allow me to safely cast to the specific type.
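A plain-Java analogy of wrapping before the union, assuming that two lists can stand in for the two DataStreams and that the hypothetical SimpleEither class stands in for org.apache.flink.types.Either (its method names are made up and are not meant to mirror Flink's API). In a Flink job the wrapping would presumably happen in a map on each stream before calling union.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Minimal two-way Either; a hypothetical stand-in loosely modeled on Flink's Either. */
final class SimpleEither<L, R> {
    private final L left;
    private final R right;
    private final boolean isLeft;

    private SimpleEither(L left, R right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    static <L, R> SimpleEither<L, R> ofLeft(L value) { return new SimpleEither<>(value, null, true); }
    static <L, R> SimpleEither<L, R> ofRight(R value) { return new SimpleEither<>(null, value, false); }

    boolean isLeft() { return isLeft; }
    L getLeft() { return left; }
    R getRight() { return right; }
}

class WrapBeforeUnionDemo {
    /**
     * Each element is wrapped first, so both inputs share the element type
     * SimpleEither<String, Integer> and can be merged into one list (the analogue of union).
     */
    static List<String> mergeAndProcess(List<String> names, List<Integer> counts) {
        List<SimpleEither<String, Integer>> merged = new ArrayList<>();
        for (String n : names) merged.add(SimpleEither.ofLeft(n));
        for (Integer c : counts) merged.add(SimpleEither.ofRight(c));

        // A single processing step over the merged input, branching on the wrapper.
        List<String> out = new ArrayList<>();
        for (SimpleEither<String, Integer> e : merged) {
            out.add(e.isLeft() ? "name " + e.getLeft() : "count " + e.getRight());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(mergeAndProcess(Arrays.asList("alpha"), Arrays.asList(3))); // [name alpha, count 3]
    }
}
```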








Does Flink support raw generic types in a merged stream?

2019-07-17 Thread John Tipper
Hi All,


Can I union/join 2 streams containing generic classes, where each stream has a 
different parameterised type? I'd like to process the combined stream of values 
as a single raw type, casting to a specific type for detailed processing, based 
on some information in the type that will allow me to safely cast to the 
specific type.

I can't share my exact code, but the below example shows the sort of thing I 
want to do.

So, as an example, given the following generic type:

class MyGenericContainer<IN> extends Tuple3<String, IN, SomeOtherClass> {
    ...
    private final String myString;
    private final IN value;
    private final Class<IN> clazz; // created by constructor
    private SomeOtherClass someOtherClass;
    ...
}

and 2 streams, I'd like to be able to do something like:

DataStream<MyGenericContainer<A>> stream1 = ...
DataStream<MyGenericContainer<B>> stream2 = ...

DataStream<...> merged = stream1.union(stream2).process(new MyProcessFunction());

// within an operator, such as a MyProcessFunction:
MyGenericContainer container = raw generic container passed to function;
Object rawValue = container.getValue();
performProcessing((container.getClazz())rawValue); // safely cast rawValue
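The cast written as (container.getClazz())rawValue is not valid Java syntax; Class.cast performs the equivalent check at runtime. A minimal sketch, assuming a simplified, hypothetical TypedContainer that stores its Class<IN> as in the question but does not extend Tuple3. It covers only the casting side and does not change what Flink's type extractor can infer.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical container: a value plus its runtime Class. */
final class TypedContainer<IN> {
    private final IN value;
    private final Class<IN> clazz;

    TypedContainer(IN value, Class<IN> clazz) {
        this.value = value;
        this.clazz = clazz;
    }

    IN getValue() { return value; }
    Class<IN> getClazz() { return clazz; }
}

class SafeCastDemo {
    /** Branches on the stored Class and uses Class.cast, which checks the cast at runtime. */
    static String process(TypedContainer<?> container) {
        Object rawValue = container.getValue();
        Class<?> clazz = container.getClazz();
        if (clazz == String.class) {
            String s = String.class.cast(rawValue);
            return "string of length " + s.length();
        }
        if (clazz == Integer.class) {
            int i = Integer.class.cast(rawValue);
            return "integer doubled " + (i * 2);
        }
        return "unhandled " + clazz.getName();
    }

    public static void main(String[] args) {
        List<TypedContainer<?>> merged = new ArrayList<>();
        merged.add(new TypedContainer<>("hello", String.class));
        merged.add(new TypedContainer<>(21, Integer.class));
        for (TypedContainer<?> c : merged) {
            System.out.println(process(c)); // string of length 5, then integer doubled 42
        }
    }
}
```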

However, I get an error when I do this:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be 
determined. This is most likely a type erasure problem. The type extraction 
currently supports types with generic variables only in cases where all 
variables in the return type can be deduced from the input type(s). Otherwise 
the type has to be specified explicitly using type information.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
    at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)
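The "type erasure problem" mentioned in the exception can be seen with plain reflection. In this sketch (Box and StringBox are hypothetical names, with Box playing the role of MyGenericContainer<IN>), instances with different type arguments share one runtime class, the generic class itself only records the name of its type variable, and a concrete argument is recoverable only where it is written into a class signature, such as a subclass's extends clause.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

// Hypothetical generic type, standing in for MyGenericContainer<IN>.
class Box<IN> {
    IN value;
}

// A subclass that fixes IN = String in its class signature.
class StringBox extends Box<String> {
}

class ErasureDemo {
    /** Instances with different type arguments share a single runtime class. */
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    /** The generic class itself only records the name of its type variable. */
    static String declaredVariable() {
        return Box.class.getTypeParameters()[0].getName();
    }

    /** A concrete type argument survives only where it is written into a signature. */
    static Type argumentOfSubclass() {
        ParameterizedType superType = (ParameterizedType) StringBox.class.getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());   // true
        System.out.println(declaredVariable());   // IN
        System.out.println(argumentOfSubclass()); // class java.lang.String
    }
}
```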

If I try to add a returns() to the code, like this:

DataStream<...> merged = stream1.union(stream2)
    .process(...)
    .returns(new TypeHint<MyGenericContainer<IN>>() {})

then I get a different exception:

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The 
TypeHint is using a generic variable.This is not supported, generic types must 
be fully specified for the TypeHint.
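The second error stems from the same limitation. An anonymous subclass such as new TypeHint<...>() {} works along the lines of a "super type token": the type argument is read back from the anonymous class's generic superclass. This sketch uses a hypothetical TypeCapture class, similar in spirit but not Flink's implementation, to show that when the token is created inside generic code, what gets recorded is the type variable IN rather than the concrete type it is later bound to.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;

/** Hypothetical super type token: records the type argument of an anonymous subclass. */
abstract class TypeCapture<T> {
    final Type captured;

    protected TypeCapture() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }
}

class TypeCaptureDemo {
    /** Fully specified: List<String>, whose own argument is a real class. */
    static Type concrete() {
        return new TypeCapture<List<String>>() {}.captured;
    }

    /** Inside a generic method, the compiler can only write IN into the anonymous class. */
    static <IN> Type fromVariable() {
        return new TypeCapture<List<IN>>() {}.captured;
    }

    /** True if the first type argument of a parameterized type is an unresolved type variable. */
    static boolean argumentIsVariable(Type t) {
        Type arg = ((ParameterizedType) t).getActualTypeArguments()[0];
        return arg instanceof TypeVariable;
    }

    public static void main(String[] args) {
        System.out.println(argumentIsVariable(concrete()));                          // false
        System.out.println(argumentIsVariable(TypeCaptureDemo.<String>fromVariable())); // true
    }
}
```

Even though fromVariable is called with String, the recorded argument is still the variable IN, which is why a type hint has to be written with fully concrete types.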

Is this sort of thing supported or is there another way of joining multiple 
streams into a single stream, where each stream object will have a specific 
type of a common generic type?


Many thanks,

John