http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java new file mode 100644 index 0000000..bc11848 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java @@ -0,0 +1,812 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.List; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.HashMultiset; + +/** + * Pojo Type tests + * + * A Pojo is a bean-style class with getters, setters and empty ctor + * OR a class with all fields public (or for every private field, there has to be a public getter/setter) + * everything else is a generic type (that can't be used for field selection) + */ +public class PojoTypeExtractionTest { + + public static class HasDuplicateField extends WC { + @SuppressWarnings("unused") + private int count; // duplicate + } + + @Test(expected=RuntimeException.class) + public void testDuplicateFieldException() { + TypeExtractor.createTypeInfo(HasDuplicateField.class); + } + + // test with correct pojo types + public static class WC { // is a pojo + public ComplexNestedClass complex; // is a pojo + private int count; // is a BasicType + + public WC() { + } + public int getCount() { + return count; + } + public void setCount(int c) { + this.count = c; + } + } + public static class ComplexNestedClass { // pojo + public static int ignoreStaticField; + public transient int ignoreTransientField; + public Date date; // generic type + public Integer someNumberWithÃnicödeNäme; // BasicType + public float someFloat; // BasicType 
+ public Tuple3<Long, Long, String> word; //Tuple Type with three basic types + public Object nothing; // generic type + public MyWritable hadoopCitizen; // writableType + public List<String> collection; + } + + // all public test + public static class AllPublic extends ComplexNestedClass { + public ArrayList<String> somethingFancy; // generic type + public HashMultiset<Integer> fancyIds; // generic type + public String[] fancyArray; // generic type + } + + public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> { + public String field3; + } + public static class PojoWithGenerics<T1, T2> { + public int key; + public T1 field1; + public T2 field2; + } + + public static class ComplexHierarchyTop extends ComplexHierarchy<Tuple1<String>> {} + public static class ComplexHierarchy<T> extends PojoWithGenerics<FromTuple,T> {} + + // extends from Tuple and adds a field + public static class FromTuple extends Tuple3<String, String, Long> { + private static final long serialVersionUID = 1L; + public int special; + } + + public static class IncorrectPojo { + private int isPrivate; + public int getIsPrivate() { + return isPrivate; + } + // setter is missing (intentional) + } + + // correct pojo + public static class BeanStylePojo { + public String abc; + private int field; + public int getField() { + return this.field; + } + public void setField(int f) { + this.field = f; + } + } + public static class WrongCtorPojo { + public int a; + public WrongCtorPojo(int a) { + this.a = a; + } + } + + public static class PojoWithGenericFields { + private Collection<String> users; + private boolean favorited; + + public boolean isFavorited() { + return favorited; + } + + public void setFavorited(boolean favorited) { + this.favorited = favorited; + } + + public Collection<String> getUsers() { + return users; + } + + public void setUsers(Collection<String> users) { + this.users = users; + } + } + @Test + public void testPojoWithGenericFields() { + TypeInformation<?> 
typeForClass = TypeExtractor.createTypeInfo(PojoWithGenericFields.class); + + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + } + + + // in this test, the location of the getters and setters is mixed across the type hierarchy. + public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> { + public void setPackageProtected(String in) { + this.packageProtected = in; + } + } + public static class GenericPojoGetterSetterCheck<T> { + T packageProtected; + public T getPackageProtected() { + return packageProtected; + } + } + + @Test + public void testIncorrectPojos() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class); + Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>); + + typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class); + Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>); + } + + @Test + public void testCorrectPojos() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class); + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + + typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class); + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + } + + @Test + public void testPojoWC() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(WC.class); + checkWCPojoAsserts(typeForClass); + + WC t = new WC(); + t.complex = new ComplexNestedClass(); + TypeInformation<?> typeForObject = TypeExtractor.getForObject(t); + checkWCPojoAsserts(typeForObject); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void checkWCPojoAsserts(TypeInformation<?> typeInfo) { + Assert.assertFalse(typeInfo.isBasicType()); + Assert.assertFalse(typeInfo.isTupleType()); + Assert.assertEquals(10, typeInfo.getTotalFields()); + Assert.assertTrue(typeInfo instanceof PojoTypeInfo); + PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo; + + List<FlatFieldDescriptor> ffd = new 
ArrayList<FlatFieldDescriptor>(); + String[] fields = {"count", + "complex.date", + "complex.hadoopCitizen", + "complex.collection", + "complex.nothing", + "complex.someFloat", + "complex.someNumberWithÃnicödeNäme", + "complex.word.f0", + "complex.word.f1", + "complex.word.f2"}; + int[] positions = {9, + 1, + 2, + 0, + 3, + 4, + 5, + 6, + 7, + 8}; + Assert.assertEquals(fields.length, positions.length); + for(int i = 0; i < fields.length; i++) { + pojoType.getFlatFields(fields[i], 0, ffd); + Assert.assertEquals("Too many keys returned", 1, ffd.size()); + Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition()); + ffd.clear(); + } + + pojoType.getFlatFields("complex.word.*", 0, ffd); + Assert.assertEquals(3, ffd.size()); + // check if it returns 5,6,7 + for(FlatFieldDescriptor ffdE : ffd) { + final int pos = ffdE.getPosition(); + Assert.assertTrue(pos <= 8 ); + Assert.assertTrue(6 <= pos ); + if(pos == 6) { + Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + } + if(pos == 7) { + Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + } + if(pos == 8) { + Assert.assertEquals(String.class, ffdE.getType().getTypeClass()); + } + } + ffd.clear(); + + // scala style full tuple selection for pojos + pojoType.getFlatFields("complex.word._", 0, ffd); + Assert.assertEquals(3, ffd.size()); + ffd.clear(); + + pojoType.getFlatFields("complex.*", 0, ffd); + Assert.assertEquals(9, ffd.size()); + // check if it returns 0-7 + for(FlatFieldDescriptor ffdE : ffd) { + final int pos = ffdE.getPosition(); + Assert.assertTrue(ffdE.getPosition() <= 8 ); + Assert.assertTrue(0 <= ffdE.getPosition() ); + + if(pos == 0) { + Assert.assertEquals(List.class, ffdE.getType().getTypeClass()); + } + if(pos == 1) { + Assert.assertEquals(Date.class, ffdE.getType().getTypeClass()); + } + if(pos == 2) { + Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass()); + } + if(pos == 3) { + Assert.assertEquals(Object.class, 
ffdE.getType().getTypeClass()); + } + if(pos == 4) { + Assert.assertEquals(Float.class, ffdE.getType().getTypeClass()); + } + if(pos == 5) { + Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); + } + if(pos == 6) { + Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + } + if(pos == 7) { + Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + } + if(pos == 8) { + Assert.assertEquals(String.class, ffdE.getType().getTypeClass()); + } + if(pos == 9) { + Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); + } + } + ffd.clear(); + + pojoType.getFlatFields("*", 0, ffd); + Assert.assertEquals(10, ffd.size()); + // check if it returns 0-8 + for(FlatFieldDescriptor ffdE : ffd) { + Assert.assertTrue(ffdE.getPosition() <= 9 ); + Assert.assertTrue(0 <= ffdE.getPosition() ); + if(ffdE.getPosition() == 9) { + Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); + } + } + ffd.clear(); + + TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); // ComplexNestedClass complex + Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo); + + Assert.assertEquals(7, typeComplexNested.getArity()); + Assert.assertEquals(9, typeComplexNested.getTotalFields()); + PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) typeComplexNested; + + boolean dateSeen = false, intSeen = false, floatSeen = false, + tupleSeen = false, objectSeen = false, writableSeen = false, collectionSeen = false; + for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) { + PojoField field = pojoTypeComplexNested.getPojoFieldAt(i); + String name = field.getField().getName(); + if(name.equals("date")) { + if(dateSeen) { + Assert.fail("already seen"); + } + dateSeen = true; + Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.getTypeInformation()); + Assert.assertEquals(Date.class, field.getTypeInformation().getTypeClass()); + } else if(name.equals("someNumberWithÃnicödeNäme")) { + if(intSeen) { + Assert.fail("already seen"); + } + 
intSeen = true; + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); + Assert.assertEquals(Integer.class, field.getTypeInformation().getTypeClass()); + } else if(name.equals("someFloat")) { + if(floatSeen) { + Assert.fail("already seen"); + } + floatSeen = true; + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.getTypeInformation()); + Assert.assertEquals(Float.class, field.getTypeInformation().getTypeClass()); + } else if(name.equals("word")) { + if(tupleSeen) { + Assert.fail("already seen"); + } + tupleSeen = true; + Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>); + Assert.assertEquals(Tuple3.class, field.getTypeInformation().getTypeClass()); + // do some more advanced checks on the tuple + TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.getTypeInformation(); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(1)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(2)); + } else if(name.equals("nothing")) { + if(objectSeen) { + Assert.fail("already seen"); + } + objectSeen = true; + Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation()); + Assert.assertEquals(Object.class, field.getTypeInformation().getTypeClass()); + } else if(name.equals("hadoopCitizen")) { + if(writableSeen) { + Assert.fail("already seen"); + } + writableSeen = true; + Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.getTypeInformation()); + Assert.assertEquals(MyWritable.class, field.getTypeInformation().getTypeClass()); + } else if(name.equals("collection")) { + if(collectionSeen) { + Assert.fail("already seen"); + } + collectionSeen = true; + Assert.assertEquals(new GenericTypeInfo(List.class), field.getTypeInformation()); + + } else { + Assert.fail("field "+field+" is not 
expected"); + } + } + Assert.assertTrue("Field was not present", dateSeen); + Assert.assertTrue("Field was not present", intSeen); + Assert.assertTrue("Field was not present", floatSeen); + Assert.assertTrue("Field was not present", tupleSeen); + Assert.assertTrue("Field was not present", objectSeen); + Assert.assertTrue("Field was not present", writableSeen); + Assert.assertTrue("Field was not present", collectionSeen); + + TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int count + Assert.assertTrue(typeAtOne instanceof BasicTypeInfo); + + Assert.assertEquals(typeInfo.getTypeClass(), WC.class); + Assert.assertEquals(typeInfo.getArity(), 2); + } + + @Test + public void testPojoAllPublic() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class); + checkAllPublicAsserts(typeForClass); + + TypeInformation<?> typeForObject = TypeExtractor.getForObject(new AllPublic() ); + checkAllPublicAsserts(typeForObject); + } + + private void checkAllPublicAsserts(TypeInformation<?> typeInformation) { + Assert.assertTrue(typeInformation instanceof PojoTypeInfo); + Assert.assertEquals(10, typeInformation.getArity()); + Assert.assertEquals(12, typeInformation.getTotalFields()); + // check if the three additional fields are identified correctly + boolean arrayListSeen = false, multisetSeen = false, strArraySeen = false; + PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation; + for(int i = 0; i < pojoTypeForClass.getArity(); i++) { + PojoField field = pojoTypeForClass.getPojoFieldAt(i); + String name = field.getField().getName(); + if(name.equals("somethingFancy")) { + if(arrayListSeen) { + Assert.fail("already seen"); + } + arrayListSeen = true; + Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo); + Assert.assertEquals(ArrayList.class, field.getTypeInformation().getTypeClass()); + } else if(name.equals("fancyIds")) { + if(multisetSeen) { + Assert.fail("already seen"); + } + multisetSeen = true; + 
Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo); + Assert.assertEquals(HashMultiset.class, field.getTypeInformation().getTypeClass()); + } else if(name.equals("fancyArray")) { + if(strArraySeen) { + Assert.fail("already seen"); + } + strArraySeen = true; + Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.getTypeInformation()); + Assert.assertEquals(String[].class, field.getTypeInformation().getTypeClass()); + } else if(Arrays.asList("date", "someNumberWithÃnicödeNäme", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) { + // ignore these, they are inherited from the ComplexNestedClass + } + else { + Assert.fail("field "+field+" is not expected"); + } + } + Assert.assertTrue("Field was not present", arrayListSeen); + Assert.assertTrue("Field was not present", multisetSeen); + Assert.assertTrue("Field was not present", strArraySeen); + } + + @Test + public void testPojoExtendingTuple() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(FromTuple.class); + checkFromTuplePojo(typeForClass); + + FromTuple ft = new FromTuple(); + ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L; + TypeInformation<?> typeForObject = TypeExtractor.getForObject(ft); + checkFromTuplePojo(typeForObject); + } + + private void checkFromTuplePojo(TypeInformation<?> typeInformation) { + Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>); + Assert.assertEquals(4, typeInformation.getTotalFields()); + PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation; + for(int i = 0; i < pojoTypeForClass.getArity(); i++) { + PojoField field = pojoTypeForClass.getPojoFieldAt(i); + String name = field.getField().getName(); + if(name.equals("special")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); + } else if(name.equals("f0") || name.equals("f1")) { + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); + } else if(name.equals("f2")) { + 
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation()); + } else { + Assert.fail("unexpected field"); + } + } + } + + @Test + public void testPojoWithGenerics() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ParentSettingGenerics.class); + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; + for(int i = 0; i < pojoTypeForClass.getArity(); i++) { + PojoField field = pojoTypeForClass.getPojoFieldAt(i); + String name = field.getField().getName(); + if(name.equals("field1")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); + } else if (name.equals("field2")) { + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation()); + } else if (name.equals("field3")) { + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); + } else if (name.equals("key")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); + } else { + Assert.fail("Unexpected field "+field); + } + } + } + + /** + * Test if the TypeExtractor is accepting untyped generics, + * making them GenericTypes + */ + @Test + public void testPojoWithGenericsSomeFieldsGeneric() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenerics.class); + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; + for(int i = 0; i < pojoTypeForClass.getArity(); i++) { + PojoField field = pojoTypeForClass.getPojoFieldAt(i); + String name = field.getField().getName(); + if(name.equals("field1")) { + Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation()); + } else if (name.equals("field2")) { + Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation()); + } else if (name.equals("key")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
field.getTypeInformation()); + } else { + Assert.fail("Unexpected field "+field); + } + } + } + + + @Test + public void testPojoWithComplexHierarchy() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class); + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; + for(int i = 0; i < pojoTypeForClass.getArity(); i++) { + PojoField field = pojoTypeForClass.getPojoFieldAt(i); + String name = field.getField().getName(); + if(name.equals("field1")) { + Assert.assertTrue(field.getTypeInformation() instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!) + } else if (name.equals("field2")) { + Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>); + Assert.assertTrue( ((TupleTypeInfo<?>)field.getTypeInformation()).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO) ); + } else if (name.equals("key")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); + } else { + Assert.fail("Unexpected field "+field); + } + } + } + + public static class MyMapper<T> implements MapFunction<PojoWithGenerics<Long, T>, PojoWithGenerics<T,T>> { + private static final long serialVersionUID = 1L; + + @Override + public PojoWithGenerics<T, T> map(PojoWithGenerics<Long, T> value) + throws Exception { + return null; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testGenericPojoTypeInference1() { + MapFunction<?, ?> function = new MyMapper<String>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) + TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=Long,field2=String>")); + + Assert.assertTrue(ti instanceof PojoTypeInfo<?>); + PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti; + for(int i = 0; i < pti.getArity(); i++) { + PojoField field = pti.getPojoFieldAt(i); + String name = 
field.getField().getName(); + if(name.equals("field1")) { + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); + } else if (name.equals("field2")) { + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); + } else if (name.equals("key")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); + } else { + Assert.fail("Unexpected field "+field); + } + } + } + + public static class PojoTuple<A, B, C> extends Tuple3<B, C, Long> { + private static final long serialVersionUID = 1L; + + public A extraField; + } + + public static class MyMapper2<D, E> implements MapFunction<Tuple2<E, D>, PojoTuple<E, D, D>> { + private static final long serialVersionUID = 1L; + + @Override + public PojoTuple<E, D, D> map(Tuple2<E, D> value) throws Exception { + return null; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testGenericPojoTypeInference2() { + MapFunction<?, ?> function = new MyMapper2<Boolean, Character>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) + TypeInfoParser.parse("Tuple2<Character,Boolean>")); + Assert.assertTrue(ti instanceof PojoTypeInfo<?>); + PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti; + for(int i = 0; i < pti.getArity(); i++) { + PojoField field = pti.getPojoFieldAt(i); + String name = field.getField().getName(); + if(name.equals("extraField")) { + Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.getTypeInformation()); + } else if (name.equals("f0")) { + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation()); + } else if (name.equals("f1")) { + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation()); + } else if (name.equals("f2")) { + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation()); + } else { + Assert.fail("Unexpected field "+field); + } + } + } + + public static class MyMapper3<D, E> implements 
MapFunction<PojoTuple<E, D, D>, Tuple2<E, D>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<E, D> map(PojoTuple<E, D, D> value) throws Exception { + return null; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testGenericPojoTypeInference3() { + MapFunction<?, ?> function = new MyMapper3<Boolean, Character>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) + TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoTuple<extraField=char,f0=boolean,f1=boolean,f2=long>")); + + Assert.assertTrue(ti instanceof TupleTypeInfo<?>); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(1)); + } + + public static class PojoWithParameterizedFields1<Z> { + public Tuple2<Z, Z> field; + } + + public static class MyMapper4<A> implements MapFunction<PojoWithParameterizedFields1<A>, A> { + private static final long serialVersionUID = 1L; + @Override + public A map(PojoWithParameterizedFields1<A> value) throws Exception { + return null; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testGenericPojoTypeInference4() { + MapFunction<?, ?> function = new MyMapper4<Byte>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) + TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields1<field=Tuple2<byte,byte>>")); + Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti); + } + + public static class PojoWithParameterizedFields2<Z> { + public PojoWithGenerics<Z, Z> field; + } + + public static class MyMapper5<A> implements MapFunction<PojoWithParameterizedFields2<A>, A> { + private static final long serialVersionUID = 1L; + @Override + public A map(PojoWithParameterizedFields2<A> value) throws Exception { 
+			return null;
+		}
+	}
+
+	/**
+	 * Verifies that the return type of {@code MyMapper5<Byte>} is extracted as
+	 * {@code BYTE_TYPE_INFO} when the input type information is parsed from the
+	 * type string below.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testGenericPojoTypeInference5() {
+		MapFunction<?, ?> function = new MyMapper5<Byte>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields2<"
+						+ "field=org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=byte,field2=byte>"
+						+ ">"));
+		Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
+	}
+
+	/** POJO whose only field is an array of the class's type parameter. */
+	public static class PojoWithParameterizedFields3<Z> {
+		public Z[] field;
+	}
+
+	/** Mapper whose return type {@code A} is the type argument of its POJO input. */
+	public static class MyMapper6<A> implements MapFunction<PojoWithParameterizedFields3<A>, A> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public A map(PojoWithParameterizedFields3<A> value) throws Exception {
+			return null;
+		}
+	}
+
+	/**
+	 * Verifies that {@code A} is resolved through the generic array field {@code Z[]}:
+	 * with {@code field=int[]} the mapper's return type must be {@code INT_TYPE_INFO}.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testGenericPojoTypeInference6() {
+		MapFunction<?, ?> function = new MyMapper6<Integer>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields3<"
+						+ "field=int[]"
+						+ ">"));
+		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+	}
+
+	/** Mapper whose return type {@code A} is nested inside a tuple array field of its input. */
+	public static class MyMapper7<A> implements MapFunction<PojoWithParameterizedFields4<A>, A> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public A map(PojoWithParameterizedFields4<A> value) throws Exception {
+			return null;
+		}
+	}
+
+	/** POJO whose only field is an array of {@code Tuple1} over the class's type parameter. */
+	public static class PojoWithParameterizedFields4<Z> {
+		public Tuple1<Z>[] field;
+	}
+
+	/**
+	 * Verifies that {@code A} is resolved through {@code Tuple1<Z>[]}: with
+	 * {@code field=Tuple1<int>[]} the mapper's return type must be {@code INT_TYPE_INFO}.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testGenericPojoTypeInference7() {
+		MapFunction<?, ?> function = new MyMapper7<Integer>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields4<"
+						+ "field=Tuple1<int>[]"
+						+ ">"));
+		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+	}
+
+	/** POJO that refers to itself directly. */
+	public static class RecursivePojo1 {
+		public RecursivePojo1 field;
+	}
+
+	/** POJO that refers to itself through a tuple. */
+	public static class RecursivePojo2 {
+		public Tuple1<RecursivePojo2> field;
+	}
+
+	/** POJO that refers to itself through {@link NestedPojo}. */
+	public static class RecursivePojo3 {
+		public NestedPojo field;
+	}
+
+	/** Second half of the {@link RecursivePojo3} cycle. */
+	public static class NestedPojo {
+		public RecursivePojo3 field;
+	}
+
+	/**
+	 * A directly self-referencing POJO must still be a {@code PojoTypeInfo}, with the
+	 * recursive field described by a {@code GenericTypeInfo}.
+	 */
+	@Test
+	public void testRecursivePojo1() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		// Wildcard cast instead of a raw cast: no unchecked/rawtypes warning to suppress.
+		Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo<?>) ti).getPojoFieldAt(0).getTypeInformation().getClass());
+	}
+
+	/**
+	 * The tuple field keeps its {@code TupleTypeInfo}; the recursive reference inside the
+	 * tuple must be described by a {@code GenericTypeInfo}.
+	 */
+	@Test
+	public void testRecursivePojo2() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		PojoField pf = ((PojoTypeInfo<?>) ti).getPojoFieldAt(0);
+		Assert.assertTrue(pf.getTypeInformation() instanceof TupleTypeInfo);
+		Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo<?>) pf.getTypeInformation()).getTypeAt(0).getClass());
+	}
+
+	/**
+	 * The nested POJO keeps its {@code PojoTypeInfo}; its back-reference to the outer
+	 * class must be described by a {@code GenericTypeInfo}.
+	 */
+	@Test
+	public void testRecursivePojo3() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		PojoField pf = ((PojoTypeInfo<?>) ti).getPojoFieldAt(0);
+		Assert.assertTrue(pf.getTypeInformation() instanceof PojoTypeInfo);
+		Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo<?>) pf.getTypeInformation()).getPojoFieldAt(0).getTypeInformation().getClass());
+	}
+
+	/** Minimal POJO with two public int fields. */
+	public static class FooBarPojo {
+		public int foo, bar;
+		public FooBarPojo() {}
+	}
+
+	/** Mapper that uses the same POJO type in both positions of its result tuple. */
+	public static class DuplicateMapper implements MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>> {
+		// Declared for consistency with the other mappers in this class.
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
+			return null;
+		}
+	}
+
+	/**
+	 * Both tuple positions of {@link DuplicateMapper}'s return type must be extracted as
+	 * {@code PojoTypeInfo}, even though the same POJO class occurs twice.
+	 */
+	// The raw cast is required because the function is only known as MapFunction<?, ?>.
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testDualUseOfPojo() {
+		MapFunction<?, ?> function = new DuplicateMapper();
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeExtractor.createTypeInfo(FooBarPojo.class));
+		Assert.assertTrue(ti instanceof TupleTypeInfo);
+		TupleTypeInfo<?> tti = ((TupleTypeInfo<?>) ti);
+		Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
+		Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java new file mode 100644 index 0000000..dbe5115 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests equality, serializability and extraction of {@link PojoTypeInfo}.
+ */
+public class PojoTypeInfoTest {
+
+	/**
+	 * Two type infos extracted from the same class must be equal and share a hash code.
+	 */
+	@Test
+	public void testPojoTypeInfoEquality() {
+		// No try/catch: an unexpected exception fails the test with its full stack trace.
+		TypeInformation<TestPojo> info1 = TypeExtractor.getForClass(TestPojo.class);
+		TypeInformation<TestPojo> info2 = TypeExtractor.getForClass(TestPojo.class);
+
+		assertTrue(info1 instanceof PojoTypeInfo);
+		assertTrue(info2 instanceof PojoTypeInfo);
+
+		// assertEquals reports both operands on failure, unlike assertTrue(a.equals(b)).
+		assertEquals(info1, info2);
+		assertEquals(info1.hashCode(), info2.hashCode());
+	}
+
+	/**
+	 * Type infos of two structurally identical but distinct classes must not be equal.
+	 */
+	@Test
+	public void testPojoTypeInfoInequality() {
+		TypeInformation<TestPojo> info1 = TypeExtractor.getForClass(TestPojo.class);
+		TypeInformation<AlternatePojo> info2 = TypeExtractor.getForClass(AlternatePojo.class);
+
+		assertTrue(info1 instanceof PojoTypeInfo);
+		assertTrue(info2 instanceof PojoTypeInfo);
+
+		assertNotEquals(info1, info2);
+	}
+
+	/**
+	 * A {@code PojoTypeInfo} must survive a serialization round trip and compare equal
+	 * to the original afterwards.
+	 */
+	@Test
+	public void testSerializabilityOfPojoTypeInfo() throws IOException, ClassNotFoundException {
+		PojoTypeInfo<TestPojo> pojoTypeInfo = (PojoTypeInfo<TestPojo>)TypeExtractor.getForClass(TestPojo.class);
+
+		byte[] serializedPojoTypeInfo = InstantiationUtil.serializeObject(pojoTypeInfo);
+		PojoTypeInfo<TestPojo> deserializedPojoTypeInfo = (PojoTypeInfo<TestPojo>)InstantiationUtil.deserializeObject(
+			serializedPojoTypeInfo,
+			getClass().getClassLoader());
+
+		assertEquals(pojoTypeInfo, deserializedPojoTypeInfo);
+	}
+
+	/**
+	 * A primitive field exposed through boxed accessors must still yield a {@code PojoTypeInfo}.
+	 */
+	@Test
+	public void testPrimitivePojo() {
+		TypeInformation<PrimitivePojo> info1 = TypeExtractor.getForClass(PrimitivePojo.class);
+
+		assertTrue(info1 instanceof PojoTypeInfo);
+	}
+
+	/**
+	 * A field named with an underscore but accessed through camel-case accessors must
+	 * still yield a {@code PojoTypeInfo}.
+	 */
+	@Test
+	public void testUnderscorePojo() {
+		TypeInformation<UnderscorePojo> info1 = TypeExtractor.getForClass(UnderscorePojo.class);
+
+		assertTrue(info1 instanceof PojoTypeInfo);
+	}
+
+	/** POJO mixing public fields with one private field that has a getter and a setter. */
+	public static final class TestPojo {
+
+		public int someInt;
+
+		private String aString;
+
+		public Double[] doubleArray;
+
+
+		public void setaString(String aString) {
+			this.aString = aString;
+		}
+
+		public String getaString() {
+			return aString;
+		}
+	}
+
+	/** Same members as {@link TestPojo} in a different class; used by the inequality test. */
+	public static final class AlternatePojo {
+
+		public int someInt;
+
+		private String aString;
+
+		public Double[] doubleArray;
+
+
+		public void setaString(String aString) {
+			this.aString = aString;
+		}
+
+		public String getaString() {
+			return aString;
+		}
+	}
+
+	/** POJO whose primitive {@code int} field is accessed through {@code Integer} accessors. */
+	public static final class PrimitivePojo {
+
+		private int someInt;
+
+		public void setSomeInt(Integer someInt) {
+			this.someInt = someInt;
+		}
+
+		public Integer getSomeInt() {
+			return this.someInt;
+		}
+	}
+
+	/** POJO whose field {@code some_int} is accessed through {@code getSomeInt}/{@code setSomeInt}. */
+	public static final class UnderscorePojo {
+
+		private int some_int;
+
+		public void setSomeInt(int some_int) {
+			this.some_int = some_int;
+		}
+
+		public Integer getSomeInt() {
+			return this.some_int;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java
new file mode 100644
index 0000000..51e481d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Test;
+
+/**
+ * Checks that flat, nested and mutually recursive POJOs are extracted as composite types.
+ */
+public class PojoTypeInformationTest {
+
+	/** POJO with boxed and primitive fields of the basic types plus two array fields. */
+	public static class SimplePojo {
+		public String str;
+		public Boolean Bl;
+		public boolean bl;
+		public Byte Bt;
+		public byte bt;
+		public Short Shrt;
+		public short shrt;
+		public Integer Intgr;
+		public int intgr;
+		public Long Lng;
+		public long lng;
+		public Float Flt;
+		public float flt;
+		public Double Dbl;
+		public double dbl;
+		public Character Ch;
+		public char ch;
+		public int[] primIntArray;
+		public Integer[] intWrapperArray;
+	}
+
+	/** Inner half of the nested POJO pair. */
+	public static class NestedPojoInner {
+		public String field;
+	}
+
+	/** Outer half of the nested POJO pair; holds a {@link NestedPojoInner}. */
+	public static class NestedPojoOuter {
+		public Integer intField;
+		public NestedPojoInner inner;
+	}
+
+	/** First half of a two-class reference cycle. */
+	public static class Recursive1Pojo {
+		public Integer intField;
+		public Recursive2Pojo rec;
+	}
+
+	/** Second half of a two-class reference cycle. */
+	public static class Recursive2Pojo {
+		public String strField;
+		public Recursive1Pojo rec;
+	}
+
+	/**
+	 * Fails with {@code message} unless {@code extracted} is a {@link CompositeType}.
+	 */
+	private static void assertCompositeType(String message, TypeInformation<?> extracted) {
+		assertTrue(message, extracted instanceof CompositeType);
+	}
+
+	@Test
+	public void testSimplePojoTypeExtraction() {
+		assertCompositeType(
+				"Extracted type is not a composite/pojo type but should be.",
+				TypeExtractor.getForClass(SimplePojo.class));
+	}
+
+	@Test
+	public void testNestedPojoTypeExtraction() {
+		assertCompositeType(
+				"Extracted type is not a Pojo type but should be.",
+				TypeExtractor.getForClass(NestedPojoOuter.class));
+	}
+
+	/**
+	 * Exercises recursion detection via the extractor's set of visited types; the
+	 * recursive field is expected to fall back to the generic serializer.
+	 */
+	@Test
+	public void testRecursivePojoTypeExtraction() {
+		assertCompositeType(
+				"Extracted type is not a Pojo type but should be.",
+				TypeExtractor.getForClass(Recursive1Pojo.class));
+	}
+
+	/** Same check as above, but starting from an instance rather than from the class. */
+	@Test
+	public void testRecursivePojoObjectTypeExtraction() {
+		assertCompositeType(
+				"Extracted type is not a Pojo type but should be.",
+				TypeExtractor.getForObject(new Recursive1Pojo()));
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
new file mode 100644
index 0000000..b6cff34
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the {@code equals}/{@code hashCode} behavior of {@link TupleTypeInfo}.
+ */
+public class TupleTypeInfoTest extends TestLogger {
+
+	/**
+	 * Comparing a {@link TupleTypeInfo} with an anonymous {@link TupleTypeInfoBase}
+	 * subclass must give the same answer in both directions.
+	 */
+	@Test
+	public void testTupleTypeInfoSymmetricEqualityRelation() {
+		TupleTypeInfo<Tuple1<Integer>> concrete = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO);
+
+		// Stub subclass over the same tuple class and field type; its methods return dummies.
+		TupleTypeInfoBase<Tuple1> anonymous = new TupleTypeInfoBase<Tuple1>(
+				(Class<Tuple1>)Tuple1.class,
+				(TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO) {
+
+			private static final long serialVersionUID = -7985593598027660836L;
+
+			@Override
+			public String[] getFieldNames() {
+				return new String[0];
+			}
+
+			@Override
+			public int getFieldIndex(String fieldName) {
+				return 0;
+			}
+
+			@Override
+			public TypeSerializer<Tuple1> createSerializer(ExecutionConfig config) {
+				return null;
+			}
+
+			@Override
+			protected TypeComparatorBuilder<Tuple1> createTypeComparatorBuilder() {
+				return null;
+			}
+		};
+
+		// Operands are evaluated left to right: concrete-vs-anonymous first, then the reverse.
+		Assert.assertTrue(
+				"Equality relation should be symmetric",
+				concrete.equals(anonymous) == anonymous.equals(concrete));
+	}
+
+	/** Two tuple type infos over the same field types must be equal with equal hash codes. */
+	@Test
+	public void testTupleTypeInfoEquality() {
+		TupleTypeInfo<Tuple2<Integer, String>> first =
+				new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		TupleTypeInfo<Tuple2<Integer, String>> second =
+				new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		Assert.assertEquals(first, second);
+		Assert.assertEquals(first.hashCode(), second.hashCode());
+	}
+
+	/** Tuple type infos that differ in one field type must not be equal. */
+	@Test
+	public void testTupleTypeInfoInequality() {
+		TupleTypeInfo<Tuple2<Integer, String>> withString =
+				new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		TupleTypeInfo<Tuple2<Integer, Boolean>> withBoolean =
+				new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+
+		Assert.assertNotEquals(withString, withBoolean);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java
new file mode 100644
index 0000000..a606896
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that {@code TypeExtractor.getInputFormatTypes} determines the produced type of
+ * input formats that declare it directly, through a superclass, or via
+ * {@link ResultTypeQueryable}.
+ *
+ * <p>The tests declare {@code throws Exception} rather than catching and calling
+ * {@code fail(e.getMessage())}, so an unexpected exception is reported with its
+ * original type, message and stack trace.
+ */
+@SuppressWarnings("serial")
+public class TypeExtractorInputFormatsTest {
+
+	/** A format implementing {@code InputFormat<Float, ?>} directly must yield {@code FLOAT_TYPE_INFO}. */
+	@Test
+	public void testExtractInputFormatType() throws Exception {
+		InputFormat<?, ?> format = new DummyFloatInputFormat();
+		TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+		assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, typeInfo);
+	}
+
+	/** Formats that bind the type parameter of {@code GenericInputFormat} must yield that type. */
+	@Test
+	public void testExtractDerivedInputFormatType() throws Exception {
+		// simple type
+		{
+			InputFormat<?, ?> format = new DerivedInputFormat();
+			TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+			assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, typeInfo);
+		}
+
+		// composite type
+		{
+			InputFormat<?, ?> format = new DerivedTupleInputFormat();
+			TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+			assertTrue(typeInfo.isTupleType());
+			assertTrue(typeInfo instanceof TupleTypeInfo);
+
+			@SuppressWarnings("unchecked")
+			TupleTypeInfo<Tuple3<String, Short, Double>> tupleInfo = (TupleTypeInfo<Tuple3<String, Short, Double>>) typeInfo;
+
+			assertEquals(3, tupleInfo.getArity());
+			assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleInfo.getTypeAt(0));
+			assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, tupleInfo.getTypeAt(1));
+			assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tupleInfo.getTypeAt(2));
+		}
+	}
+
+	/**
+	 * The tuple's middle type is a type variable in {@link RelativeInputFormat} and is only
+	 * bound to {@code Integer} one level further down, in {@link FinalRelativeInputFormat}.
+	 */
+	@Test
+	public void testMultiLevelDerivedInputFormatType() throws Exception {
+		// composite type
+		InputFormat<?, ?> format = new FinalRelativeInputFormat();
+		TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		assertTrue(typeInfo.isTupleType());
+		assertTrue(typeInfo instanceof TupleTypeInfo);
+
+		@SuppressWarnings("unchecked")
+		TupleTypeInfo<Tuple3<String, Integer, Double>> tupleInfo = (TupleTypeInfo<Tuple3<String, Integer, Double>>) typeInfo;
+
+		assertEquals(3, tupleInfo.getArity());
+		assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleInfo.getTypeAt(0));
+		assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleInfo.getTypeAt(1));
+		assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tupleInfo.getTypeAt(2));
+	}
+
+	/**
+	 * {@link QueryableInputFormat} is declared over {@code Float} but reports {@code Double}
+	 * from {@code getProducedType()}; the test expects {@code DOUBLE_TYPE_INFO}.
+	 */
+	@Test
+	public void testQueryableFormatType() throws Exception {
+		InputFormat<?, ?> format = new QueryableInputFormat();
+		TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+		assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, typeInfo);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Test formats
+	// --------------------------------------------------------------------------------------------
+
+	/** No-op format that implements {@code InputFormat<Float, InputSplit>} directly. */
+	public static final class DummyFloatInputFormat implements InputFormat<Float, InputSplit> {
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { return null; }
+
+		@Override
+		public InputSplit[] createInputSplits(int minNumSplits) { return null; }
+
+		@Override
+		public DefaultInputSplitAssigner getInputSplitAssigner(InputSplit[] splits) { return null; }
+
+		@Override
+		public void open(InputSplit split) {}
+
+		@Override
+		public boolean reachedEnd() { return false; }
+
+		@Override
+		public Float nextRecord(Float reuse) throws IOException { return null; }
+
+		@Override
+		public void close() {}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** No-op format that binds {@code GenericInputFormat}'s type parameter to {@code Short}. */
+	public static final class DerivedInputFormat extends GenericInputFormat<Short> {
+
+		@Override
+		public boolean reachedEnd() { return false; }
+
+		@Override
+		public Short nextRecord(Short reuse) { return null; }
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** No-op format that binds {@code GenericInputFormat}'s type parameter to a {@code Tuple3}. */
+	public static final class DerivedTupleInputFormat extends GenericInputFormat<Tuple3<String, Short, Double>> {
+
+		@Override
+		public boolean reachedEnd() { return false; }
+
+		@Override
+		public Tuple3<String, Short, Double> nextRecord(Tuple3<String, Short, Double> reuse) { return null; }
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** No-op format whose tuple type keeps its middle field as the type variable {@code T}. */
+	public static class RelativeInputFormat<T> extends GenericInputFormat<Tuple3<String, T, Double>> {
+
+		@Override
+		public boolean reachedEnd() { return false; }
+
+		@Override
+		public Tuple3<String, T, Double> nextRecord(Tuple3<String, T, Double> reuse) { return null; }
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Binds {@link RelativeInputFormat}'s type variable to {@code Integer}. */
+	public static final class FinalRelativeInputFormat extends RelativeInputFormat<Integer> {
+
+		@Override
+		public Tuple3<String, Integer, Double> nextRecord(Tuple3<String, Integer, Double> reuse) { return null; }
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** No-op format declared over {@code Float} that reports {@code Double} as its produced type. */
+	public static final class QueryableInputFormat implements InputFormat<Float, InputSplit>, ResultTypeQueryable<Double> {
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { return null; }
+
+		@Override
+		public InputSplit[] createInputSplits(int minNumSplits) { return null; }
+
+		@Override
+		public DefaultInputSplitAssigner getInputSplitAssigner(InputSplit[] splits) { return null; }
+
+		@Override
+		public void open(InputSplit split) {}
+
+		@Override
+		public boolean reachedEnd() { return false; }
+
+		@Override
+		public Float nextRecord(Float reuse) throws IOException { return null; }
+
+		@Override
+		public void close() {}
+
+		@Override
+		public TypeInformation<Double> getProducedType() {
+			return BasicTypeInfo.DOUBLE_TYPE_INFO;
+		}
+	}
+}