[FLINK-1437][Java API] Fixes copy() methods in PojoSerializer for null values

This closes #342


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb7ce0e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb7ce0e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb7ce0e3

Branch: refs/heads/master
Commit: fb7ce0e315e270e10c605dcfa269286bc2add47f
Parents: 94c3e9c
Author: twalthr <i...@twalthr.com>
Authored: Mon Jan 26 16:09:24 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 3 11:34:16 2015 +0100

----------------------------------------------------------------------
 .../java/typeutils/runtime/PojoSerializer.java  | 27 +++++++++++++++-----
 .../typeutils/runtime/PojoSerializerTest.java   | 23 ++++++++++++-----
 2 files changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb7ce0e3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 99b9f65..1e58b9d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -142,8 +142,14 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                
                try {
                        for (int i = 0; i < numFields; i++) {
-                               Object copy = 
fieldSerializers[i].copy(fields[i].get(from));
-                               fields[i].set(target, copy);
+                               Object value = fields[i].get(from);
+                               if (value != null) {
+                                       Object copy = 
fieldSerializers[i].copy(value);
+                                       fields[i].set(target, copy);
+                               }
+                               else {
+                                       fields[i].set(target, null);
+                               }
                        }
                }
                catch (IllegalAccessException e) {
@@ -156,8 +162,14 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
        public T copy(T from, T reuse) {
                try {
                        for (int i = 0; i < numFields; i++) {
-                               Object copy = 
fieldSerializers[i].copy(fields[i].get(from), fields[i].get(reuse));
-                               fields[i].set(reuse, copy);
+                               Object value = fields[i].get(from);
+                               if (value != null) {
+                                       Object copy = 
fieldSerializers[i].copy(fields[i].get(from), fields[i].get(reuse));
+                                       fields[i].set(reuse, copy);
+                               }
+                               else {
+                                       fields[i].set(reuse, null);
+                               }
                        }
                } catch (IllegalAccessException e) {
                        throw new RuntimeException("Error during POJO copy, 
this should not happen since we check the fields" +
@@ -257,8 +269,11 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                // copy the Non-Null/Null tag
                target.writeBoolean(source.readBoolean());
                for (int i = 0; i < numFields; i++) {
-                       target.writeBoolean(source.readBoolean());
-                       fieldSerializers[i].copy(source, target);
+                       boolean isNull = source.readBoolean();
+                       target.writeBoolean(isNull);
+                       if (!isNull) {
+                               fieldSerializers[i].copy(source, target);
+                       }
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7ce0e3/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index ae47fd3..006625e 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -70,7 +70,8 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                                new TestUserClass(rnd.nextInt(), "foo", 
rnd.nextDouble(), new int[]{1, 2, 3},
                                                new 
NestedTestUserClass(rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[]{10, 
11, 12})),
                                new TestUserClass(rnd.nextInt(), "bar", 
rnd.nextDouble(), new int[]{4, 5, 6},
-                                               new 
NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 
21, 22}))
+                                               new 
NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 
21, 22})),
+                               new TestUserClass(rnd.nextInt(), null, 
rnd.nextDouble(), null, null)
                };
 
        }
@@ -109,21 +110,28 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                        if (dumm1 != otherTUC.dumm1) {
                                return false;
                        }
-                       if (!dumm2.equals(otherTUC.dumm2)) {
+                       if ((dumm2 == null && otherTUC.dumm2 != null)
+                                       || (dumm2 != null && 
!dumm2.equals(otherTUC.dumm2))) {
                                return false;
                        }
                        if (dumm3 != otherTUC.dumm3) {
                                return false;
                        }
-                       if (dumm4.length != otherTUC.dumm4.length) {
+                       if ((dumm4 != null && otherTUC.dumm4 == null)
+                                       || (dumm4 == null && otherTUC.dumm4 != 
null)
+                                       || (dumm4 != null && otherTUC.dumm4 != 
null && dumm4.length != otherTUC.dumm4.length)) {
                                return false;
                        }
-                       for (int i = 0; i < dumm4.length; i++) {
-                               if (dumm4[i] != otherTUC.dumm4[i]) {
-                                       return false;
+                       if (dumm4 != null && otherTUC.dumm4 != null) {
+                               for (int i = 0; i < dumm4.length; i++) {
+                                       if (dumm4[i] != otherTUC.dumm4[i]) {
+                                               return false;
+                                       }
                                }
                        }
-                       if (!nestedClass.equals(otherTUC.nestedClass)) {
+                       
+                       if ((nestedClass == null && otherTUC.nestedClass != 
null)
+                                       || (nestedClass != null && 
!nestedClass.equals(otherTUC.nestedClass))) {
                                return false;
                        }
                        return true;
@@ -181,6 +189,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
        /**
         * This tests if the hashes returned by the pojo and tuple comparators 
are the same
         */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
        public void testTuplePojoTestEquality() {
                

Reply via email to