Repository: flink Updated Branches: refs/heads/master e69d14521 -> f73a12e72
[FLINK-3046] Integrate the Either Java type with the TypeExtractor This closes #1393. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f73a12e7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f73a12e7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f73a12e7 Branch: refs/heads/master Commit: f73a12e728431d87065f7d0082380a527df39e44 Parents: e69d145 Author: twalthr <twal...@apache.org> Authored: Mon Nov 23 15:08:47 2015 +0100 Committer: twalthr <twal...@apache.org> Committed: Fri Nov 27 14:58:13 2015 +0100 ---------------------------------------------------------------------- .../api/java/typeutils/EitherTypeInfo.java | 12 +- .../flink/api/java/typeutils/TypeExtractor.java | 196 ++++++++++++++----- .../java/type/extractor/TypeExtractorTest.java | 81 +++++++- 3 files changed, 235 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java index 40ed0c0..ec7be97 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java @@ -37,7 +37,7 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> { private final TypeInformation<R> rightType; - public EitherTypeInfo(TypeInformation<L> leftType,TypeInformation<R> rightType) { + public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) { this.leftType = leftType; this.rightType = rightType; } @@ -108,4 +108,14 @@ public class EitherTypeInfo<L, R> extends 
TypeInformation<Either<L, R>> { return obj instanceof EitherTypeInfo; } + // -------------------------------------------------------------------------------------------- + + public TypeInformation<L> getLeftType() { + return leftType; + } + + public TypeInformation<R> getRightType() { + return rightType; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 782d58d..ff6a82c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -420,65 +420,53 @@ public class TypeExtractor { } typeHierarchy.add(curT); - - ParameterizedType tupleChild = (ParameterizedType) curT; - - Type[] subtypes = new Type[tupleChild.getActualTypeArguments().length]; - - // materialize possible type variables - for (int i = 0; i < subtypes.length; i++) { - // materialize immediate TypeVariables - if (tupleChild.getActualTypeArguments()[i] instanceof TypeVariable<?>) { - subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) tupleChild.getActualTypeArguments()[i]); + + // create the type information for the subtypes + TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type); + // type needs to be treated as a pojo due to additional fields + if (subTypesInfo == null) { + if (t instanceof ParameterizedType) { + return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type); } - // class or parameterized type else { - subtypes[i] = tupleChild.getActualTypeArguments()[i]; + return
(TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type); } } + // return tuple info + return new TupleTypeInfo(typeToClass(t), subTypesInfo); - TypeInformation<?>[] tupleSubTypes = new TypeInformation<?>[subtypes.length]; - for (int i = 0; i < subtypes.length; i++) { - ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy); - subTypeHierarchy.add(subtypes[i]); - // sub type could not be determined with materializing - // try to derive the type info of the TypeVariable from the immediate base child input as a last attempt - if (subtypes[i] instanceof TypeVariable<?>) { - tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type); - - // variable could not be determined - if (tupleSubTypes[i] == null) { - throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '" - + ((TypeVariable<?>) subtypes[i]).getGenericDeclaration() - + "' could not be determined. This is most likely a type erasure problem. 
" - + "The type extraction currently supports types with generic variables only in cases where " - + "all variables in the return type can be deduced from the input type(s)."); - } - } else { - tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type); - } + } + // check if type is a subclass of Either + else if (isClassType(t) && Either.class.isAssignableFrom(typeToClass(t))) { + Type curT = t; + + // go up the hierarchy until we reach Either (with or without generics) + // collect the types while moving up for a later top-down + while (!(isClassType(curT) && typeToClass(curT).equals(Either.class))) { + typeHierarchy.add(curT); + curT = typeToClass(curT).getGenericSuperclass(); } - - Class<?> tAsClass = null; - if (isClassType(t)) { - tAsClass = typeToClass(t); - } - Preconditions.checkNotNull(tAsClass, "t has a unexpected type"); - // check if the class we assumed to be a Tuple so far is actually a pojo because it contains additional fields. - // check for additional fields. - int fieldCount = countFieldsInClass(tAsClass); - if(fieldCount != tupleSubTypes.length) { - // the class is not a real tuple because it contains additional fields. 
treat as a pojo + + // check if Either has generics + if (curT instanceof Class<?>) { + throw new InvalidTypesException("Either needs to be parameterized by using generics."); + } + + typeHierarchy.add(curT); + + // create the type information for the subtypes + TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type); + // type needs to be treated as a pojo due to additional fields + if (subTypesInfo == null) { if (t instanceof ParameterizedType) { - return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type); + return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type); } else { - return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type); + return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type); } } - - return new TupleTypeInfo(tAsClass, tupleSubTypes); - + // return either info + return (TypeInformation<OUT>) new EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]); } // type depends on another type // e.g. class MyMapper<E> extends MapFunction<String, E> @@ -675,6 +663,71 @@ public class TypeExtractor { } return info; } + + /** + * Creates the TypeInformation for all elements of a type that expects a certain number of + * subtypes (e.g. TupleXX or Either). + * + * @param originalType most concrete subclass + * @param definingType type that defines the number of subtypes (e.g.
Tuple2 -> 2 subtypes) + * @param typeHierarchy necessary for type inference + * @param in1Type necessary for type inference + * @param in2Type necessary for type inference + * @return array containing TypeInformation of sub types or null if definingType contains + * more subtypes (fields) that defined + */ + private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type originalType, ParameterizedType definingType, + ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + Type[] subtypes = new Type[definingType.getActualTypeArguments().length]; + + // materialize possible type variables + for (int i = 0; i < subtypes.length; i++) { + // materialize immediate TypeVariables + if (definingType.getActualTypeArguments()[i] instanceof TypeVariable<?>) { + subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) definingType.getActualTypeArguments()[i]); + } + // class or parameterized type + else { + subtypes[i] = definingType.getActualTypeArguments()[i]; + } + } + + TypeInformation<?>[] subTypesInfo = new TypeInformation<?>[subtypes.length]; + for (int i = 0; i < subtypes.length; i++) { + ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy); + subTypeHierarchy.add(subtypes[i]); + // sub type could not be determined with materializing + // try to derive the type info of the TypeVariable from the immediate base child input as a last attempt + if (subtypes[i] instanceof TypeVariable<?>) { + subTypesInfo[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type); + + // variable could not be determined + if (subTypesInfo[i] == null) { + throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '" + + ((TypeVariable<?>) subtypes[i]).getGenericDeclaration() + + "' could not be determined. This is most likely a type erasure problem. 
" + + "The type extraction currently supports types with generic variables only in cases where " + + "all variables in the return type can be deduced from the input type(s)."); + } + } else { + subTypesInfo[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type); + } + } + + Class<?> originalTypeAsClass = null; + if (isClassType(originalType)) { + originalTypeAsClass = typeToClass(originalType); + } + Preconditions.checkNotNull(originalTypeAsClass, "originalType has an unexpected type"); + // check if the class we assumed to conform to the defining type so far is actually a pojo because the + // original type contains additional fields. + // check for additional fields. + int fieldCount = countFieldsInClass(originalTypeAsClass); + if(fieldCount > subTypesInfo.length) { + return null; + } + return subTypesInfo; + } // -------------------------------------------------------------------------------------------- // Extract type parameters @@ -830,9 +883,32 @@ public class TypeExtractor { } for (int i = 0; i < subTypes.length; i++) { - validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], ((TupleTypeInfo<?>) typeInfo).getTypeAt(i)); + validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i)); } } + // check for Either + else if (typeInfo instanceof EitherTypeInfo) { + // check if Either at all + if (!(isClassType(type) && Either.class.isAssignableFrom(typeToClass(type)))) { + throw new InvalidTypesException("Either type expected."); + } + + // go up the hierarchy until we reach Either (with or without generics) + while (!(isClassType(type) && typeToClass(type).equals(Either.class))) { + typeHierarchy.add(type); + type = typeToClass(type).getGenericSuperclass(); + } + + // check if Either has generics + if (type instanceof Class<?>) { + throw new InvalidTypesException("Parameterized Either type expected."); + } + + EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, ?>) typeInfo; + Type[] subTypes = 
((ParameterizedType) type).getActualTypeArguments(); + validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType()); + validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType()); + } // check for Writable else if (typeInfo instanceof WritableTypeInfo<?>) { // check if writable at all @@ -1224,7 +1300,7 @@ public class TypeExtractor { if(Writable.class.isAssignableFrom(clazz) && !Writable.class.equals(clazz)) { return (TypeInformation<OUT>) WritableTypeInfo.getWritableTypeInfo((Class<? extends Writable>) clazz); } - + // check for basic types TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz); if (basicTypeInfo != null) { @@ -1245,6 +1321,11 @@ public class TypeExtractor { throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class."); } + // check for subclasses of Either + if (Either.class.isAssignableFrom(clazz)) { + throw new InvalidTypesException("Type information extraction for Either cannot be done based on the class."); + } + // check for Enums if(Enum.class.isAssignableFrom(clazz)) { return new EnumTypeInfo(clazz); @@ -1558,7 +1639,20 @@ public class TypeExtractor { infos[i] = privateGetForObject(field); } return new TupleTypeInfo(value.getClass(), infos); - } else { + } + // we cannot extract the types from an Either value since it only holds a value of one of its + // two types; they can only be derived from the generic signature of the value's Either subclass + else if (value instanceof Either) { + try { + return (TypeInformation<X>) privateCreateTypeInfo(value.getClass()); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Automatic type extraction is not possible on an Either type " + + "as it does not contain information about both possible types.
" + + "Please specify the types directly."); + } + } + else { return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index eae767d..7abfc76 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -46,6 +46,8 @@ import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple9; +import org.apache.flink.api.java.typeutils.Either; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; import org.apache.flink.api.java.typeutils.EnumTypeInfo; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.MissingTypeInfo; @@ -67,8 +69,6 @@ import org.apache.hadoop.io.Writable; import org.junit.Assert; import org.junit.Test; -import javax.xml.bind.TypeConstraintException; - public class TypeExtractorTest { @@ -1829,4 +1829,81 @@ public class TypeExtractorTest { TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction)function, BasicTypeInfo.INT_TYPE_INFO); Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti); } + + public static class Either1<T> extends Either<String, T> { + @Override + public String left() throws IllegalStateException { + return null; + } + + @Override + public T right() throws IllegalStateException { + return null; + } + } + + public static class Either2 extends Either1<Tuple1<Integer>> { + // nothing 
to do here + } + + public static class EitherMapper<T> implements MapFunction<T, Either1<T>> { + @Override + public Either1<T> map(T value) throws Exception { + return null; + } + } + + public static class EitherMapper2 implements MapFunction<String, Either2> { + @Override + public Either2 map(String value) throws Exception { + return null; + } + } + + public static class EitherMapper3 implements MapFunction<Either2, Either2> { + @Override + public Either2 map(Either2 value) throws Exception { + return null; + } + } + + @Test + public void testEither() { + MapFunction<?, ?> function = new MapFunction<Either<String, Boolean>, Either<String, Boolean>>() { + @Override + public Either<String, Boolean> map(Either<String, Boolean> value) throws Exception { + return null; + } + }; + TypeInformation<?> expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) function, expected); + Assert.assertEquals(expected, ti); + } + + @Test + public void testEitherHierarchy() { + MapFunction<?, ?> function = new EitherMapper<Boolean>(); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) function, BasicTypeInfo.BOOLEAN_TYPE_INFO); + TypeInformation<?> expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO); + Assert.assertEquals(expected, ti); + + function = new EitherMapper2(); + ti = TypeExtractor.getMapReturnTypes((MapFunction) function, BasicTypeInfo.STRING_TYPE_INFO); + expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, new TupleTypeInfo(BasicTypeInfo.INT_TYPE_INFO)); + Assert.assertEquals(expected, ti); + + function = new EitherMapper3(); + ti = TypeExtractor.getMapReturnTypes((MapFunction) function, expected); + Assert.assertEquals(expected, ti); + + Either<String, Tuple1<Integer>> either = new Either2(); + ti = TypeExtractor.getForObject(either); + Assert.assertEquals(expected, ti); + } + + 
@Test(expected=InvalidTypesException.class) + public void testEitherFromObjectException() { + Either<String, Tuple1<Integer>> either = Either.left("test"); + TypeExtractor.getForObject(either); + } }