[FLINK-3922] [core] Fix case of infinite recursion in TypeExtractor

This closes #2011


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1b55f03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1b55f03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1b55f03

Branch: refs/heads/master
Commit: e1b55f033d18b22e8a3f07920fa7c9e5623d6922
Parents: b08b64a
Author: twalthr <twal...@apache.org>
Authored: Thu May 19 17:01:57 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 8 15:17:10 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java |  2 +-
 .../java/typeutils/PojoTypeExtractionTest.java  | 53 ++++++++++++++++++--
 2 files changed, 49 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1b55f03/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 45420a2..9d30743 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1245,7 +1245,7 @@ public class TypeExtractor {
        private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) {
                int count = 0;
                for (Type t : typeHierarchy) {
-                       if (t == type || (isClassType(type) && t == typeToClass(type))) {
+                       if (t == type || (isClassType(type) && t == typeToClass(type)) || (isClassType(t) && typeToClass(t) == type)) {
                                count++;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e1b55f03/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
index ffcfd52..16f3047 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
@@ -90,6 +90,10 @@ public class PojoTypeExtractionTest {
                public String[] fancyArray;                      // generic type
        }
 
+       public static class FancyCollectionSubtype<T> extends HashSet<T> {
+               private static final long serialVersionUID = -3494469602638179921L;
+       }
+
        public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> {
                public String field3;
        }
@@ -808,10 +812,49 @@ public class PojoTypeExtractionTest {
                Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
                Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
        }
-       
-       // ------------------------------------------------------------------------
-       
-       public static class FancyCollectionSubtype<T> extends HashSet<T> {
-               private static final long serialVersionUID = -3494469602638179921L;
+
+       public static class PojoWithRecursiveGenericField<K,V> {
+               public PojoWithRecursiveGenericField<K,V> parent;
+               public PojoWithRecursiveGenericField(){}
+       }
+
+       @Test
+       public void testPojoWithRecursiveGenericField() {
+               TypeInformation<?> ti = TypeExtractor.createTypeInfo(PojoWithRecursiveGenericField.class);
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).getTypeInformation().getClass());
+       }
+
+       public static class MutualPojoA {
+               public MutualPojoB field;
+       }
+
+       public static class MutualPojoB {
+               public MutualPojoA field;
+       }
+
+       @Test
+       public void testPojosWithMutualRecursion() {
+               TypeInformation<?> ti = TypeExtractor.createTypeInfo(MutualPojoB.class);
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               TypeInformation<?> pti = ((PojoTypeInfo) ti).getPojoFieldAt(0).getTypeInformation();
+               Assert.assertTrue(pti instanceof PojoTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pti).getPojoFieldAt(0).getTypeInformation().getClass());
        }
+
+       public static class Container<T> {
+               public T field;
+       }
+
+       public static class MyType extends Container<Container<Object>> {}
+
+       @Test
+       public void testRecursivePojoWithTypeVariable() {
+               TypeInformation<?> ti = TypeExtractor.createTypeInfo(MyType.class);
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               TypeInformation<?> pti = ((PojoTypeInfo) ti).getPojoFieldAt(0).getTypeInformation();
+               Assert.assertTrue(pti instanceof PojoTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pti).getPojoFieldAt(0).getTypeInformation().getClass());
+       }
+
 }

Reply via email to