http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
new file mode 100644
index 0000000..bc11848
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.HashMultiset;
+
+/**
+ *  Pojo Type tests
+ *
+ *  A Pojo is a bean-style class with getters, setters and empty ctor
+ *   OR a class with all fields public (or for every private field, there has 
to be a public getter/setter)
+ *   everything else is a generic type (that can't be used for field selection)
+ */
+public class PojoTypeExtractionTest {
+
+       public static class HasDuplicateField extends WC {
+               @SuppressWarnings("unused")
+               private int count; // duplicate
+       }
+
+       @Test(expected=RuntimeException.class)
+       public void testDuplicateFieldException() {
+               TypeExtractor.createTypeInfo(HasDuplicateField.class);
+       }
+
+       // test with correct pojo types
+       public static class WC { // is a pojo
+               public ComplexNestedClass complex; // is a pojo
+               private int count; // is a BasicType
+
+               public WC() {
+               }
+               public int getCount() {
+                       return count;
+               }
+               public void setCount(int c) {
+                       this.count = c;
+               }
+       }
+       public static class ComplexNestedClass { // pojo
+               public static int ignoreStaticField;
+               public transient int ignoreTransientField;
+               public Date date; // generic type
+               public Integer someNumberWithÜnicödeNäme; // BasicType
+               public float someFloat; // BasicType
+               public Tuple3<Long, Long, String> word; //Tuple Type with three 
basic types
+               public Object nothing; // generic type
+               public MyWritable hadoopCitizen;  // writableType
+               public List<String> collection;
+       }
+
+       // all public test
+       public static class AllPublic extends ComplexNestedClass {
+               public ArrayList<String> somethingFancy; // generic type
+               public HashMultiset<Integer> fancyIds; // generic type
+               public String[] fancyArray;                      // generic type
+       }
+
+       public static class ParentSettingGenerics extends 
PojoWithGenerics<Integer, Long> {
+               public String field3;
+       }
+       public static class PojoWithGenerics<T1, T2> {
+               public int key;
+               public T1 field1;
+               public T2 field2;
+       }
+
+       public static class ComplexHierarchyTop extends 
ComplexHierarchy<Tuple1<String>> {}
+       public static class ComplexHierarchy<T> extends 
PojoWithGenerics<FromTuple,T> {}
+
+       // extends from Tuple and adds a field
+       public static class FromTuple extends Tuple3<String, String, Long> {
+               private static final long serialVersionUID = 1L;
+               public int special;
+       }
+
+       public static class IncorrectPojo {
+               private int isPrivate;
+               public int getIsPrivate() {
+                       return isPrivate;
+               }
+               // setter is missing (intentional)
+       }
+
+       // correct pojo
+       public static class BeanStylePojo {
+               public String abc;
+               private int field;
+               public int getField() {
+                       return this.field;
+               }
+               public void setField(int f) {
+                       this.field = f;
+               }
+       }
+       public static class WrongCtorPojo {
+               public int a;
+               public WrongCtorPojo(int a) {
+                       this.a = a;
+               }
+       }
+
+       public static class PojoWithGenericFields {
+               private Collection<String> users;
+               private boolean favorited;
+
+               public boolean isFavorited() {
+                       return favorited;
+               }
+
+               public void setFavorited(boolean favorited) {
+                       this.favorited = favorited;
+               }
+
+               public Collection<String> getUsers() {
+                       return users;
+               }
+
+               public void setUsers(Collection<String> users) {
+                       this.users = users;
+               }
+       }
+       @Test
+       public void testPojoWithGenericFields() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(PojoWithGenericFields.class);
+
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+       }
+
+
+       // in this test, the location of the getters and setters is mixed 
across the type hierarchy.
+       public static class TypedPojoGetterSetterCheck extends 
GenericPojoGetterSetterCheck<String> {
+               public void setPackageProtected(String in) {
+                       this.packageProtected = in;
+               }
+       }
+       public static class GenericPojoGetterSetterCheck<T> {
+               T packageProtected;
+               public T getPackageProtected() {
+                       return packageProtected;
+               }
+       }
+
+       @Test
+       public void testIncorrectPojos() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(IncorrectPojo.class);
+               Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
+
+               typeForClass = 
TypeExtractor.createTypeInfo(WrongCtorPojo.class);
+               Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
+       }
+
+       @Test
+       public void testCorrectPojos() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(BeanStylePojo.class);
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+
+               typeForClass = 
TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class);
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+       }
+
+       @Test
+       public void testPojoWC() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(WC.class);
+               checkWCPojoAsserts(typeForClass);
+
+               WC t = new WC();
+               t.complex = new ComplexNestedClass();
+               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(t);
+               checkWCPojoAsserts(typeForObject);
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
+               Assert.assertFalse(typeInfo.isBasicType());
+               Assert.assertFalse(typeInfo.isTupleType());
+               Assert.assertEquals(10, typeInfo.getTotalFields());
+               Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
+               PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo;
+
+               List<FlatFieldDescriptor> ffd = new 
ArrayList<FlatFieldDescriptor>();
+               String[] fields = {"count",
+                               "complex.date",
+                               "complex.hadoopCitizen",
+                               "complex.collection",
+                               "complex.nothing",
+                               "complex.someFloat",
+                               "complex.someNumberWithÜnicödeNäme",
+                               "complex.word.f0",
+                               "complex.word.f1",
+                               "complex.word.f2"};
+               int[] positions = {9,
+                               1,
+                               2,
+                               0,
+                               3,
+                               4,
+                               5,
+                               6,
+                               7,
+                               8};
+               Assert.assertEquals(fields.length, positions.length);
+               for(int i = 0; i < fields.length; i++) {
+                       pojoType.getFlatFields(fields[i], 0, ffd);
+                       Assert.assertEquals("Too many keys returned", 1, 
ffd.size());
+                       Assert.assertEquals("position of field "+fields[i]+" 
wrong", positions[i], ffd.get(0).getPosition());
+                       ffd.clear();
+               }
+
+               pojoType.getFlatFields("complex.word.*", 0, ffd);
+               Assert.assertEquals(3, ffd.size());
+               // check if it returns 5,6,7
+               for(FlatFieldDescriptor ffdE : ffd) {
+                       final int pos = ffdE.getPosition();
+                       Assert.assertTrue(pos <= 8 );
+                       Assert.assertTrue(6 <= pos );
+                       if(pos == 6) {
+                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 7) {
+                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 8) {
+                               Assert.assertEquals(String.class, 
ffdE.getType().getTypeClass());
+                       }
+               }
+               ffd.clear();
+
+               // scala style full tuple selection for pojos
+               pojoType.getFlatFields("complex.word._", 0, ffd);
+               Assert.assertEquals(3, ffd.size());
+               ffd.clear();
+
+               pojoType.getFlatFields("complex.*", 0, ffd);
+               Assert.assertEquals(9, ffd.size());
+               // check if it returns 0-7
+               for(FlatFieldDescriptor ffdE : ffd) {
+                       final int pos = ffdE.getPosition();
+                       Assert.assertTrue(ffdE.getPosition() <= 8 );
+                       Assert.assertTrue(0 <= ffdE.getPosition() );
+
+                       if(pos == 0) {
+                               Assert.assertEquals(List.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 1) {
+                               Assert.assertEquals(Date.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 2) {
+                               Assert.assertEquals(MyWritable.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 3) {
+                               Assert.assertEquals(Object.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 4) {
+                               Assert.assertEquals(Float.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 5) {
+                               Assert.assertEquals(Integer.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 6) {
+                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 7) {
+                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 8) {
+                               Assert.assertEquals(String.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 9) {
+                               Assert.assertEquals(Integer.class, 
ffdE.getType().getTypeClass());
+                       }
+               }
+               ffd.clear();
+
+               pojoType.getFlatFields("*", 0, ffd);
+               Assert.assertEquals(10, ffd.size());
+               // check if it returns 0-8
+               for(FlatFieldDescriptor ffdE : ffd) {
+                       Assert.assertTrue(ffdE.getPosition() <= 9 );
+                       Assert.assertTrue(0 <= ffdE.getPosition() );
+                       if(ffdE.getPosition() == 9) {
+                               Assert.assertEquals(Integer.class, 
ffdE.getType().getTypeClass());
+                       }
+               }
+               ffd.clear();
+
+               TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); 
// ComplexNestedClass complex
+               Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo);
+
+               Assert.assertEquals(7, typeComplexNested.getArity());
+               Assert.assertEquals(9, typeComplexNested.getTotalFields());
+               PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) 
typeComplexNested;
+
+               boolean dateSeen = false, intSeen = false, floatSeen = false,
+                               tupleSeen = false, objectSeen = false, 
writableSeen = false, collectionSeen = false;
+               for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
+                       PojoField field = 
pojoTypeComplexNested.getPojoFieldAt(i);
+                       String name = field.getField().getName();
+                       if(name.equals("date")) {
+                               if(dateSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               dateSeen = true;
+                               
Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.getTypeInformation());
+                               Assert.assertEquals(Date.class, 
field.getTypeInformation().getTypeClass());
+                       } else if(name.equals("someNumberWithÜnicödeNäme")) {
+                               if(intSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               intSeen = true;
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+                               Assert.assertEquals(Integer.class, 
field.getTypeInformation().getTypeClass());
+                       } else if(name.equals("someFloat")) {
+                               if(floatSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               floatSeen = true;
+                               
Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.getTypeInformation());
+                               Assert.assertEquals(Float.class, 
field.getTypeInformation().getTypeClass());
+                       } else if(name.equals("word")) {
+                               if(tupleSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               tupleSeen = true;
+                               Assert.assertTrue(field.getTypeInformation() 
instanceof TupleTypeInfo<?>);
+                               Assert.assertEquals(Tuple3.class, 
field.getTypeInformation().getTypeClass());
+                               // do some more advanced checks on the tuple
+                               TupleTypeInfo<?> tupleTypeFromComplexNested = 
(TupleTypeInfo<?>) field.getTypeInformation();
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(0));
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(1));
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(2));
+                       } else if(name.equals("nothing")) {
+                               if(objectSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               objectSeen = true;
+                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
+                               Assert.assertEquals(Object.class, 
field.getTypeInformation().getTypeClass());
+                       } else if(name.equals("hadoopCitizen")) {
+                               if(writableSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               writableSeen = true;
+                               Assert.assertEquals(new 
WritableTypeInfo<MyWritable>(MyWritable.class), field.getTypeInformation());
+                               Assert.assertEquals(MyWritable.class, 
field.getTypeInformation().getTypeClass());
+                       } else if(name.equals("collection")) {
+                               if(collectionSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               collectionSeen = true;
+                               Assert.assertEquals(new 
GenericTypeInfo(List.class), field.getTypeInformation());
+
+                       } else {
+                               Assert.fail("field "+field+" is not expected");
+                       }
+               }
+               Assert.assertTrue("Field was not present", dateSeen);
+               Assert.assertTrue("Field was not present", intSeen);
+               Assert.assertTrue("Field was not present", floatSeen);
+               Assert.assertTrue("Field was not present", tupleSeen);
+               Assert.assertTrue("Field was not present", objectSeen);
+               Assert.assertTrue("Field was not present", writableSeen);
+               Assert.assertTrue("Field was not present", collectionSeen);
+
+               TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int 
count
+               Assert.assertTrue(typeAtOne instanceof BasicTypeInfo);
+
+               Assert.assertEquals(typeInfo.getTypeClass(), WC.class);
+               Assert.assertEquals(typeInfo.getArity(), 2);
+       }
+
+       @Test
+       public void testPojoAllPublic() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(AllPublic.class);
+               checkAllPublicAsserts(typeForClass);
+
+               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(new AllPublic() );
+               checkAllPublicAsserts(typeForObject);
+       }
+
+       private void checkAllPublicAsserts(TypeInformation<?> typeInformation) {
+               Assert.assertTrue(typeInformation instanceof PojoTypeInfo);
+               Assert.assertEquals(10, typeInformation.getArity());
+               Assert.assertEquals(12, typeInformation.getTotalFields());
+               // check if the three additional fields are identified correctly
+               boolean arrayListSeen = false, multisetSeen = false, 
strArraySeen = false;
+               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeInformation;
+               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+                       String name = field.getField().getName();
+                       if(name.equals("somethingFancy")) {
+                               if(arrayListSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               arrayListSeen = true;
+                               Assert.assertTrue(field.getTypeInformation() 
instanceof GenericTypeInfo);
+                               Assert.assertEquals(ArrayList.class, 
field.getTypeInformation().getTypeClass());
+                       } else if(name.equals("fancyIds")) {
+                               if(multisetSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               multisetSeen = true;
+                               Assert.assertTrue(field.getTypeInformation() 
instanceof GenericTypeInfo);
+                               Assert.assertEquals(HashMultiset.class, 
field.getTypeInformation().getTypeClass());
+                       } else if(name.equals("fancyArray")) {
+                               if(strArraySeen) {
+                                       Assert.fail("already seen");
+                               }
+                               strArraySeen = true;
+                               
Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, 
field.getTypeInformation());
+                               Assert.assertEquals(String[].class, 
field.getTypeInformation().getTypeClass());
+                       } else if(Arrays.asList("date", 
"someNumberWithÜnicödeNäme", "someFloat", "word", "nothing", 
"hadoopCitizen", "collection").contains(name)) {
+                               // ignore these, they are inherited from the 
ComplexNestedClass
+                       }
+                       else {
+                               Assert.fail("field "+field+" is not expected");
+                       }
+               }
+               Assert.assertTrue("Field was not present", arrayListSeen);
+               Assert.assertTrue("Field was not present", multisetSeen);
+               Assert.assertTrue("Field was not present", strArraySeen);
+       }
+
+       @Test
+       public void testPojoExtendingTuple() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(FromTuple.class);
+               checkFromTuplePojo(typeForClass);
+
+               FromTuple ft = new FromTuple();
+               ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L;
+               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(ft);
+               checkFromTuplePojo(typeForObject);
+       }
+
+       private void checkFromTuplePojo(TypeInformation<?> typeInformation) {
+               Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>);
+               Assert.assertEquals(4, typeInformation.getTotalFields());
+               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeInformation;
+               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+                       String name = field.getField().getName();
+                       if(name.equals("special")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+                       } else if(name.equals("f0") || name.equals("f1")) {
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
+                       } else if(name.equals("f2")) {
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
+                       } else {
+                               Assert.fail("unexpected field");
+                       }
+               }
+       }
+
+       @Test
+       public void testPojoWithGenerics() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(ParentSettingGenerics.class);
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
+               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+                       String name = field.getField().getName();
+                       if(name.equals("field1")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+                       } else if (name.equals("field2")) {
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
+                       } else if (name.equals("field3")) {
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
+                       } else if (name.equals("key")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+                       } else {
+                               Assert.fail("Unexpected field "+field);
+                       }
+               }
+       }
+
+       /**
+        * Test if the TypeExtractor is accepting untyped generics,
+        * making them GenericTypes
+        */
+       @Test
+       public void testPojoWithGenericsSomeFieldsGeneric() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(PojoWithGenerics.class);
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
+               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+                       String name = field.getField().getName();
+                       if(name.equals("field1")) {
+                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
+                       } else if (name.equals("field2")) {
+                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
+                       } else if (name.equals("key")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+                       } else {
+                               Assert.fail("Unexpected field "+field);
+                       }
+               }
+       }
+
+
+       @Test
+       public void testPojoWithComplexHierarchy() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(ComplexHierarchyTop.class);
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
+               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+                       String name = field.getField().getName();
+                       if(name.equals("field1")) {
+                               Assert.assertTrue(field.getTypeInformation() 
instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
+                       } else if (name.equals("field2")) {
+                               Assert.assertTrue(field.getTypeInformation() 
instanceof TupleTypeInfo<?>);
+                               Assert.assertTrue( 
((TupleTypeInfo<?>)field.getTypeInformation()).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO)
 );
+                       } else if (name.equals("key")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+                       } else {
+                               Assert.fail("Unexpected field "+field);
+                       }
+               }
+       }
+       
+       public static class MyMapper<T> implements 
MapFunction<PojoWithGenerics<Long, T>, PojoWithGenerics<T,T>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public PojoWithGenerics<T, T> map(PojoWithGenerics<Long, T> 
value)
+                               throws Exception {
+                       return null;
+               }
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test
+       public void testGenericPojoTypeInference1() {
+               MapFunction<?, ?> function = new MyMapper<String>();
+
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+                               
TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=Long,field2=String>"));
+               
+               Assert.assertTrue(ti instanceof PojoTypeInfo<?>);
+               PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
+               for(int i = 0; i < pti.getArity(); i++) {
+                       PojoField field = pti.getPojoFieldAt(i);
+                       String name = field.getField().getName();
+                       if(name.equals("field1")) {
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
+                       } else if (name.equals("field2")) {
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
+                       } else if (name.equals("key")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+                       } else {
+                               Assert.fail("Unexpected field "+field);
+                       }
+               }
+       }
+
+       public static class PojoTuple<A, B, C> extends Tuple3<B, C, Long> {
+               private static final long serialVersionUID = 1L;
+
+               public A extraField;
+       }
+
+       public static class MyMapper2<D, E> implements MapFunction<Tuple2<E, 
D>, PojoTuple<E, D, D>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public PojoTuple<E, D, D> map(Tuple2<E, D> value) throws 
Exception {
+                       return null;
+               }
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test
+       public void testGenericPojoTypeInference2() {
+               MapFunction<?, ?> function = new MyMapper2<Boolean, 
Character>();
+
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+                               
TypeInfoParser.parse("Tuple2<Character,Boolean>"));
+               Assert.assertTrue(ti instanceof PojoTypeInfo<?>);
+               PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
+               for(int i = 0; i < pti.getArity(); i++) {
+                       PojoField field = pti.getPojoFieldAt(i);
+                       String name = field.getField().getName();
+                       if(name.equals("extraField")) {
+                               
Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.getTypeInformation());
+                       } else if (name.equals("f0")) {
+                               
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
field.getTypeInformation());
+                       } else if (name.equals("f1")) {
+                               
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
field.getTypeInformation());
+                       } else if (name.equals("f2")) {
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
+                       } else {
+                               Assert.fail("Unexpected field "+field);
+                       }
+               }
+       }
+
+       public static class MyMapper3<D, E> implements MapFunction<PojoTuple<E, 
D, D>, Tuple2<E, D>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<E, D> map(PojoTuple<E, D, D> value) throws 
Exception {
+                       return null;
+               }
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test
+       public void testGenericPojoTypeInference3() {
+               MapFunction<?, ?> function = new MyMapper3<Boolean, 
Character>();
+
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+                               
TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoTuple<extraField=char,f0=boolean,f1=boolean,f2=long>"));
+               
+               Assert.assertTrue(ti instanceof TupleTypeInfo<?>);
+               TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+               Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, 
tti.getTypeAt(0));
+               Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
tti.getTypeAt(1));
+       }
+
+       public static class PojoWithParameterizedFields1<Z> {
+               public Tuple2<Z, Z> field;
+       }
+
+       public static class MyMapper4<A> implements 
MapFunction<PojoWithParameterizedFields1<A>, A> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public A map(PojoWithParameterizedFields1<A> value) throws 
Exception {
+                       return null;
+               }
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test
+       public void testGenericPojoTypeInference4() {
+               MapFunction<?, ?> function = new MyMapper4<Byte>();
+
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+                               
TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields1<field=Tuple2<byte,byte>>"));
+               Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
+       }
+
+       public static class PojoWithParameterizedFields2<Z> {
+               public PojoWithGenerics<Z, Z> field;
+       }
+
+       public static class MyMapper5<A> implements 
MapFunction<PojoWithParameterizedFields2<A>, A> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public A map(PojoWithParameterizedFields2<A> value) throws 
Exception {
+                       return null;
+               }
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test
+       public void testGenericPojoTypeInference5() {
+               MapFunction<?, ?> function = new MyMapper5<Byte>();
+
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+                               
TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields2<"
+                                               + 
"field=org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=byte,field2=byte>"
+                                               + ">"));
+               Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
+       }
+       
+       public static class PojoWithParameterizedFields3<Z> {
+               public Z[] field;
+       }
+
+       public static class MyMapper6<A> implements 
MapFunction<PojoWithParameterizedFields3<A>, A> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public A map(PojoWithParameterizedFields3<A> value) throws 
Exception {
+                       return null;
+               }
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test
+       public void testGenericPojoTypeInference6() {
+               MapFunction<?, ?> function = new MyMapper6<Integer>();
+
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+                               
TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields3<"
+                                               + "field=int[]"
+                                               + ">"));
+               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+       }
+
+       public static class MyMapper7<A> implements 
MapFunction<PojoWithParameterizedFields4<A>, A> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public A map(PojoWithParameterizedFields4<A> value) throws 
Exception {
+                       return null;
+               }
+       }
+
+       public static class PojoWithParameterizedFields4<Z> {
+               public Tuple1<Z>[] field;
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test
+       public void testGenericPojoTypeInference7() {
+               MapFunction<?, ?> function = new MyMapper7<Integer>();
+
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+                               
TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields4<"
+                                               + "field=Tuple1<int>[]"
+                                               + ">"));
+               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+       }
+
+       public static class RecursivePojo1 {
+               public RecursivePojo1 field;
+       }
+
+       public static class RecursivePojo2 {
+               public Tuple1<RecursivePojo2> field;
+       }
+
+       public static class RecursivePojo3 {
+               public NestedPojo field;
+       }
+
+       public static class NestedPojo {
+               public RecursivePojo3 field;
+       }
+
+       @Test
+       public void testRecursivePojo1() {
+               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo1.class);
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) 
ti).getPojoFieldAt(0).getTypeInformation().getClass());
+       }
+
+       @Test
+       public void testRecursivePojo2() {
+               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo2.class);
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
+               Assert.assertTrue(pf.getTypeInformation() instanceof 
TupleTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) 
pf.getTypeInformation()).getTypeAt(0).getClass());
+       }
+
+       @Test
+       public void testRecursivePojo3() {
+               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo3.class);
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
+               Assert.assertTrue(pf.getTypeInformation() instanceof 
PojoTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) 
pf.getTypeInformation()).getPojoFieldAt(0).getTypeInformation().getClass());
+       }
+
+       public static class FooBarPojo {
+               public int foo, bar;
+               public FooBarPojo() {}
+       }
+
+       public static class DuplicateMapper implements MapFunction<FooBarPojo, 
Tuple2<FooBarPojo, FooBarPojo>> {
+               @Override
+               public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) 
throws Exception {
+                       return null;
+               }
+       }
+
+       @Test
+       public void testDualUseOfPojo() {
+               MapFunction<?, ?> function = new DuplicateMapper();
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation) 
TypeExtractor.createTypeInfo(FooBarPojo.class));
+               Assert.assertTrue(ti instanceof TupleTypeInfo);
+               TupleTypeInfo<?> tti = ((TupleTypeInfo) ti);
+               Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
+               Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
new file mode 100644
index 0000000..dbe5115
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class PojoTypeInfoTest {
+
+       @Test
+       public void testPojoTypeInfoEquality() {
+               try {
+                       TypeInformation<TestPojo> info1 = 
TypeExtractor.getForClass(TestPojo.class);
+                       TypeInformation<TestPojo> info2 = 
TypeExtractor.getForClass(TestPojo.class);
+                       
+                       assertTrue(info1 instanceof PojoTypeInfo);
+                       assertTrue(info2 instanceof PojoTypeInfo);
+                       
+                       assertTrue(info1.equals(info2));
+                       assertTrue(info1.hashCode() == info2.hashCode());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testPojoTypeInfoInequality() {
+               try {
+                       TypeInformation<TestPojo> info1 = 
TypeExtractor.getForClass(TestPojo.class);
+                       TypeInformation<AlternatePojo> info2 = 
TypeExtractor.getForClass(AlternatePojo.class);
+
+                       assertTrue(info1 instanceof PojoTypeInfo);
+                       assertTrue(info2 instanceof PojoTypeInfo);
+
+                       assertFalse(info1.equals(info2));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testSerializabilityOfPojoTypeInfo() throws IOException, 
ClassNotFoundException {
+               PojoTypeInfo<TestPojo> pojoTypeInfo = 
(PojoTypeInfo<TestPojo>)TypeExtractor.getForClass(TestPojo.class);
+
+               byte[] serializedPojoTypeInfo = 
InstantiationUtil.serializeObject(pojoTypeInfo);
+               PojoTypeInfo<TestPojo> deserializedPojoTypeInfo = 
(PojoTypeInfo<TestPojo>)InstantiationUtil.deserializeObject(
+                       serializedPojoTypeInfo,
+                       getClass().getClassLoader());
+
+               assertEquals(pojoTypeInfo, deserializedPojoTypeInfo);
+       }
+
+       @Test
+       public void testPrimitivePojo() {
+               TypeInformation<PrimitivePojo> info1 = 
TypeExtractor.getForClass(PrimitivePojo.class);
+
+               assertTrue(info1 instanceof PojoTypeInfo);
+       }
+
+       @Test
+       public void testUnderscorePojo() {
+               TypeInformation<UnderscorePojo> info1 = 
TypeExtractor.getForClass(UnderscorePojo.class);
+
+               assertTrue(info1 instanceof PojoTypeInfo);
+       }
+
+       public static final class TestPojo {
+               
+               public int someInt;
+
+               private String aString;
+               
+               public Double[] doubleArray;
+               
+               
+               public void setaString(String aString) {
+                       this.aString = aString;
+               }
+               
+               public String getaString() {
+                       return aString;
+               }
+       }
+
+       public static final class AlternatePojo {
+
+               public int someInt;
+
+               private String aString;
+
+               public Double[] doubleArray;
+
+
+               public void setaString(String aString) {
+                       this.aString = aString;
+               }
+
+               public String getaString() {
+                       return aString;
+               }
+       }
+
+       public static final class PrimitivePojo {
+
+               private int someInt;
+
+               public void setSomeInt(Integer someInt) {
+                       this.someInt = someInt;
+               }
+
+               public Integer getSomeInt() {
+                       return this.someInt;
+               }
+       }
+
+       public static final class UnderscorePojo {
+
+               private int some_int;
+
+               public void setSomeInt(int some_int) {
+                       this.some_int = some_int;
+               }
+
+               public Integer getSomeInt() {
+                       return this.some_int;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java
new file mode 100644
index 0000000..51e481d
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Test;
+
+public class PojoTypeInformationTest {
+
+       public static class SimplePojo {
+               public String str;
+               public Boolean Bl;
+               public boolean bl;
+               public Byte Bt;
+               public byte bt;
+               public Short Shrt;
+               public short shrt;
+               public Integer Intgr;
+               public int intgr;
+               public Long Lng;
+               public long lng;
+               public Float Flt;
+               public float flt;
+               public Double Dbl;
+               public double dbl;
+               public Character Ch;
+               public char ch;
+               public int[] primIntArray;
+               public Integer[] intWrapperArray;
+       }
+
+       @Test
+       public void testSimplePojoTypeExtraction() {
+               TypeInformation<SimplePojo> type = 
TypeExtractor.getForClass(SimplePojo.class);
+               assertTrue("Extracted type is not a composite/pojo type but 
should be.", type instanceof CompositeType);
+       }
+
+       public static class NestedPojoInner {
+               public String field;
+       }
+
+       public static class NestedPojoOuter {
+               public Integer intField;
+               public NestedPojoInner inner;
+       }
+
+       @Test
+       public void testNestedPojoTypeExtraction() {
+               TypeInformation<NestedPojoOuter> type = 
TypeExtractor.getForClass(NestedPojoOuter.class);
+               assertTrue("Extracted type is not a Pojo type but should be.", 
type instanceof CompositeType);
+       }
+
+       public static class Recursive1Pojo {
+               public Integer intField;
+               public Recursive2Pojo rec;
+       }
+
+       public static class Recursive2Pojo {
+               public String strField;
+               public Recursive1Pojo rec;
+       }
+
+       @Test
+       public void testRecursivePojoTypeExtraction() {
+               // This one tests whether a recursive pojo is detected using 
the set of visited
+               // types in the type extractor. The recursive field will be 
handled using the generic serializer.
+               TypeInformation<Recursive1Pojo> type = 
TypeExtractor.getForClass(Recursive1Pojo.class);
+               assertTrue("Extracted type is not a Pojo type but should be.", 
type instanceof CompositeType);
+       }
+       
+       @Test
+       public void testRecursivePojoObjectTypeExtraction() {
+               TypeInformation<Recursive1Pojo> type = 
TypeExtractor.getForObject(new Recursive1Pojo());
+               assertTrue("Extracted type is not a Pojo type but should be.", 
type instanceof CompositeType);
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
new file mode 100644
index 0000000..b6cff34
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TupleTypeInfoTest extends TestLogger {
+
+       @Test
+       public void testTupleTypeInfoSymmetricEqualityRelation() {
+               TupleTypeInfo<Tuple1<Integer>> tupleTypeInfo = new 
TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO);
+
+               TupleTypeInfoBase<Tuple1> anonymousTupleTypeInfo = new 
TupleTypeInfoBase<Tuple1>(
+                       (Class<Tuple1>)Tuple1.class,
+                       (TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO) {
+
+                       private static final long serialVersionUID = 
-7985593598027660836L;
+
+                       @Override
+                       public TypeSerializer<Tuple1> 
createSerializer(ExecutionConfig config) {
+                               return null;
+                       }
+
+                       @Override
+                       protected TypeComparatorBuilder<Tuple1> 
createTypeComparatorBuilder() {
+                               return null;
+                       }
+
+                       @Override
+                       public String[] getFieldNames() {
+                               return new String[0];
+                       }
+
+                       @Override
+                       public int getFieldIndex(String fieldName) {
+                               return 0;
+                       }
+               };
+
+               boolean tupleVsAnonymous = 
tupleTypeInfo.equals(anonymousTupleTypeInfo);
+               boolean anonymousVsTuple = 
anonymousTupleTypeInfo.equals(tupleTypeInfo);
+
+               Assert.assertTrue("Equality relation should be symmetric", 
tupleVsAnonymous == anonymousVsTuple);
+       }
+
+       @Test
+       public void testTupleTypeInfoEquality() {
+               TupleTypeInfo<Tuple2<Integer, String>> tupleTypeInfo1 = new 
TupleTypeInfo<>(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               TupleTypeInfo<Tuple2<Integer, String>> tupleTypeInfo2 = new 
TupleTypeInfo<>(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               Assert.assertEquals(tupleTypeInfo1, tupleTypeInfo2);
+               Assert.assertEquals(tupleTypeInfo1.hashCode(), 
tupleTypeInfo2.hashCode());
+       }
+
+       @Test
+       public void testTupleTypeInfoInequality() {
+               TupleTypeInfo<Tuple2<Integer, String>> tupleTypeInfo1 = new 
TupleTypeInfo<>(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               TupleTypeInfo<Tuple2<Integer, Boolean>> tupleTypeInfo2 = new 
TupleTypeInfo<>(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.BOOLEAN_TYPE_INFO);
+
+               Assert.assertNotEquals(tupleTypeInfo1, tupleTypeInfo2);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java
new file mode 100644
index 0000000..a606896
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class TypeExtractorInputFormatsTest {
+
+       @Test
+       public void testExtractInputFormatType() {
+               try {
+                       InputFormat<?, ?> format = new DummyFloatInputFormat();
+                       TypeInformation<?> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+                       assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, typeInfo);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testExtractDerivedInputFormatType() {
+               try {
+                       // simple type
+                       {
+                               InputFormat<?, ?> format = new 
DerivedInputFormat();
+                               TypeInformation<?> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+                               assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, 
typeInfo);
+                       }
+                       
+                       // composite type
+                       {
+                               InputFormat<?, ?> format = new 
DerivedTupleInputFormat();
+                               TypeInformation<?> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+                               
+                               assertTrue(typeInfo.isTupleType());
+                               assertTrue(typeInfo instanceof TupleTypeInfo);
+                               
+                               @SuppressWarnings("unchecked")
+                               TupleTypeInfo<Tuple3<String, Short, Double>> 
tupleInfo = (TupleTypeInfo<Tuple3<String, Short, Double>>) typeInfo;
+                               
+                               assertEquals(3, tupleInfo.getArity());
+                               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
tupleInfo.getTypeAt(0));
+                               assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, 
tupleInfo.getTypeAt(1));
+                               assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
tupleInfo.getTypeAt(2));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testMultiLevelDerivedInputFormatType() {
+               try {
+
+                       // composite type
+                       {
+                               InputFormat<?, ?> format = new 
FinalRelativeInputFormat();
+                               TypeInformation<?> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+                               
+                               assertTrue(typeInfo.isTupleType());
+                               assertTrue(typeInfo instanceof TupleTypeInfo);
+                               
+                               @SuppressWarnings("unchecked")
+                               TupleTypeInfo<Tuple3<String, Integer, Double>> 
tupleInfo = (TupleTypeInfo<Tuple3<String, Integer, Double>>) typeInfo;
+                               
+                               assertEquals(3, tupleInfo.getArity());
+                               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
tupleInfo.getTypeAt(0));
+                               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleInfo.getTypeAt(1));
+                               assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
tupleInfo.getTypeAt(2));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testQueryableFormatType() {
+               try {
+                       InputFormat<?, ?> format = new QueryableInputFormat();
+                       TypeInformation<?> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+                       assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, typeInfo);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Test formats
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static final class DummyFloatInputFormat implements 
InputFormat<Float, InputSplit> {
+
+               @Override
+               public void configure(Configuration parameters) {}
+
+               @Override
+               public BaseStatistics getStatistics(BaseStatistics 
cachedStatistics) { return null; }
+
+               @Override
+               public InputSplit[] createInputSplits(int minNumSplits) { 
return null; }
+
+               @Override
+               public DefaultInputSplitAssigner 
getInputSplitAssigner(InputSplit[] splits) { return null; }
+
+               @Override
+               public void open(InputSplit split) {}
+
+               @Override
+               public boolean reachedEnd() { return false; }
+
+               @Override
+               public Float nextRecord(Float reuse) throws IOException { 
return null; }
+
+               @Override
+               public void close() {}
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static final class DerivedInputFormat extends 
GenericInputFormat<Short> {
+
+               @Override
+               public boolean reachedEnd() { return false; }
+
+               @Override
+               public Short nextRecord(Short reuse) { return null; }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static final class DerivedTupleInputFormat extends 
GenericInputFormat<Tuple3<String, Short, Double>> {
+
+               @Override
+               public boolean reachedEnd() { return false; }
+
+               @Override
+               public Tuple3<String, Short, Double> nextRecord(Tuple3<String, 
Short, Double> reuse) { return null; }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static class RelativeInputFormat<T> extends 
GenericInputFormat<Tuple3<String, T, Double>> {
+
+               @Override
+               public boolean reachedEnd() { return false; }
+
+               @Override
+               public Tuple3<String, T, Double> nextRecord(Tuple3<String, T, 
Double> reuse) { return null; }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static final class FinalRelativeInputFormat extends 
RelativeInputFormat<Integer> {
+
+               @Override
+               public Tuple3<String, Integer, Double> 
nextRecord(Tuple3<String, Integer, Double> reuse) { return null; }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static final class QueryableInputFormat implements 
InputFormat<Float, InputSplit>, ResultTypeQueryable<Double> {
+
+               @Override
+               public void configure(Configuration parameters) {}
+
+               @Override
+               public BaseStatistics getStatistics(BaseStatistics 
cachedStatistics) { return null; }
+
+               @Override
+               public InputSplit[] createInputSplits(int minNumSplits) { 
return null; }
+
+               @Override
+               public DefaultInputSplitAssigner 
getInputSplitAssigner(InputSplit[] splits) { return null; }
+
+               @Override
+               public void open(InputSplit split) {}
+
+               @Override
+               public boolean reachedEnd() { return false; }
+
+               @Override
+               public Float nextRecord(Float reuse) throws IOException { 
return null; }
+
+               @Override
+               public void close() {}
+
+               @Override
+               public TypeInformation<Double> getProducedType() {
+                       return BasicTypeInfo.DOUBLE_TYPE_INFO;
+               }
+       }
+}

Reply via email to