Repository: flink
Updated Branches:
  refs/heads/master e69d14521 -> f73a12e72


[FLINK-3046] Integrate the Either Java type with the TypeExtractor

This closes #1393.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f73a12e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f73a12e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f73a12e7

Branch: refs/heads/master
Commit: f73a12e728431d87065f7d0082380a527df39e44
Parents: e69d145
Author: twalthr <twal...@apache.org>
Authored: Mon Nov 23 15:08:47 2015 +0100
Committer: twalthr <twal...@apache.org>
Committed: Fri Nov 27 14:58:13 2015 +0100

----------------------------------------------------------------------
 .../api/java/typeutils/EitherTypeInfo.java      |  12 +-
 .../flink/api/java/typeutils/TypeExtractor.java | 196 ++++++++++++++-----
 .../java/type/extractor/TypeExtractorTest.java  |  81 +++++++-
 3 files changed, 235 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
index 40ed0c0..ec7be97 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
@@ -37,7 +37,7 @@ public class EitherTypeInfo<L, R> extends 
TypeInformation<Either<L, R>> {
 
        private final TypeInformation<R> rightType;
 
-       public EitherTypeInfo(TypeInformation<L> leftType,TypeInformation<R> 
rightType) {
+       public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> 
rightType) {
                this.leftType = leftType;
                this.rightType = rightType;
        }
@@ -108,4 +108,14 @@ public class EitherTypeInfo<L, R> extends 
TypeInformation<Either<L, R>> {
                return obj instanceof EitherTypeInfo;
        }
 
+       // 
--------------------------------------------------------------------------------------------
+
+       public TypeInformation<L> getLeftType() {
+               return leftType;
+       }
+
+       public TypeInformation<R> getRightType() {
+               return rightType;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 782d58d..ff6a82c 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -420,65 +420,53 @@ public class TypeExtractor {
                        }
                        
                        typeHierarchy.add(curT);
-                       
-                       ParameterizedType tupleChild = (ParameterizedType) curT;
-                       
-                       Type[] subtypes = new 
Type[tupleChild.getActualTypeArguments().length];
-                       
-                       // materialize possible type variables
-                       for (int i = 0; i < subtypes.length; i++) {
-                               // materialize immediate TypeVariables
-                               if (tupleChild.getActualTypeArguments()[i] 
instanceof TypeVariable<?>) {
-                                       subtypes[i] = 
materializeTypeVariable(typeHierarchy, (TypeVariable<?>) 
tupleChild.getActualTypeArguments()[i]);
+
+                       // create the type information for the subtypes
+                       TypeInformation<?>[] subTypesInfo = 
createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, 
in2Type);
+                       // type needs to be treated a pojo due to additional 
fields
+                       if (subTypesInfo == null) {
+                               if (t instanceof ParameterizedType) {
+                                       return (TypeInformation<OUT>) 
analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), 
(ParameterizedType) t, in1Type, in2Type);
                                }
-                               // class or parameterized type
                                else {
-                                       subtypes[i] = 
tupleChild.getActualTypeArguments()[i];
+                                       return (TypeInformation<OUT>) 
analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, 
in2Type);
                                }
                        }
+                       // return tuple info
+                       return new TupleTypeInfo(typeToClass(t), subTypesInfo);
                        
-                       TypeInformation<?>[] tupleSubTypes = new 
TypeInformation<?>[subtypes.length];
-                       for (int i = 0; i < subtypes.length; i++) {
-                               ArrayList<Type> subTypeHierarchy = new 
ArrayList<Type>(typeHierarchy);
-                               subTypeHierarchy.add(subtypes[i]);
-                               // sub type could not be determined with 
materializing
-                               // try to derive the type info of the 
TypeVariable from the immediate base child input as a last attempt
-                               if (subtypes[i] instanceof TypeVariable<?>) {
-                                       tupleSubTypes[i] = 
createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, 
in1Type, in2Type);
-                                       
-                                       // variable could not be determined
-                                       if (tupleSubTypes[i] == null) {
-                                               throw new 
InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) 
subtypes[i]).getName() + "' in '"
-                                                               + 
((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
-                                                               + "' could not 
be determined. This is most likely a type erasure problem. "
-                                                               + "The type 
extraction currently supports types with generic variables only in cases where "
-                                                               + "all 
variables in the return type can be deduced from the input type(s).");
-                                       }
-                               } else {
-                                       tupleSubTypes[i] = 
createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, 
in2Type);
-                               }
+               }
+               // check if type is a subclass of Either
+               else if (isClassType(t) && 
Either.class.isAssignableFrom(typeToClass(t))) {
+                       Type curT = t;
+
+                       // go up the hierarchy until we reach Either (with or 
without generics)
+                       // collect the types while moving up for a later 
top-down
+                       while (!(isClassType(curT) && 
typeToClass(curT).equals(Either.class))) {
+                               typeHierarchy.add(curT);
+                               curT = typeToClass(curT).getGenericSuperclass();
                        }
-                       
-                       Class<?> tAsClass = null;
-                       if (isClassType(t)) {
-                               tAsClass = typeToClass(t);
-                       }
-                       Preconditions.checkNotNull(tAsClass, "t has a 
unexpected type");
-                       // check if the class we assumed to be a Tuple so far 
is actually a pojo because it contains additional fields.
-                       // check for additional fields.
-                       int fieldCount = countFieldsInClass(tAsClass);
-                       if(fieldCount != tupleSubTypes.length) {
-                               // the class is not a real tuple because it 
contains additional fields. treat as a pojo
+
+                       // check if Either has generics
+                       if (curT instanceof Class<?>) {
+                               throw new InvalidTypesException("Either needs 
to be parameterized by using generics.");
+                       }
+
+                       typeHierarchy.add(curT);
+
+                       // create the type information for the subtypes
+                       TypeInformation<?>[] subTypesInfo = 
createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, 
in2Type);
+                       // type needs to be treated a pojo due to additional 
fields
+                       if (subTypesInfo == null) {
                                if (t instanceof ParameterizedType) {
-                                       return (TypeInformation<OUT>) 
analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), (ParameterizedType) 
t, in1Type, in2Type);
+                                       return (TypeInformation<OUT>) 
analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), 
(ParameterizedType) t, in1Type, in2Type);
                                }
                                else {
-                                       return (TypeInformation<OUT>) 
analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), null, in1Type, 
in2Type);
+                                       return (TypeInformation<OUT>) 
analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, 
in2Type);
                                }
                        }
-                       
-                       return new TupleTypeInfo(tAsClass, tupleSubTypes);
-                       
+                       // return either info
+                       return (TypeInformation<OUT>) new 
EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]);
                }
                // type depends on another type
                // e.g. class MyMapper<E> extends MapFunction<String, E>
@@ -675,6 +663,71 @@ public class TypeExtractor {
                }
                return info;
        }
+
+       /**
+        * Creates the TypeInformation for all elements of a type that expects 
a certain number of
+        * subtypes (e.g. TupleXX or Either).
+        *
+        * @param originalType most concrete subclass
+        * @param definingType type that defines the number of subtypes (e.g. 
Tuple2 -> 2 subtypes)
+        * @param typeHierarchy necessary for type inference
+        * @param in1Type necessary for type inference
+        * @param in2Type necessary for type inference
+        * @return array containing TypeInformation of sub types or null if 
definingType contains
+        *     more subtypes (fields) that defined
+        */
+       private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type 
originalType, ParameterizedType definingType,
+                       ArrayList<Type> typeHierarchy, TypeInformation<IN1> 
in1Type, TypeInformation<IN2> in2Type) {
+               Type[] subtypes = new 
Type[definingType.getActualTypeArguments().length];
+
+               // materialize possible type variables
+               for (int i = 0; i < subtypes.length; i++) {
+                       // materialize immediate TypeVariables
+                       if (definingType.getActualTypeArguments()[i] instanceof 
TypeVariable<?>) {
+                               subtypes[i] = 
materializeTypeVariable(typeHierarchy, (TypeVariable<?>) 
definingType.getActualTypeArguments()[i]);
+                       }
+                       // class or parameterized type
+                       else {
+                               subtypes[i] = 
definingType.getActualTypeArguments()[i];
+                       }
+               }
+
+               TypeInformation<?>[] subTypesInfo = new 
TypeInformation<?>[subtypes.length];
+               for (int i = 0; i < subtypes.length; i++) {
+                       ArrayList<Type> subTypeHierarchy = new 
ArrayList<Type>(typeHierarchy);
+                       subTypeHierarchy.add(subtypes[i]);
+                       // sub type could not be determined with materializing
+                       // try to derive the type info of the TypeVariable from 
the immediate base child input as a last attempt
+                       if (subtypes[i] instanceof TypeVariable<?>) {
+                               subTypesInfo[i] = 
createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, 
in1Type, in2Type);
+
+                               // variable could not be determined
+                               if (subTypesInfo[i] == null) {
+                                       throw new InvalidTypesException("Type 
of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
+                                                       + ((TypeVariable<?>) 
subtypes[i]).getGenericDeclaration()
+                                                       + "' could not be 
determined. This is most likely a type erasure problem. "
+                                                       + "The type extraction 
currently supports types with generic variables only in cases where "
+                                                       + "all variables in the 
return type can be deduced from the input type(s).");
+                               }
+                       } else {
+                               subTypesInfo[i] = 
createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, 
in2Type);
+                       }
+               }
+
+               Class<?> originalTypeAsClass = null;
+               if (isClassType(originalType)) {
+                       originalTypeAsClass = typeToClass(originalType);
+               }
+               Preconditions.checkNotNull(originalTypeAsClass, "originalType 
has an unexpected type");
+               // check if the class we assumed to conform to the defining 
type so far is actually a pojo because the
+               // original type contains additional fields.
+               // check for additional fields.
+               int fieldCount = countFieldsInClass(originalTypeAsClass);
+               if(fieldCount > subTypesInfo.length) {
+                       return null;
+               }
+               return subTypesInfo;
+       }
        
        // 
--------------------------------------------------------------------------------------------
        //  Extract type parameters
@@ -830,9 +883,32 @@ public class TypeExtractor {
                                }
                                
                                for (int i = 0; i < subTypes.length; i++) {
-                                       validateInfo(new 
ArrayList<Type>(typeHierarchy), subTypes[i], ((TupleTypeInfo<?>) 
typeInfo).getTypeAt(i));
+                                       validateInfo(new 
ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
                                }
                        }
+                       // check for Either
+                       else if (typeInfo instanceof EitherTypeInfo) {
+                               // check if Either at all
+                               if (!(isClassType(type) && 
Either.class.isAssignableFrom(typeToClass(type)))) {
+                                       throw new InvalidTypesException("Either 
type expected.");
+                               }
+
+                               // go up the hierarchy until we reach Either 
(with or without generics)
+                               while (!(isClassType(type) && 
typeToClass(type).equals(Either.class))) {
+                                       typeHierarchy.add(type);
+                                       type = 
typeToClass(type).getGenericSuperclass();
+                               }
+
+                               // check if Either has generics
+                               if (type instanceof Class<?>) {
+                                       throw new 
InvalidTypesException("Parameterized Either type expected.");
+                               }
+
+                               EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, 
?>) typeInfo;
+                               Type[] subTypes = ((ParameterizedType) 
type).getActualTypeArguments();
+                               validateInfo(new 
ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType());
+                               validateInfo(new 
ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType());
+                       }
                        // check for Writable
                        else if (typeInfo instanceof WritableTypeInfo<?>) {
                                // check if writable at all
@@ -1224,7 +1300,7 @@ public class TypeExtractor {
                if(Writable.class.isAssignableFrom(clazz) && 
!Writable.class.equals(clazz)) {
                        return (TypeInformation<OUT>) 
WritableTypeInfo.getWritableTypeInfo((Class<? extends Writable>) clazz);
                }
-               
+
                // check for basic types
                TypeInformation<OUT> basicTypeInfo = 
BasicTypeInfo.getInfoFor(clazz);
                if (basicTypeInfo != null) {
@@ -1245,6 +1321,11 @@ public class TypeExtractor {
                        throw new InvalidTypesException("Type information 
extraction for tuples (except Tuple0) cannot be done based on the class.");
                }
 
+               // check for subclasses of Either
+               if (Either.class.isAssignableFrom(clazz)) {
+                       throw new InvalidTypesException("Type information 
extraction for Either cannot be done based on the class.");
+               }
+
                // check for Enums
                if(Enum.class.isAssignableFrom(clazz)) {
                        return new EnumTypeInfo(clazz);
@@ -1558,7 +1639,20 @@ public class TypeExtractor {
                                infos[i] = privateGetForObject(field);
                        }
                        return new TupleTypeInfo(value.getClass(), infos);
-               } else {
+               }
+               // we can not extract the types from an Either object since it 
only contains type information
+               // of one type, but from Either classes
+               else if (value instanceof Either) {
+                       try {
+                               return (TypeInformation<X>) 
privateCreateTypeInfo(value.getClass());
+                       }
+                       catch (InvalidTypesException e) {
+                               throw new InvalidTypesException("Automatic type 
extraction is not possible on an Either type "
+                                               + "as it does not contain 
information about both possible types. "
+                                               + "Please specify the types 
directly.");
+                       }
+               }
+               else {
                        return privateGetForClass((Class<X>) value.getClass(), 
new ArrayList<Type>());
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index eae767d..7abfc76 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -46,6 +46,8 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.EnumTypeInfo;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
@@ -67,8 +69,6 @@ import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.xml.bind.TypeConstraintException;
-
 
 public class TypeExtractorTest {
 
@@ -1829,4 +1829,81 @@ public class TypeExtractorTest {
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes((MapFunction)function, 
BasicTypeInfo.INT_TYPE_INFO);
                Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
        }
+
+       public static class Either1<T> extends Either<String, T> {
+               @Override
+               public String left() throws IllegalStateException {
+                       return null;
+               }
+
+               @Override
+               public T right() throws IllegalStateException {
+                       return null;
+               }
+       }
+
+       public static class Either2 extends Either1<Tuple1<Integer>> {
+               // nothing to do here
+       }
+
+       public static class EitherMapper<T> implements MapFunction<T, 
Either1<T>> {
+               @Override
+               public Either1<T> map(T value) throws Exception {
+                       return null;
+               }
+       }
+
+       public static class EitherMapper2 implements MapFunction<String, 
Either2> {
+               @Override
+               public Either2 map(String value) throws Exception {
+                       return null;
+               }
+       }
+
+       public static class EitherMapper3 implements MapFunction<Either2, 
Either2> {
+               @Override
+               public Either2 map(Either2 value) throws Exception {
+                       return null;
+               }
+       }
+
+       @Test
+       public void testEither() {
+               MapFunction<?, ?> function = new MapFunction<Either<String, 
Boolean>, Either<String, Boolean>>() {
+                       @Override
+                       public Either<String, Boolean> map(Either<String, 
Boolean> value) throws Exception {
+                               return null;
+                       }
+               };
+               TypeInformation<?> expected = new 
EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes((MapFunction) function, expected);
+               Assert.assertEquals(expected, ti);
+       }
+
+       @Test
+       public void testEitherHierarchy() {
+               MapFunction<?, ?> function = new EitherMapper<Boolean>();
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes((MapFunction) function, 
BasicTypeInfo.BOOLEAN_TYPE_INFO);
+               TypeInformation<?> expected = new 
EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+               Assert.assertEquals(expected, ti);
+
+               function = new EitherMapper2();
+               ti = TypeExtractor.getMapReturnTypes((MapFunction) function, 
BasicTypeInfo.STRING_TYPE_INFO);
+               expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, 
new TupleTypeInfo(BasicTypeInfo.INT_TYPE_INFO));
+               Assert.assertEquals(expected, ti);
+
+               function = new EitherMapper3();
+               ti = TypeExtractor.getMapReturnTypes((MapFunction) function, 
expected);
+               Assert.assertEquals(expected, ti);
+
+               Either<String, Tuple1<Integer>> either = new Either2();
+               ti = TypeExtractor.getForObject(either);
+               Assert.assertEquals(expected, ti);
+       }
+
+       @Test(expected=InvalidTypesException.class)
+       public void testEitherFromObjectException() {
+               Either<String, Tuple1<Integer>> either = Either.left("test");
+               TypeExtractor.getForObject(either);
+       }
 }

Reply via email to