[FLINK-1437][Java API] Fixes copy() methods in PojoSerializer for null values
This closes #342


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb7ce0e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb7ce0e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb7ce0e3

Branch: refs/heads/master
Commit: fb7ce0e315e270e10c605dcfa269286bc2add47f
Parents: 94c3e9c
Author: twalthr <i...@twalthr.com>
Authored: Mon Jan 26 16:09:24 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 3 11:34:16 2015 +0100

----------------------------------------------------------------------
 .../java/typeutils/runtime/PojoSerializer.java | 27 +++++++++++++++-----
 .../typeutils/runtime/PojoSerializerTest.java | 23 ++++++++++++-----
 2 files changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb7ce0e3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 99b9f65..1e58b9d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -142,8 +142,14 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
         try {
             for (int i = 0; i < numFields; i++) {
-                Object copy = fieldSerializers[i].copy(fields[i].get(from));
-                fields[i].set(target, copy);
+                Object value = fields[i].get(from);
+                if (value != null) {
+                    Object copy = fieldSerializers[i].copy(value);
+                    fields[i].set(target, copy);
+                }
+                else {
+                    fields[i].set(target, null);
+                }
             }
         }
         catch (IllegalAccessException e) {
@@ -156,8 +162,14 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
     public T copy(T from, T reuse) {
         try {
             for (int i = 0; i < numFields; i++) {
-                Object copy = fieldSerializers[i].copy(fields[i].get(from), fields[i].get(reuse));
-                fields[i].set(reuse, copy);
+                Object value = fields[i].get(from);
+                if (value != null) {
+                    Object copy = fieldSerializers[i].copy(fields[i].get(from), fields[i].get(reuse));
+                    fields[i].set(reuse, copy);
+                }
+                else {
+                    fields[i].set(reuse, null);
+                }
             }
         } catch (IllegalAccessException e) {
             throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" +
@@ -257,8 +269,11 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
         // copy the Non-Null/Null tag
         target.writeBoolean(source.readBoolean());
         for (int i = 0; i < numFields; i++) {
-            target.writeBoolean(source.readBoolean());
-            fieldSerializers[i].copy(source, target);
+            boolean isNull = source.readBoolean();
+            target.writeBoolean(isNull);
+            if (!isNull) {
+                fieldSerializers[i].copy(source, target);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7ce0e3/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index ae47fd3..006625e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -70,7 +70,8 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
                 new TestUserClass(rnd.nextInt(), "foo", rnd.nextDouble(), new int[]{1, 2, 3},
                         new NestedTestUserClass(rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[]{10, 11, 12})),
                 new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[]{4, 5, 6},
-                        new NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 21, 22}))
+                        new NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 21, 22})),
+                new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null)
         };
     }
 
@@ -109,21 +110,28 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
             if (dumm1 != otherTUC.dumm1) {
                 return false;
             }
-            if (!dumm2.equals(otherTUC.dumm2)) {
+            if ((dumm2 == null && otherTUC.dumm2 != null)
+                    || (dumm2 != null && !dumm2.equals(otherTUC.dumm2))) {
                 return false;
             }
             if (dumm3 != otherTUC.dumm3) {
                 return false;
             }
-            if (dumm4.length != otherTUC.dumm4.length) {
+            if ((dumm4 != null && otherTUC.dumm4 == null)
+                    || (dumm4 == null && otherTUC.dumm4 != null)
+                    || (dumm4 != null && otherTUC.dumm4 != null && dumm4.length != otherTUC.dumm4.length)) {
                 return false;
             }
-            for (int i = 0; i < dumm4.length; i++) {
-                if (dumm4[i] != otherTUC.dumm4[i]) {
-                    return false;
+            if (dumm4 != null && otherTUC.dumm4 != null) {
+                for (int i = 0; i < dumm4.length; i++) {
+                    if (dumm4[i] != otherTUC.dumm4[i]) {
+                        return false;
+                    }
                 }
             }
-            if (!nestedClass.equals(otherTUC.nestedClass)) {
+
+            if ((nestedClass == null && otherTUC.nestedClass != null)
+                    || (nestedClass != null && !nestedClass.equals(otherTUC.nestedClass))) {
                 return false;
             }
             return true;
@@ -181,6 +189,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
     /**
      * This tests if the hashes returned by the pojo and tuple comparators are the same
      */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Test
     public void testTuplePojoTestEquality() {