[FLINK-1417] Automatically register types with Kryo

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/354efec0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/354efec0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/354efec0

Branch: refs/heads/master
Commit: 354efec0f9da0fa03ea9b337b02a1a2a03a9ac16
Parents: 5015ab4
Author: Robert Metzger <rmetz...@apache.org>
Authored: Mon Jan 26 17:54:25 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Feb 18 15:52:08 2015 +0100

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../org/apache/flink/api/io/avro/.gitignore     |   1 -
 .../apache/flink/api/io/avro/AvroPojoTest.java  | 157 ------
 .../flink/api/common/ExecutionConfig.java       | 193 ++++++-
 .../api/common/typeutils/CompositeType.java     |   3 +-
 .../api/common/typeutils/TypeSerializer.java    |   2 +-
 flink-dist/src/main/flink-bin/LICENSE           |   2 +
 .../src/main/resources/log4j.properties         |  23 +
 flink-java/pom.xml                              |  19 +-
 .../flink/api/java/ExecutionEnvironment.java    | 102 +++-
 .../java/org/apache/flink/api/java/Utils.java   |  25 +
 .../flink/api/java/typeutils/AvroTypeInfo.java  |  12 +-
 .../api/java/typeutils/GenericTypeInfo.java     |   2 +-
 .../flink/api/java/typeutils/TypeExtractor.java |  12 +-
 .../java/typeutils/runtime/KryoSerializer.java  | 291 -----------
 .../java/typeutils/runtime/PojoSerializer.java  |   3 +-
 .../typeutils/runtime/kryo/KryoSerializer.java  | 312 +++++++++++
 .../typeutils/runtime/kryo/Serializers.java     | 242 +++++++++
 .../AbstractGenericTypeSerializerTest.java      |   3 -
 .../runtime/KryoGenericArraySerializerTest.java |  29 --
 .../runtime/KryoGenericTypeComparatorTest.java  |  29 --
 .../runtime/KryoGenericTypeSerializerTest.java  | 165 ------
 .../runtime/KryoVersusAvroMinibenchmark.java    | 520 ------------------
 .../runtime/KryoWithCustomSerializersTest.java  |  74 ---
 .../SubclassFromInterfaceSerializerTest.java    |   1 +
 .../kryo/KryoGenericArraySerializerTest.java    |  30 ++
 .../kryo/KryoGenericTypeComparatorTest.java     |  30 ++
 .../kryo/KryoGenericTypeSerializerTest.java     | 169 ++++++
 .../kryo/KryoVersusAvroMinibenchmark.java       | 522 +++++++++++++++++++
 .../kryo/KryoWithCustomSerializersTest.java     |  74 +++
 .../typeutils/runtime/kryo/SerializersTest.java |  66 +++
 .../src/main/resources/log4j.properties         |  23 +
 .../src/main/resources/log4j.properties         |  23 +
 .../flink/api/scala/ExecutionEnvironment.scala  |  39 +-
 .../api/scala/typeutils/TrySerializer.scala     |   2 +-
 .../flink/api/scala/typeutils/TryTypeInfo.scala |   2 +-
 .../apache/flink/api/io/avro/AvroPojoTest.java  | 164 ++++++
 .../api/io/avro/AvroRecordInputFormatTest.java  |  19 +-
 .../io/avro/AvroSplittableInputFormatTest.java  |  11 +-
 .../environment/StreamExecutionEnvironment.java |  38 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  37 +-
 flink-tests/pom.xml                             |  13 -
 .../javaApiOperators/GroupReduceITCase.java     |  17 +-
 .../runtime/KryoGenericTypeSerializerTest.scala |   9 +-
 .../ScalaSpecialTypesSerializerTest.scala       |   2 +-
 .../api/scala/runtime/TupleSerializerTest.scala |   3 +-
 pom.xml                                         |   7 +-
 47 files changed, 2123 insertions(+), 1400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 522cff5..deaa1ee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,4 @@ tmp
 _site
 docs/api
 build-target
+flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore
deleted file mode 100644
index dc9b237..0000000
--- 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-generated
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
deleted file mode 100644
index 6ff4836..0000000
--- 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.io.avro;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-import java.util.Arrays;
-
-@RunWith(Parameterized.class)
-public class AvroPojoTest extends MultipleProgramsTestBase {
-       public AvroPojoTest(ExecutionMode mode) {
-               super(mode);
-       }
-
-       private File inFile;
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-               inFile = tempFolder.newFile();
-               AvroRecordInputFormatTest.writeTestFile(inFile);
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testSimpleAvroRead() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users)
-                               // null map type because the order changes in 
different JVMs (hard to test)
-                               .map(new MapFunction<User, User>() {
-                       @Override
-                       public User map(User value) throws Exception {
-                               value.setTypeMap(null);
-                               return value;
-                       }
-               });
-
-               usersDS.writeAsText(resultPath);
-
-               env.execute("Simple Avro read job");
-
-
-               expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, 
\"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 
123.45, \"type_null_test\": null, \"type_bool_test\": true, 
\"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": 
[true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", 
\"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" +
-                               "{\"name\": \"Charlie\", \"favorite_number\": 
null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, 
\"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": 
false, \"type_array_string\": [], \"type_array_boolean\": [], 
\"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, 
\"type_fixed\": null, \"type_union\": null}\n";
-       }
-
-       @Test
-       public void testKeySelection() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users);
-
-               DataSet<Tuple2<String, Integer>> res = 
usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, 
Tuple2<String, Integer>>() {
-                       @Override
-                       public void reduce(Iterable<User> values, 
Collector<Tuple2<String, Integer>> out) throws Exception {
-                               for(User u : values) {
-                                       out.collect(new Tuple2<String, 
Integer>(u.getName().toString(), 1));
-                               }
-                       }
-               });
-
-               res.writeAsText(resultPath);
-               env.execute("Avro Key selection");
-
-
-               expected = "(Alyssa,1)\n(Charlie,1)\n";
-       }
-
-       /**
-        * Test some know fields for grouping on
-        */
-       @Test
-       public void testAllFields() throws Exception {
-               for(String fieldName : Arrays.asList("name", "type_enum", 
"type_double_test")) {
-                       testField(fieldName);
-               }
-       }
-
-       private void testField(final String fieldName) throws Exception {
-               before();
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users);
-
-               DataSet<Object> res = 
usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
-                       @Override
-                       public void reduce(Iterable<User> values, 
Collector<Object> out) throws Exception {
-                               for(User u : values) {
-                                       out.collect(u.get(fieldName));
-                               }
-                       }
-               });
-               res.writeAsText(resultPath);
-               env.execute("Simple Avro read job");
-               if(fieldName.equals("name")) {
-                       expected = "Alyssa\nCharlie";
-               } else if(fieldName.equals("type_enum")) {
-                       expected = "GREEN\nRED\n";
-               } else if(fieldName.equals("type_double_test")) {
-                       expected = "123.45\n1.337\n";
-               } else {
-                       Assert.fail("Unknown field");
-               }
-
-               after();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 5fa01b7..4a583e1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -21,10 +21,8 @@ package org.apache.flink.api.common;
 import com.esotericsoftware.kryo.Serializer;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * A configuration config for configuring behavior of the system, such as 
whether to use
@@ -44,12 +42,17 @@ public class ExecutionConfig implements Serializable {
 
        private boolean objectReuse = false;
 
+       private boolean disableAutoTypeRegistration = false;
+
 
        // Serializers and types registered with Kryo and the PojoSerializer
-       private final Map<Class<?>, Serializer<?>> registeredKryoSerializers = 
new HashMap<Class<?>, Serializer<?>>();
-       private final Map<Class<?>, Class<? extends Serializer<?>>> 
registeredKryoSerializersClasses = new HashMap<Class<?>, Class<? extends 
Serializer<?>>>();
-       private final Set<Class<?>> registeredKryoTypes = new 
HashSet<Class<?>>();
-       private final Set<Class<?>> registeredPojoTypes = new 
HashSet<Class<?>>();
+       // we store them in lists to ensure they are registered in order in all 
kryo instances.
+       private final List<Entry<Class<?>, Serializer<?>>> 
registeredTypesWithKryoSerializers = new ArrayList<Entry<Class<?>, 
Serializer<?>>>();
+       private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> 
registeredTypesWithKryoSerializerClasses = new ArrayList<Entry<Class<?>, 
Class<? extends Serializer<?>>>>();
+       private final List<Entry<Class<?>, Serializer<?>>> 
defaultKryoSerializers = new ArrayList<Entry<Class<?>, Serializer<?>>>();
+       private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> 
defaultKryoSerializerClasses = new ArrayList<Entry<Class<?>, Class<? extends 
Serializer<?>>>>();
+       private final List<Class<?>> registeredKryoTypes = new 
ArrayList<Class<?>>();
+       private final List<Class<?>> registeredPojoTypes = new 
ArrayList<Class<?>>();
 
        /**
         * Enables the ClosureCleaner. This analyzes user code functions and 
sets fields to null
@@ -191,9 +194,10 @@ public class ExecutionConfig implements Serializable {
        //  Registry for types and serializers
        // 
--------------------------------------------------------------------------------------------
 
+
+
        /**
-        * Registers the given Serializer as a default serializer for the given 
type at the
-        * {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}.
+        * Adds a new Kryo default serializer to the Runtime.
         *
         * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
         * because it may be distributed to the worker nodes by java 
serialization.
@@ -201,7 +205,7 @@ public class ExecutionConfig implements Serializable {
         * @param type The class of the types serialized with the given 
serializer.
         * @param serializer The serializer to use.
         */
-       public void registerKryoSerializer(Class<?> type, Serializer<?> 
serializer) {
+       public void addDefaultKryoSerializer(Class<?> type, Serializer<?> 
serializer) {
                if (type == null || serializer == null) {
                        throw new NullPointerException("Cannot register null 
class or serializer.");
                }
@@ -210,23 +214,66 @@ public class ExecutionConfig implements Serializable {
                                        + "as defined by java.io.Serializable. 
For stateless serializers, you can use the "
                                        + "'registerSerializer(Class, Class)' 
method to register the serializer via its class.");
                }
-
-               registeredKryoSerializers.put(type, serializer);
+               Entry<Class<?>, Serializer<?>> e = new Entry<Class<?>, 
Serializer<?>>(type, serializer);
+               if(!defaultKryoSerializers.contains(e)) {
+                       defaultKryoSerializers.add(e);
+               }
        }
 
        /**
-        * Registers the given Serializer via its class as a serializer for the 
given type at the
-        * {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}.
+        * Adds a new Kryo default serializer to the Runtime.
         *
         * @param type The class of the types serialized with the given 
serializer.
         * @param serializerClass The class of the serializer to use.
         */
-       public void registerKryoSerializer(Class<?> type, Class<? extends 
Serializer<?>> serializerClass) {
+       public void addDefaultKryoSerializer(Class<?> type, Class<? extends 
Serializer<?>> serializerClass) {
                if (type == null || serializerClass == null) {
                        throw new NullPointerException("Cannot register null 
class or serializer.");
                }
+               Entry<Class<?>, Class<? extends Serializer<?>>> e = new 
Entry<Class<?>, Class<? extends Serializer<?>>>(type, serializerClass);
+               if(!defaultKryoSerializerClasses.contains(e)) {
+                       defaultKryoSerializerClasses.add(e);
+               }
+       }
+
+       /**
+        * Registers the given type with a Kryo Serializer.
+        *
+        * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
+        * because it may be distributed to the worker nodes by java 
serialization.
+        *
+        * @param type The class of the types serialized with the given 
serializer.
+        * @param serializer The serializer to use.
+        */
+       public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> 
serializer) {
+               if (type == null || serializer == null) {
+                       throw new NullPointerException("Cannot register null 
class or serializer.");
+               }
+               if (!(serializer instanceof java.io.Serializable)) {
+                       throw new IllegalArgumentException("The serializer 
instance must be serializable, (for distributing it in the cluster), "
+                                       + "as defined by java.io.Serializable. 
For stateless serializers, you can use the "
+                                       + "'registerSerializer(Class, Class)' 
method to register the serializer via its class.");
+               }
+               Entry<Class<?>, Serializer<?>> e = new Entry<Class<?>, 
Serializer<?>>(type, serializer);
+               if(!registeredTypesWithKryoSerializers.contains(e)) {
+                       registeredTypesWithKryoSerializers.add(e);
+               }
+       }
 
-               registeredKryoSerializersClasses.put(type, serializerClass);
+       /**
+        * Registers the given type with a Kryo Serializer, where the serializer 
is given via its class.
+        *
+        * @param type The class of the types serialized with the given 
serializer.
+        * @param serializerClass The class of the serializer to use.
+        */
+       public void registerTypeWithKryoSerializer(Class<?> type, Class<? 
extends Serializer<?>> serializerClass) {
+               if (type == null || serializerClass == null) {
+                       throw new NullPointerException("Cannot register null 
class or serializer.");
+               }
+               Entry<Class<?>, Class<? extends Serializer<?>>> e = new 
Entry<Class<?>, Class<? extends Serializer<?>>>(type, serializerClass);
+               if(!registeredTypesWithKryoSerializerClasses.contains(e)) {
+                       registeredTypesWithKryoSerializerClasses.add(e);
+               }
        }
 
        /**
@@ -241,7 +288,9 @@ public class ExecutionConfig implements Serializable {
                if (type == null) {
                        throw new NullPointerException("Cannot register null 
type class.");
                }
-               registeredPojoTypes.add(type);
+               if(!registeredPojoTypes.contains(type)) {
+                       registeredPojoTypes.add(type);
+               }
        }
 
        /**
@@ -260,29 +309,48 @@ public class ExecutionConfig implements Serializable {
        }
 
        /**
-        * Returns the registered Kryo Serializers.
+        * Returns the registered types with Kryo Serializers.
         */
-       public Map<Class<?>, Serializer<?>> getRegisteredKryoSerializers() {
-               return registeredKryoSerializers;
+       public List<Entry<Class<?>, Serializer<?>>> 
getRegisteredTypesWithKryoSerializers() {
+               return registeredTypesWithKryoSerializers;
        }
 
        /**
-        * Returns the registered Kryo Serializer classes.
+        * Returns the registered types with their Kryo Serializer classes.
         */
-       public Map<Class<?>, Class<? extends Serializer<?>>> 
getRegisteredKryoSerializersClasses() {
-               return registeredKryoSerializersClasses;
+       public List<Entry<Class<?>, Class<? extends Serializer<?>>>> 
getRegisteredTypesWithKryoSerializerClasses() {
+               return registeredTypesWithKryoSerializerClasses;
+       }
+
+
+       /**
+        * Returns the registered default Kryo Serializers.
+        */
+       public List<Entry<Class<?>, Serializer<?>>> getDefaultKryoSerializers() 
{
+               return defaultKryoSerializers;
+       }
+
+       /**
+        * Returns the registered default Kryo Serializer classes.
+        */
+       public List<Entry<Class<?>, Class<? extends Serializer<?>>>> 
getDefaultKryoSerializerClasses() {
+               return defaultKryoSerializerClasses;
        }
 
        /**
         * Returns the registered Kryo types.
         */
-       public Set<Class<?>> getRegisteredKryoTypes() {
+       public List<Class<?>> getRegisteredKryoTypes() {
                if (isForceKryoEnabled()) {
                        // if we force kryo, we must also return all the types 
that
                        // were previously only registered as POJO
-                       Set<Class<?>> result = new HashSet<Class<?>>();
+                       List<Class<?>> result = new ArrayList<Class<?>>();
                        result.addAll(registeredKryoTypes);
-                       result.addAll(registeredPojoTypes);
+                       for(Class<?> t : registeredPojoTypes) {
+                               if (!result.contains(t)) {
+                                       result.add(t);
+                               }
+                       }
                        return result;
                } else {
                        return registeredKryoTypes;
@@ -292,7 +360,76 @@ public class ExecutionConfig implements Serializable {
        /**
         * Returns the registered POJO types.
         */
-       public Set<Class<?>> getRegisteredPojoTypes() {
+       public List<Class<?>> getRegisteredPojoTypes() {
                return registeredPojoTypes;
        }
+
+
+       public boolean isDisableAutoTypeRegistration() {
+               return disableAutoTypeRegistration;
+       }
+
+       /**
+        * Controls whether Flink automatically registers all types in the 
user programs with
+        * Kryo.
+        *
+        * @param disableAutoTypeRegistration true to disable the automatic type registration.
+        */
+       public void setDisableAutoTypeRegistration(boolean 
disableAutoTypeRegistration) {
+               this.disableAutoTypeRegistration = disableAutoTypeRegistration;
+       }
+
+
+       public static class Entry<K, V> implements Serializable {
+               private final K k;
+               private final V v;
+               public Entry(K k, V v) {
+                       this.k = k;
+                       this.v = v;
+               }
+
+               public K getKey() {
+                       return k;
+               }
+
+               public V getValue() {
+                       return v;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       Entry entry = (Entry) o;
+
+                       if (k != null ? !k.equals(entry.k) : entry.k != null) {
+                               return false;
+                       }
+                       if (v != null ? !v.equals(entry.v) : entry.v != null) {
+                               return false;
+                       }
+
+                       return true;
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = k != null ? k.hashCode() : 0;
+                       result = 31 * result + (v != null ? v.hashCode() : 0);
+                       return result;
+               }
+
+               @Override
+               public String toString() {
+                       return "Entry{" +
+                                       "k=" + k +
+                                       ", v=" + v +
+                                       '}';
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 17b8dab..3bd1fff 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -132,9 +132,8 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
                }
                return getNewComparator(config);
        }
-       
 
-       
+
        public static class FlatFieldDescriptor {
                private int keyPosition;
                private TypeInformation<?> type;

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 329e826..542b059 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -42,7 +42,7 @@ public abstract class TypeSerializer<T> implements 
Serializable {
        // 
--------------------------------------------------------------------------------------------
        // General information about the type and the serializer
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
         * Gets whether the type is an immutable type.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE 
b/flink-dist/src/main/flink-bin/LICENSE
index 25f91d4..89d8eca 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -233,7 +233,9 @@ under the Apache License (v 2.0):
  - Apache Kafka (http://kafka.apache.org)
  - Apache Flume (http://flume.apache.org)
  - Apache Sling (http://sling.apache.org)
+ - Apache Thrift (http://thrift.apache.org)
  - Google Guava (https://code.google.com/p/guava-libraries/)
+ - Google Protocol Buffers (https://github.com/google/protobuf/)
  - Netty v4.0.21 (http://netty.io)
  - Powermock (http://www.powermock.org)
  - Javassist (http://www.javassist.org)

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-examples/flink-java-examples/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/resources/log4j.properties 
b/flink-examples/flink-java-examples/src/main/resources/log4j.properties
new file mode 100644
index 0000000..da32ea0
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index dc5076a..640c527 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -33,6 +33,9 @@ under the License.
        <name>flink-java</name>
 
        <packaging>jar</packaging>
+       <properties>
+               <chill.version>0.5.2</chill.version>
+       </properties>
 
        <dependencies>
                <dependency>
@@ -60,19 +63,19 @@ under the License.
                <dependency>
                        <groupId>com.twitter</groupId>
                        <artifactId>chill_2.10</artifactId>
-                       <version>0.5.1</version>
+                       <version>${chill.version}</version>
                </dependency>
 
                <dependency>
                        <groupId>com.twitter</groupId>
                        <artifactId>chill-avro_2.10</artifactId>
-                       <version>0.5.1</version>
+                       <version>${chill.version}</version>
                </dependency>
 
                <dependency>
                        <groupId>com.twitter</groupId>
                        <artifactId>chill-protobuf</artifactId>
-                       <version>0.5.1</version>
+                       <version>${chill.version}</version>
                </dependency>
                <!-- We need protobuf for chill-protobuf -->
                <dependency>
@@ -84,7 +87,7 @@ under the License.
                <dependency>
                        <groupId>com.twitter</groupId>
                        <artifactId>chill-thrift</artifactId>
-                       <version>0.5.1</version>
+                       <version>${chill.version}</version>
                </dependency>
                <!-- libthrift is required by chill-thrift -->
                <dependency>
@@ -112,7 +115,6 @@ under the License.
                <dependency>
                        <groupId>joda-time</groupId>
                        <artifactId>joda-time</artifactId>
-                       <version>2.7</version>
                </dependency>
 
                <!--  guava needs to be in "provided" scope, to make sure it is 
not included into the jars by the shading -->
@@ -130,13 +132,6 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
-               
-               <dependency>
-                       <groupId>joda-time</groupId>
-                       <artifactId>joda-time</artifactId>
-                       <scope>test</scope>
-               </dependency>
-               
        </dependencies>
 
        <!-- Because flink-scala and flink-avro uses it in tests -->

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1105ab9..21e1f24 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 
 import com.esotericsoftware.kryo.Serializer;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -38,8 +39,10 @@ import org.apache.flink.api.common.Plan;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
@@ -54,16 +57,21 @@ import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.api.java.operators.OperatorTranslation;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
 import org.apache.flink.util.SplittableIterator;
+import org.apache.flink.util.Visitor;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The ExecutionEnviroment is the context in which a program is executed. A
@@ -84,6 +92,8 @@ import org.apache.hadoop.mapreduce.Job;
  * @see RemoteEnvironment
  */
 public abstract class ExecutionEnvironment {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExecutionEnvironment.class);
        
        /** The environment of the context (local by default, cluster if 
invoked through command line) */
        private static ExecutionEnvironmentFactory contextEnvironmentFactory;
@@ -200,29 +210,51 @@ public abstract class ExecutionEnvironment {
        //  Registry for types and serializers
        // 
--------------------------------------------------------------------------------------------
 
+
        /**
-        * Registers the given Serializer as a default serializer for the given 
type at the
-        * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}.
-        * 
+        * Adds a new Kryo default serializer to the Runtime.
+        *
         * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
         * because it may be distributed to the worker nodes by java 
serialization.
-        * 
+        *
         * @param type The class of the types serialized with the given 
serializer.
         * @param serializer The serializer to use.
         */
-       public void registerKryoSerializer(Class<?> type, Serializer<?> 
serializer) {
-               config.registerKryoSerializer(type, serializer);
+       public void addDefaultKryoSerializer(Class<?> type, Serializer<?> 
serializer) {
+               config.addDefaultKryoSerializer(type, serializer);
        }
 
        /**
-        * Registers the given Serializer via its class as a serializer for the 
given type at the
-        * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}.
-        * 
+        * Adds a new Kryo default serializer to the Runtime.
+        *
+        * @param type The class of the types serialized with the given 
serializer.
+        * @param serializerClass The class of the serializer to use.
+        */
+       public void addDefaultKryoSerializer(Class<?> type, Class<? extends 
Serializer<?>> serializerClass) {
+               config.addDefaultKryoSerializer(type, serializerClass);
+       }
+
+       /**
+        * Registers the given type with a Kryo Serializer.
+        *
+        * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
+        * because it may be distributed to the worker nodes by java 
serialization.
+        *
+        * @param type The class of the types serialized with the given 
serializer.
+        * @param serializer The serializer to use.
+        */
+       public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> 
serializer) {
+               config.registerTypeWithKryoSerializer(type, serializer);
+       }
+
+       /**
+        * Registers the given Serializer via its class as a serializer for the 
given type at the KryoSerializer
+        *
         * @param type The class of the types serialized with the given 
serializer.
         * @param serializerClass The class of the serializer to use.
         */
-       public void registerKryoSerializer(Class<?> type, Class<? extends 
Serializer<?>> serializerClass) {
-               config.registerKryoSerializer(type, serializerClass);
+       public void registerTypeWithKryoSerializer(Class<?> type, Class<? 
extends Serializer<?>> serializerClass) {
+               config.registerTypeWithKryoSerializer(type, serializerClass);
        }
        
        /**
@@ -743,7 +775,7 @@ public abstract class ExecutionEnvironment {
         * @throws Exception Thrown, if the program executions fails.
         */
        public abstract JobExecutionResult execute(String jobName) throws 
Exception;
-       
+
        /**
         * Creates the plan with which the system will execute the program, and 
returns it as 
         * a String using a JSON representation of the execution data flow 
graph.
@@ -856,11 +888,35 @@ public abstract class ExecutionEnvironment {
                
                OperatorTranslation translator = new OperatorTranslation();
                JavaPlan plan = translator.translateToPlan(this.sinks, jobName);
-               
+
                if (getDegreeOfParallelism() > 0) {
                        plan.setDefaultParallelism(getDegreeOfParallelism());
                }
                plan.setExecutionConfig(getConfig());
+               // Register the generic types found in the plan at the serializers, unless the user disabled auto type registration.
+               plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {
+                       @Override
+                       public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
+                               OperatorInformation<?> opInfo = visitable.getOperatorInfo();
+                               TypeInformation<?> typeInfo = opInfo.getOutputType();
+                               if(!config.isDisableAutoTypeRegistration()) {
+                                       if(typeInfo instanceof GenericTypeInfo) {
+                                               Serializers.recursivelyRegisterType(((GenericTypeInfo<?>) typeInfo).getTypeClass(), config);
+                                       }
+                                       if(typeInfo instanceof CompositeType) {
+                                               // the same switch also guards generic types nested inside composite types
+                                               List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<GenericTypeInfo<?>>();
+                                               Utils.getContainedGenericTypes((CompositeType)typeInfo, genericTypesInComposite);
+                                               for(GenericTypeInfo<?> gt : genericTypesInComposite) {
+                                                       Serializers.recursivelyRegisterType(gt.getTypeClass(), config);
+                                               }
+                                       }
+                               }
+                               return true;
+                       }
+                       @Override
+                       public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
+               });
 
                try {
                        registerCachedFilesWithPlan(plan);
@@ -872,7 +928,25 @@ public abstract class ExecutionEnvironment {
                if (clearSinks) {
                        this.sinks.clear();
                }
-               
+
+               // All types are registered now. Print information.
+               int registeredTypes = config.getRegisteredKryoTypes().size() +
+                               config.getRegisteredPojoTypes().size() +
+                               
config.getRegisteredTypesWithKryoSerializerClasses().size() +
+                               
config.getRegisteredTypesWithKryoSerializers().size();
+               int defaultKryoSerializers = 
config.getDefaultKryoSerializers().size() +
+                               config.getDefaultKryoSerializerClasses().size();
+               LOG.info("The job has {} registered types and {} default Kryo 
serializers", registeredTypes, defaultKryoSerializers);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("Registered Kryo types: {}", 
Joiner.on(',').join(config.getRegisteredKryoTypes()));
+                       LOG.debug("Registered Kryo with Serializers types: {}", 
Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializers()));
+                       LOG.debug("Registered Kryo with Serializer Classes 
types: {}", 
Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializerClasses()));
+                       LOG.debug("Registered Kryo default Serializers: {}", 
Joiner.on(',').join(config.getDefaultKryoSerializers()));
+                       LOG.debug("Registered Kryo default Serializers Classes 
{}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses()));
+                       LOG.debug("Registered POJO types: {}", 
Joiner.on(',').join(config.getRegisteredPojoTypes()));
+               }
+
                return plan;
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java 
b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index ba39aec..21df168 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.api.java;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+
+import java.util.List;
+
 public class Utils {
 
        public static String getCallLocationName() {
@@ -35,4 +41,23 @@ public class Utils {
 
                return String.format("%s(%s:%d)", elem.getMethodName(), 
elem.getFileName(), elem.getLineNumber());
        }
+
+       /**
+        * Collects all GenericTypeInfos contained in a composite type, descending into nested composite types.
+        *
+        * @param typeInfo The composite type whose contained types are inspected.
+        * @param target The list the found GenericTypeInfos are added to; types already in the list are not added again.
+        */
+       public static void getContainedGenericTypes(CompositeType typeInfo, List<GenericTypeInfo<?>> target) {
+               for(int i = 0; i < typeInfo.getArity(); i++) {
+                       TypeInformation<?> type = typeInfo.getTypeAt(i);
+                       if(type instanceof CompositeType) {
+                               getContainedGenericTypes((CompositeType) type, target);
+                       } else if(type instanceof GenericTypeInfo) {
+                               if(!target.contains(type)) {
+                                       target.add((GenericTypeInfo<?>) type);
+                               }
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
index ccdf7f7..bf580d1 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -27,10 +27,16 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Special type information to generate a POJO type info from an avro schema.
+ * Special type information to generate a special AvroTypeInfo for Avro POJOs 
(implementing SpecificRecordBase, the typed Avro POJOs)
  *
  * Proceeding: It uses a regular pojo type analysis and replaces all 
GenericType<CharSequence>
- *     with a GenericType<avro.Utf8>
+ *     with a GenericType<avro.Utf8>.
+ * All other types used by Avro are standard Java types.
+ * Only strings are represented as CharSequence fields and represented as Utf8 
classes at runtime.
+ * CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here
+ * by generic type infos containing Utf8 classes (which are comparable).
+ *
+ * This class is checked by the AvroPojoTest.
  * @param <T>
  */
 public class AvroTypeInfo<T extends SpecificRecordBase> extends 
PojoTypeInfo<T> {
@@ -40,7 +46,7 @@ public class AvroTypeInfo<T extends SpecificRecordBase> 
extends PojoTypeInfo<T>
 
        private static <T extends SpecificRecordBase> List<PojoField> 
generateFieldsFromAvroSchema(Class<T> typeClass) {
                PojoTypeExtractor pte = new PojoTypeExtractor();
-               TypeInformation ti = pte.analyzePojo(typeClass, new 
ArrayList<Type>(), null);
+               TypeInformation ti = pte.analyzePojo(typeClass, new 
ArrayList<Type>(), null, null, null);
 
                if(!(ti instanceof PojoTypeInfo)) {
                        throw new IllegalStateException("Expecting type to be a 
PojoTypeInfo");

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index c629ba3..34226ce 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
 public class GenericTypeInfo<T> extends TypeInformation<T> implements 
AtomicType<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 73cedd1..5f52f98 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1259,7 +1259,7 @@ public class TypeExtractor {
        }
 
        @SuppressWarnings("unchecked")
-       private <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> 
clazz, ArrayList<Type> typeHierarchy,
+       protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> 
clazz, ArrayList<Type> typeHierarchy,
                        ParameterizedType parameterizedType, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 
                // add the hierarchy of the POJO itself if it is generic
@@ -1385,8 +1385,9 @@ public class TypeExtractor {
                }
                return result;
        }
-       
-       private static Class<?> typeToClass(Type t) {
+
+       // public only for internal use; not part of the user-facing API
+       public static Class<?> typeToClass(Type t) {
                if (t instanceof Class) {
                        return (Class<?>)t;
                }
@@ -1395,8 +1396,9 @@ public class TypeExtractor {
                }
                throw new IllegalArgumentException("Cannot convert type to 
class");
        }
-       
-       private static boolean isClassType(Type t) {
+
+       // public only for internal use; not part of the user-facing API
+       public static boolean isClassType(Type t) {
                return t instanceof Class<?> || t instanceof ParameterizedType;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
deleted file mode 100644
index d8411a0..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.twitter.chill.ScalaKryoInstantiator;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.lang.reflect.Modifier;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A type serializer that serializes its type using the Kryo serialization
- * framework (https://github.com/EsotericSoftware/kryo).
- * 
- * This serializer is intended as a fallback serializer for the cases that are
- * not covered by the basic types, tuples, and POJOs.
- *
- * @param <T> The type to be serialized.
- */
-public class KryoSerializer<T> extends TypeSerializer<T> {
-       
-       private static final long serialVersionUID = 3L;
-
-       // 
------------------------------------------------------------------------
-
-       private final Map<Class<?>, Serializer<?>> registeredSerializers;
-       private final Map<Class<?>, Class<? extends Serializer<?>>> 
registeredSerializersClasses;
-       private final Set<Class<?>> registeredTypes;
-
-       private final Class<T> type;
-       
-       // 
------------------------------------------------------------------------
-       // The fields below are lazily initialized after duplication or 
deserialization.
-
-       private transient Kryo kryo;
-       private transient T copyInstance;
-       
-       private transient DataOutputView previousOut;
-       private transient DataInputView previousIn;
-       
-       private transient Input input;
-       private transient Output output;
-       
-       // 
------------------------------------------------------------------------
-
-       public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
-               if(type == null){
-                       throw new NullPointerException("Type class cannot be 
null.");
-               }
-               this.type = type;
-
-               this.registeredSerializers = 
executionConfig.getRegisteredKryoSerializers();
-               this.registeredSerializersClasses = 
executionConfig.getRegisteredKryoSerializersClasses();
-               this.registeredTypes = executionConfig.getRegisteredKryoTypes();
-       }
-
-       /**
-        * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
-        */
-       protected KryoSerializer(KryoSerializer<T> toCopy) {
-               registeredSerializers = toCopy.registeredSerializers;
-               registeredSerializersClasses = 
toCopy.registeredSerializersClasses;
-               registeredTypes = toCopy.registeredTypes;
-
-               type = toCopy.type;
-               if(type == null){
-                       throw new NullPointerException("Type class cannot be 
null.");
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public KryoSerializer<T> duplicate() {
-               return new KryoSerializer<T>(this);
-       }
-
-       @Override
-       public T createInstance() {
-               if(Modifier.isAbstract(type.getModifiers()) || 
Modifier.isInterface(type.getModifiers()) ) {
-                       return null;
-               } else {
-                       checkKryoInitialized();
-                       try {
-                               return kryo.newInstance(type);
-                       } catch(Throwable e) {
-                               return null;
-                       }
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public T copy(T from) {
-               if (from == null) {
-                       return null;
-               }
-               checkKryoInitialized();
-               try {
-                       return kryo.copy(from);
-               }
-               catch(KryoException ke) {
-                       // kryo was unable to copy it, so we do it through 
serialization:
-                       ByteArrayOutputStream baout = new 
ByteArrayOutputStream();
-                       Output output = new Output(baout);
-
-                       kryo.writeObject(output, from);
-
-                       output.close();
-
-                       ByteArrayInputStream bain = new 
ByteArrayInputStream(baout.toByteArray());
-                       Input input = new Input(bain);
-
-                       return (T)kryo.readObject(input, from.getClass());
-               }
-       }
-       
-       @Override
-       public T copy(T from, T reuse) {
-               return copy(from);
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       @Override
-       public void serialize(T record, DataOutputView target) throws 
IOException {
-               checkKryoInitialized();
-               if (target != previousOut) {
-                       DataOutputViewStream outputStream = new 
DataOutputViewStream(target);
-                       output = new Output(outputStream);
-                       previousOut = target;
-               }
-
-               try {
-                       kryo.writeClassAndObject(output, record);
-                       output.flush();
-               }
-               catch (KryoException ke) {
-                       Throwable cause = ke.getCause();
-                       if (cause instanceof EOFException) {
-                               throw (EOFException) cause;
-                       }
-                       else {
-                               throw ke;
-                       }
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               checkKryoInitialized();
-               if (source != previousIn) {
-                       DataInputViewStream inputStream = new 
DataInputViewStream(source);
-                       input = new NoFetchingInput(inputStream);
-                       previousIn = source;
-               }
-
-               try {
-                       return (T) kryo.readClassAndObject(input);
-               } catch (KryoException ke) {
-                       Throwable cause = ke.getCause();
-
-                       if(cause instanceof EOFException) {
-                               throw (EOFException) cause;
-                       } else {
-                               throw ke;
-                       }
-               }
-       }
-       
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               return deserialize(source);
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               checkKryoInitialized();
-               if(this.copyInstance == null){
-                       this.copyInstance = createInstance();
-               }
-
-               T tmp = deserialize(copyInstance, source);
-               serialize(tmp, target);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public int hashCode() {
-               return type.hashCode();
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj != null && obj instanceof KryoSerializer) {
-                       KryoSerializer<?> other = (KryoSerializer<?>) obj;
-                       return other.type == this.type;
-               } else {
-                       return false;
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new ScalaKryoInstantiator().newKryo();
-
-                       // Throwable and all subclasses should be serialized 
via java serialization
-                       kryo.addDefaultSerializer(Throwable.class, new 
JavaSerializer());
-
-                       // register the type of our class
-                       kryo.register(type);
-                       
-                       // register given types. we do this first so that any 
registration of a
-                       // more specific serializer overrides this
-                       for (Class<?> type : registeredTypes) {
-                               kryo.register(type);
-                       }
-                       
-                       // register given serializer classes
-                       for (Map.Entry<Class<?>, Class<? extends 
Serializer<?>>> e : registeredSerializersClasses.entrySet()) {
-                               Class<?> typeClass = e.getKey();
-                               Class<? extends Serializer<?>> serializerClass 
= e.getValue();
-                               
-                               Serializer<?> serializer = 
-                                               
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
-                               kryo.register(typeClass, serializer);
-                       }
-
-                       // register given serializers
-                       for (Map.Entry<Class<?>, Serializer<?>> e : 
registeredSerializers.entrySet()) {
-                               kryo.register(e.getKey(), e.getValue());
-                       }
-                       
-                       kryo.setRegistrationRequired(false);
-                       
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // For testing
-       // 
--------------------------------------------------------------------------------------------
-       
-       Kryo getKryo() {
-               checkKryoInitialized();
-               return this.kryo;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index f7742ca..8b95296 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -79,7 +78,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                this.numFields = fieldSerializers.length;
                this.executionConfig = executionConfig;
 
-               Set<Class<?>> registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
+               List<Class<?>> registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
 
                for (int i = 0; i < numFields; i++) {
                        this.fields[i].setAccessible(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
new file mode 100644
index 0000000..e14546e
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.twitter.chill.ScalaKryoInstantiator;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
+import 
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.lang.reflect.Modifier;
+import java.util.List;
+
+/**
+ * A type serializer that serializes its type using the Kryo serialization
+ * framework (https://github.com/EsotericSoftware/kryo).
+ * 
+ * This serializer is intended as a fallback serializer for the cases that are
+ * not covered by the basic types, tuples, and POJOs.
+ *
+ * @param <T> The type to be serialized.
+ */
+public class KryoSerializer<T> extends TypeSerializer<T> {
+       
+       private static final long serialVersionUID = 3L;
+
+       // 
------------------------------------------------------------------------
+
+       private final List<ExecutionConfig.Entry<Class<?>, Serializer<?>>> 
registeredTypesWithSerializers;
+       private final List<ExecutionConfig.Entry<Class<?>, Class<? extends 
Serializer<?>>>> registeredTypesWithSerializerClasses;
+       private final List<ExecutionConfig.Entry<Class<?>, Serializer<?>>> 
defaultSerializers;
+       private final List<ExecutionConfig.Entry<Class<?>, Class<? extends 
Serializer<?>>>> defaultSerializerClasses;
+       private final List<Class<?>> registeredTypes;
+
+       private final Class<T> type;
+       
+       // 
------------------------------------------------------------------------
+       // The fields below are lazily initialized after duplication or 
deserialization.
+
+       private transient Kryo kryo;
+       private transient T copyInstance;
+       
+       private transient DataOutputView previousOut;
+       private transient DataInputView previousIn;
+       
+       private transient Input input;
+       private transient Output output;
+       
+       // 
------------------------------------------------------------------------
+
+       public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
+               if(type == null){
+                       throw new NullPointerException("Type class cannot be 
null.");
+               }
+               this.type = type;
+
+               this.defaultSerializers = 
executionConfig.getDefaultKryoSerializers();
+               this.defaultSerializerClasses = 
executionConfig.getDefaultKryoSerializerClasses();
+               this.registeredTypesWithSerializers = 
executionConfig.getRegisteredTypesWithKryoSerializers();
+               this.registeredTypesWithSerializerClasses = 
executionConfig.getRegisteredTypesWithKryoSerializerClasses();
+               this.registeredTypes = executionConfig.getRegisteredKryoTypes();
+       }
+
+       /**
+        * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
+        */
+       protected KryoSerializer(KryoSerializer<T> toCopy) {
+               registeredTypesWithSerializers = 
toCopy.registeredTypesWithSerializers;
+               registeredTypesWithSerializerClasses = 
toCopy.registeredTypesWithSerializerClasses;
+               defaultSerializers = toCopy.defaultSerializers;
+               defaultSerializerClasses = toCopy.defaultSerializerClasses;
+               registeredTypes = toCopy.registeredTypes;
+
+               type = toCopy.type;
+               if(type == null){
+                       throw new NullPointerException("Type class cannot be 
null.");
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public KryoSerializer<T> duplicate() {
+               return new KryoSerializer<T>(this);
+       }
+
+       @Override
+       public T createInstance() {
+               if(Modifier.isAbstract(type.getModifiers()) || 
Modifier.isInterface(type.getModifiers()) ) {
+                       return null;
+               } else {
+                       checkKryoInitialized();
+                       try {
+                               return kryo.newInstance(type);
+                       } catch(Throwable e) {
+                               return null;
+                       }
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public T copy(T from) {
+               if (from == null) {
+                       return null;
+               }
+               checkKryoInitialized();
+               try {
+                       return kryo.copy(from);
+               }
+               catch(KryoException ke) {
+                       // kryo was unable to copy it, so we do it through 
serialization:
+                       ByteArrayOutputStream baout = new 
ByteArrayOutputStream();
+                       Output output = new Output(baout);
+
+                       kryo.writeObject(output, from);
+
+                       output.close();
+
+                       ByteArrayInputStream bain = new 
ByteArrayInputStream(baout.toByteArray());
+                       Input input = new Input(bain);
+
+                       return (T)kryo.readObject(input, from.getClass());
+               }
+       }
+       
+       @Override
+       public T copy(T from, T reuse) {
+               return copy(from);
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public void serialize(T record, DataOutputView target) throws 
IOException {
+               checkKryoInitialized();
+               if (target != previousOut) {
+                       DataOutputViewStream outputStream = new 
DataOutputViewStream(target);
+                       output = new Output(outputStream);
+                       previousOut = target;
+               }
+
+               try {
+                       kryo.writeClassAndObject(output, record);
+                       output.flush();
+               }
+               catch (KryoException ke) {
+                       Throwable cause = ke.getCause();
+                       if (cause instanceof EOFException) {
+                               throw (EOFException) cause;
+                       }
+                       else {
+                               throw ke;
+                       }
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               checkKryoInitialized();
+               if (source != previousIn) {
+                       DataInputViewStream inputStream = new 
DataInputViewStream(source);
+                       input = new NoFetchingInput(inputStream);
+                       previousIn = source;
+               }
+
+               try {
+                       return (T) kryo.readClassAndObject(input);
+               } catch (KryoException ke) {
+                       Throwable cause = ke.getCause();
+
+                       if(cause instanceof EOFException) {
+                               throw (EOFException) cause;
+                       } else {
+                               throw ke;
+                       }
+               }
+       }
+       
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               checkKryoInitialized();
+               if(this.copyInstance == null){
+                       this.copyInstance = createInstance();
+               }
+
+               T tmp = deserialize(copyInstance, source);
+               serialize(tmp, target);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public int hashCode() {
+               return type.hashCode();
+       }
+       
+       @Override
+       public boolean equals(Object obj) {
+               if (obj != null && obj instanceof KryoSerializer) {
+                       KryoSerializer<?> other = (KryoSerializer<?>) obj;
+                       return other.type == this.type;
+               } else {
+                       return false;
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       private void checkKryoInitialized() {
+               if (this.kryo == null) {
+                       this.kryo = new ScalaKryoInstantiator().newKryo();
+
+                       // Throwable and all subclasses should be serialized 
via java serialization
+                       kryo.addDefaultSerializer(Throwable.class, new 
JavaSerializer());
+
+                       // Add default serializers first, so that the type 
registrations without a serializer
+                       // are registered with a default serializer
+                       for(ExecutionConfig.Entry<Class<?>, Serializer<?>> 
serializer : defaultSerializers) {
+                               kryo.addDefaultSerializer(serializer.getKey(), 
serializer.getValue());
+                       }
+                       for(ExecutionConfig.Entry<Class<?>, Class<? extends 
Serializer<?>>> serializer : defaultSerializerClasses) {
+                               kryo.addDefaultSerializer(serializer.getKey(), 
serializer.getValue());
+                       }
+
+                       // register the type of our class
+                       kryo.register(type);
+
+                       // register given types. we do this first so that any 
registration of a
+                       // more specific serializer overrides this
+                       for (Class<?> type : registeredTypes) {
+                               kryo.register(type);
+                       }
+
+                       // register given serializer classes
+                       for (ExecutionConfig.Entry<Class<?>, Class<? extends 
Serializer<?>>> e : registeredTypesWithSerializerClasses) {
+                               Class<?> typeClass = e.getKey();
+                               Class<? extends Serializer<?>> serializerClass 
= e.getValue();
+
+                               Serializer<?> serializer =
+                                               
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
+                               kryo.register(typeClass, serializer);
+                       }
+
+                       // register given serializers
+                       for (ExecutionConfig.Entry<Class<?>, Serializer<?>> e : 
registeredTypesWithSerializers) {
+                               kryo.register(e.getKey(), e.getValue());
+                       }
+                       // this is needed for Avro but can not be added on 
demand.
+                       kryo.register(GenericData.Array.class, new 
SpecificInstanceCollectionSerializerForArrayList());
+
+                       kryo.setRegistrationRequired(false);
+                       
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // For testing
+       // 
--------------------------------------------------------------------------------------------
+       
+       Kryo getKryo() {
+               checkKryoInitialized();
+               return this.kryo;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
new file mode 100644
index 0000000..3e355aa
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.google.protobuf.Message;
+import com.twitter.chill.protobuf.ProtobufSerializer;
+import com.twitter.chill.thrift.TBaseSerializer;
+import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
+import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.thrift.protocol.TMessage;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import scala.reflect.ClassTag;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Class containing utilities for the serializers of the Flink Runtime.
+ *
+ * Most of the serializers are automatically added to the system.
+ *
+ * Note that users can also implement the {@link 
com.esotericsoftware.kryo.KryoSerializable} interface
+ * to provide custom serialization for their classes.
+ * Also, there is a Java Annotation for adding a default serializer 
(@DefaultSerializer) to classes.
+ */
+public class Serializers {
+       /**
+        * NOTE: This method is not a public Flink API.
+        *
+        * This method walks the entire hierarchy of the given type and 
registers all types it encounters
+        * to Kryo.
+        * It also watches for types which need special serializers.
+        */
+       private static Set<Class<?>> alreadySeen = new HashSet<Class<?>>();
+
+       public static void recursivelyRegisterType(Class<?> type, 
ExecutionConfig config) {
+               alreadySeen.add(type);
+               if(type.isPrimitive()) {
+                       return;
+               }
+               config.registerKryoType(type);
+               addSerializerForType(config, type);
+
+               Field[] fields = type.getDeclaredFields();
+               for(Field field : fields) {
+                       Type fieldType = field.getGenericType();
+                       if(fieldType instanceof ParameterizedType) { // field 
has generics
+                               ParameterizedType parameterizedFieldType = 
(ParameterizedType) fieldType;
+                               for(Type t: 
parameterizedFieldType.getActualTypeArguments()) {
+                                       if(TypeExtractor.isClassType(t) ) {
+                                               Class clazz = 
TypeExtractor.typeToClass(t);
+                                               
if(!alreadySeen.contains(clazz)) {
+                                                       
recursivelyRegisterType(TypeExtractor.typeToClass(t), config);
+                                               }
+                                       }
+                               }
+                       }
+                       Class<?> clazz = field.getType();
+                       if(!alreadySeen.contains(clazz)) {
+                               recursivelyRegisterType(clazz, config);
+                       }
+               }
+       }
+
+       public static void addSerializerForType(ExecutionConfig reg, Class<?> 
type) {
+               if(Message.class.isAssignableFrom(type)) {
+                       registerProtoBuf(reg);
+               }
+               if(TMessage.class.isAssignableFrom(type)) {
+                       registerThrift(reg);
+               }
+               if(GenericData.Record.class.isAssignableFrom(type)) {
+                       registerGenericAvro(reg);
+               }
+               if(SpecificRecordBase.class.isAssignableFrom(type)) {
+                       registerSpecificAvro(reg, (Class<? extends 
SpecificRecordBase>) type);
+               }
+               if(DateTime.class.isAssignableFrom(type) || 
Interval.class.isAssignableFrom(type)) {
+                       registerJodaTime(reg);
+               }
+       }
+
+       /**
+        * Register serializers required for Google Protocol Buffers
+        * with Flink runtime.
+        */
+       public static void registerProtoBuf(ExecutionConfig reg) {
+               // Google Protobuf (FLINK-1392)
+               reg.registerTypeWithKryoSerializer(Message.class, 
ProtobufSerializer.class);
+       }
+
+       /**
+        * Register Apache Thrift messages
+        */
+       public static void registerThrift(ExecutionConfig reg) {
+               // TBaseSerializer states it should be initialized as a default 
Kryo serializer
+               reg.addDefaultKryoSerializer(TMessage.class, 
TBaseSerializer.class);
+               reg.registerKryoType(TMessage.class);
+       }
+
+       /**
+        * Register these serializers for using Avro's {@link 
GenericData.Record} and classes
+        * implementing {@link org.apache.avro.specific.SpecificRecordBase}
+        */
+       public static void registerGenericAvro(ExecutionConfig reg) {
+               // Avro POJOs contain java.util.List which have 
GenericData.Array as their runtime type
+               // because Kryo is not able to serialize them properly, we use 
this serializer for them
+               reg.registerTypeWithKryoSerializer(GenericData.Array.class, 
SpecificInstanceCollectionSerializerForArrayList.class);
+               // We register this serializer for users who want to use 
untyped Avro records (GenericData.Record).
+               // Kryo is able to serialize everything in there, except for 
the Schema.
+               // This serializer is very slow, but using the 
GenericData.Records of Kryo is in general a bad idea.
+               // we add the serializer as a default serializer because Avro 
is using a private sub-type at runtime.
+               reg.addDefaultKryoSerializer(Schema.class, 
AvroSchemaSerializer.class);
+       }
+
+
+       public static void registerSpecificAvro(ExecutionConfig reg, Class<? 
extends SpecificRecordBase> avroType) {
+               registerGenericAvro(reg);
+               // This rule only applies if users explicitly use the 
GenericTypeInformation for the avro types
+               // usually, we are able to handle Avro POJOs with the POJO 
serializer.
+               // (However only if the GenericData.Array type is registered!)
+
+               ClassTag<SpecificRecordBase> tag = 
scala.reflect.ClassTag$.MODULE$.apply(avroType);
+               reg.registerTypeWithKryoSerializer(avroType, 
com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag));
+       }
+
+
+       /**
+        * Currently, the following classes of JodaTime are supported:
+        *      - DateTime
+        *      - Interval
+        *
+        *      The following chronologies are supported: (see {@link 
de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer})
+        * <ul>
+        * <li>{@link org.joda.time.chrono.ISOChronology}</li>
+        * <li>{@link org.joda.time.chrono.CopticChronology}</li>
+        * <li>{@link org.joda.time.chrono.EthiopicChronology}</li>
+        * <li>{@link org.joda.time.chrono.GregorianChronology}</li>
+        * <li>{@link org.joda.time.chrono.JulianChronology}</li>
+        * <li>{@link org.joda.time.chrono.IslamicChronology}</li>
+        * <li>{@link org.joda.time.chrono.BuddhistChronology}</li>
+        * <li>{@link org.joda.time.chrono.GJChronology}</li>
+        * </ul>
+        */
+       public static void registerJodaTime(ExecutionConfig reg) {
+               reg.registerTypeWithKryoSerializer(DateTime.class, 
JodaDateTimeSerializer.class);
+               reg.registerTypeWithKryoSerializer(Interval.class, 
JodaIntervalSerializer.class);
+       }
+
+       /**
+        * Register less frequently used serializers
+        */
+       public static void registerJavaUtils(ExecutionConfig reg) {
+               // BitSet, Regex is already present through twitter-chill.
+       }
+
+
+       // 
--------------------------------------------------------------------------------------------
+       // Custom Serializers
+       // 
--------------------------------------------------------------------------------------------
+
+       public static class SpecificInstanceCollectionSerializerForArrayList 
extends SpecificInstanceCollectionSerializer<ArrayList> {
+               public SpecificInstanceCollectionSerializerForArrayList() {
+                       super(ArrayList.class);
+               }
+       }
+       /**
+        * Special serializer for Java collections enforcing certain instance 
types.
+        * Avro is serializing collections with a "GenericData.Array" type. 
Kryo is not able to handle
+        * this type, so we use ArrayLists.
+        */
+       public static class SpecificInstanceCollectionSerializer<T extends 
Collection> extends CollectionSerializer implements Serializable {
+               private Class<T> type;
+               public SpecificInstanceCollectionSerializer(Class<T> type) {
+                       this.type = type;
+               }
+
+               @Override
+               protected Collection create(Kryo kryo, Input input, 
Class<Collection> type) {
+                       return kryo.newInstance(this.type);
+               }
+
+               @Override
+               protected Collection createCopy(Kryo kryo, Collection original) 
{
+                       return kryo.newInstance(this.type);
+               }
+       }
+
+       /**
+        * Slow serialization approach for Avro schemas.
+        * This is only used with {@link 
org.apache.avro.generic.GenericData.Record} types.
+        * Having this serializer, we are able to handle avro Records.
+        */
+       public static class AvroSchemaSerializer extends Serializer<Schema> 
implements Serializable {
+               @Override
+               public void write(Kryo kryo, Output output, Schema object) {
+                       String schemaAsString = object.toString(false);
+                       output.writeString(schemaAsString);
+               }
+
+               @Override
+               public Schema read(Kryo kryo, Input input, Class<Schema> type) {
+                       String schemaAsString = input.readString();
+                       // the parser seems to be stateful, so we need a new 
one for every type.
+                       Schema.Parser sParser = new Schema.Parser();
+                       return sParser.parse(schemaAsString);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
index 2a11bd7..059c78d 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
@@ -21,13 +21,10 @@ package org.apache.flink.api.java.typeutils.runtime;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.StringUtils;
-import org.joda.time.DateTime;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
deleted file mode 100644
index d1163d5..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class KryoGenericArraySerializerTest extends 
AbstractGenericArraySerializerTest {
-       @Override
-       protected <T> TypeSerializer<T> createComponentSerializer(Class<T> 
type) {
-               return new KryoSerializer<T>(type, new ExecutionConfig());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
deleted file mode 100644
index 01c76d9..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class KryoGenericTypeComparatorTest extends 
AbstractGenericTypeComparatorTest {
-       @Override
-       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new KryoSerializer<T>(type, new ExecutionConfig());
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
deleted file mode 100644
index 2f47e51..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.junit.Test;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Random;
-
-@SuppressWarnings("unchecked")
-public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
-       
-       @Test
-       public void testJavaList(){
-               Collection<Integer> a = new ArrayList<Integer>();
-
-               fillCollection(a);
-
-               runTests(a);
-       }
-
-       @Test
-       public void testJavaSet(){
-               Collection<Integer> b = new HashSet<Integer>();
-
-               fillCollection(b);
-
-               runTests(b);
-       }
-
-
-
-       @Test
-       public void testJavaDequeue(){
-               Collection<Integer> c = new LinkedList<Integer>();
-
-               fillCollection(c);
-
-               runTests(c);
-       }
-
-       @Test
-       public void testJodaTime(){
-               Collection<DateTime> b = new HashSet<DateTime>();
-
-               b.add(new DateTime(1));
-               b.add(new DateTime(2));
-
-               runTests(b);
-       }
-
-       private void fillCollection(Collection<Integer> coll){
-               coll.add(42);
-               coll.add(1337);
-               coll.add(49);
-               coll.add(1);
-       }
-
-       @Override
-       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new KryoSerializer<T>(type, new ExecutionConfig());
-       }
-       
-       /**
-        * Make sure that the kryo serializer forwards EOF exceptions properly when serializing
-        */
-       @Test
-       public void testForwardEOFExceptionWhileSerializing() {
-               try {
-                       // construct a long string
-                       String str;
-                       {
-                               char[] charData = new char[40000];
-                               Random rnd = new Random();
-                               
-                               for (int i = 0; i < charData.length; i++) {
-                                       charData[i] = (char) rnd.nextInt(10000);
-                               }
-                               
-                               str = new String(charData);
-                       }
-                       
-                       // construct a memory target that is too small for the string
-                       TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000);
-                       KryoSerializer<String> serializer = new KryoSerializer<String>(String.class, new ExecutionConfig());
-                       
-                       try {
-                               serializer.serialize(str, target);
-                               fail("should throw a java.io.EOFException");
-                       }
-                       catch (java.io.EOFException e) {
-                               // that is how we like it
-                       }
-                       catch (Exception e) {
-                               fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       /**
-        * Make sure that the kryo serializer forwards EOF exceptions properly when serializing
-        */
-       @Test
-       public void testForwardEOFExceptionWhileDeserializing() {
-               try {
-                       int numElements = 100;
-                       // construct a memory target that is too small for the string
-                       TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements);
-                       KryoSerializer<Integer> serializer = new KryoSerializer<Integer>(Integer.class, new ExecutionConfig());
-
-                       for(int i = 0; i < numElements; i++){
-                               serializer.serialize(i, target);
-                       }
-
-                       TestInputView source = new TestInputView(target.copyByteBuffer());
-
-                       for(int i = 0; i < numElements; i++){
-                               int value = serializer.deserialize(source);
-                               assertEquals(i, value);
-                       }
-
-                       try {
-                               serializer.deserialize(source);
-                               fail("should throw a java.io.EOFException");
-                       }
-                       catch (java.io.EOFException e) {
-                               // that is how we like it :-)
-                       }
-                       catch (Exception e) {
-                               fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}
\ No newline at end of file

Reply via email to