[FLINK-1417] Automatically register types with Kryo
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/354efec0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/354efec0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/354efec0 Branch: refs/heads/master Commit: 354efec0f9da0fa03ea9b337b02a1a2a03a9ac16 Parents: 5015ab4 Author: Robert Metzger <rmetz...@apache.org> Authored: Mon Jan 26 17:54:25 2015 +0100 Committer: Robert Metzger <rmetz...@apache.org> Committed: Wed Feb 18 15:52:08 2015 +0100 ---------------------------------------------------------------------- .gitignore | 1 + .../org/apache/flink/api/io/avro/.gitignore | 1 - .../apache/flink/api/io/avro/AvroPojoTest.java | 157 ------ .../flink/api/common/ExecutionConfig.java | 193 ++++++- .../api/common/typeutils/CompositeType.java | 3 +- .../api/common/typeutils/TypeSerializer.java | 2 +- flink-dist/src/main/flink-bin/LICENSE | 2 + .../src/main/resources/log4j.properties | 23 + flink-java/pom.xml | 19 +- .../flink/api/java/ExecutionEnvironment.java | 102 +++- .../java/org/apache/flink/api/java/Utils.java | 25 + .../flink/api/java/typeutils/AvroTypeInfo.java | 12 +- .../api/java/typeutils/GenericTypeInfo.java | 2 +- .../flink/api/java/typeutils/TypeExtractor.java | 12 +- .../java/typeutils/runtime/KryoSerializer.java | 291 ----------- .../java/typeutils/runtime/PojoSerializer.java | 3 +- .../typeutils/runtime/kryo/KryoSerializer.java | 312 +++++++++++ .../typeutils/runtime/kryo/Serializers.java | 242 +++++++++ .../AbstractGenericTypeSerializerTest.java | 3 - .../runtime/KryoGenericArraySerializerTest.java | 29 -- .../runtime/KryoGenericTypeComparatorTest.java | 29 -- .../runtime/KryoGenericTypeSerializerTest.java | 165 ------ .../runtime/KryoVersusAvroMinibenchmark.java | 520 ------------------ .../runtime/KryoWithCustomSerializersTest.java | 74 --- .../SubclassFromInterfaceSerializerTest.java | 1 + .../kryo/KryoGenericArraySerializerTest.java | 30 ++ 
.../kryo/KryoGenericTypeComparatorTest.java | 30 ++ .../kryo/KryoGenericTypeSerializerTest.java | 169 ++++++ .../kryo/KryoVersusAvroMinibenchmark.java | 522 +++++++++++++++++++ .../kryo/KryoWithCustomSerializersTest.java | 74 +++ .../typeutils/runtime/kryo/SerializersTest.java | 66 +++ .../src/main/resources/log4j.properties | 23 + .../src/main/resources/log4j.properties | 23 + .../flink/api/scala/ExecutionEnvironment.scala | 39 +- .../api/scala/typeutils/TrySerializer.scala | 2 +- .../flink/api/scala/typeutils/TryTypeInfo.scala | 2 +- .../apache/flink/api/io/avro/AvroPojoTest.java | 164 ++++++ .../api/io/avro/AvroRecordInputFormatTest.java | 19 +- .../io/avro/AvroSplittableInputFormatTest.java | 11 +- .../environment/StreamExecutionEnvironment.java | 38 +- .../api/scala/StreamExecutionEnvironment.scala | 37 +- flink-tests/pom.xml | 13 - .../javaApiOperators/GroupReduceITCase.java | 17 +- .../runtime/KryoGenericTypeSerializerTest.scala | 9 +- .../ScalaSpecialTypesSerializerTest.scala | 2 +- .../api/scala/runtime/TupleSerializerTest.scala | 3 +- pom.xml | 7 +- 47 files changed, 2123 insertions(+), 1400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 522cff5..deaa1ee 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ tmp _site docs/api build-target +flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/ http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore deleted file mode 100644 index 
dc9b237..0000000 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore +++ /dev/null @@ -1 +0,0 @@ -generated \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java deleted file mode 100644 index 6ff4836..0000000 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.io.avro; - -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.Path; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.util.Collector; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; -import java.util.Arrays; - -@RunWith(Parameterized.class) -public class AvroPojoTest extends MultipleProgramsTestBase { - public AvroPojoTest(ExecutionMode mode) { - super(mode); - } - - private File inFile; - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - inFile = tempFolder.newFile(); - AvroRecordInputFormatTest.writeTestFile(inFile); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testSimpleAvroRead() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction<User, User>() { - @Override - public User map(User value) throws Exception { - 
value.setTypeMap(null); - return value; - } - }); - - usersDS.writeAsText(resultPath); - - env.execute("Simple Avro read job"); - - - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n"; - } - - @Test - public void testKeySelection() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for(User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); - } - } - }); - - res.writeAsText(resultPath); - env.execute("Avro Key selection"); - - - expected = "(Alyssa,1)\n(Charlie,1)\n"; - } - - /** - * Test some know fields for grouping on - */ - @Test - public void testAllFields() throws Exception { - for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { - testField(fieldName); - } - } - - private void testField(final 
String fieldName) throws Exception { - before(); - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() { - @Override - public void reduce(Iterable<User> values, Collector<Object> out) throws Exception { - for(User u : values) { - out.collect(u.get(fieldName)); - } - } - }); - res.writeAsText(resultPath); - env.execute("Simple Avro read job"); - if(fieldName.equals("name")) { - expected = "Alyssa\nCharlie"; - } else if(fieldName.equals("type_enum")) { - expected = "GREEN\nRED\n"; - } else if(fieldName.equals("type_double_test")) { - expected = "123.45\n1.337\n"; - } else { - Assert.fail("Unknown field"); - } - - after(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 5fa01b7..4a583e1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -21,10 +21,8 @@ package org.apache.flink.api.common; import com.esotericsoftware.kryo.Serializer; import java.io.Serializable; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.ArrayList; +import java.util.List; /** * A configuration config for configuring behavior of the system, such as whether to use @@ -44,12 +42,17 @@ public class ExecutionConfig implements Serializable { private boolean objectReuse = false; + 
private boolean disableAutoTypeRegistration = false; + // Serializers and types registered with Kryo and the PojoSerializer - private final Map<Class<?>, Serializer<?>> registeredKryoSerializers = new HashMap<Class<?>, Serializer<?>>(); - private final Map<Class<?>, Class<? extends Serializer<?>>> registeredKryoSerializersClasses = new HashMap<Class<?>, Class<? extends Serializer<?>>>(); - private final Set<Class<?>> registeredKryoTypes = new HashSet<Class<?>>(); - private final Set<Class<?>> registeredPojoTypes = new HashSet<Class<?>>(); + // we store them in lists to ensure they are registered in order in all kryo instances. + private final List<Entry<Class<?>, Serializer<?>>> registeredTypesWithKryoSerializers = new ArrayList<Entry<Class<?>, Serializer<?>>>(); + private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses = new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>(); + private final List<Entry<Class<?>, Serializer<?>>> defaultKryoSerializers = new ArrayList<Entry<Class<?>, Serializer<?>>>(); + private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses = new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>(); + private final List<Class<?>> registeredKryoTypes = new ArrayList<Class<?>>(); + private final List<Class<?>> registeredPojoTypes = new ArrayList<Class<?>>(); /** * Enables the ClosureCleaner. This analyzes user code functions and sets fields to null @@ -191,9 +194,10 @@ public class ExecutionConfig implements Serializable { // Registry for types and serializers // -------------------------------------------------------------------------------------------- + + /** - * Registers the given Serializer as a default serializer for the given type at the - * {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}. + * Adds a new Kryo default serializer to the Runtime. 
* * Note that the serializer instance must be serializable (as defined by java.io.Serializable), * because it may be distributed to the worker nodes by java serialization. @@ -201,7 +205,7 @@ public class ExecutionConfig implements Serializable { * @param type The class of the types serialized with the given serializer. * @param serializer The serializer to use. */ - public void registerKryoSerializer(Class<?> type, Serializer<?> serializer) { + public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) { if (type == null || serializer == null) { throw new NullPointerException("Cannot register null class or serializer."); } @@ -210,23 +214,66 @@ public class ExecutionConfig implements Serializable { + "as defined by java.io.Serializable. For stateless serializers, you can use the " + "'registerSerializer(Class, Class)' method to register the serializer via its class."); } - - registeredKryoSerializers.put(type, serializer); + Entry<Class<?>, Serializer<?>> e = new Entry<Class<?>, Serializer<?>>(type, serializer); + if(!defaultKryoSerializers.contains(e)) { + defaultKryoSerializers.add(e); + } } /** - * Registers the given Serializer via its class as a serializer for the given type at the - * {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}. + * Adds a new Kryo default serializer to the Runtime. * * @param type The class of the types serialized with the given serializer. * @param serializerClass The class of the serializer to use. */ - public void registerKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) { + public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) { if (type == null || serializerClass == null) { throw new NullPointerException("Cannot register null class or serializer."); } + Entry<Class<?>, Class<? extends Serializer<?>>> e = new Entry<Class<?>, Class<? 
extends Serializer<?>>>(type, serializerClass); + if(!defaultKryoSerializerClasses.contains(e)) { + defaultKryoSerializerClasses.add(e); + } + } + + /** + * Registers the given type with a Kryo Serializer. + * + * Note that the serializer instance must be serializable (as defined by java.io.Serializable), + * because it may be distributed to the worker nodes by java serialization. + * + * @param type The class of the types serialized with the given serializer. + * @param serializer The serializer to use. + */ + public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) { + if (type == null || serializer == null) { + throw new NullPointerException("Cannot register null class or serializer."); + } + if (!(serializer instanceof java.io.Serializable)) { + throw new IllegalArgumentException("The serializer instance must be serializable, (for distributing it in the cluster), " + + "as defined by java.io.Serializable. For stateless serializers, you can use the " + + "'registerSerializer(Class, Class)' method to register the serializer via its class."); + } + Entry<Class<?>, Serializer<?>> e = new Entry<Class<?>, Serializer<?>>(type, serializer); + if(!registeredTypesWithKryoSerializers.contains(e)) { + registeredTypesWithKryoSerializers.add(e); + } + } - registeredKryoSerializersClasses.put(type, serializerClass); + /** + * Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer + * + * @param type The class of the types serialized with the given serializer. + * @param serializerClass The class of the serializer to use. + */ + public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) { + if (type == null || serializerClass == null) { + throw new NullPointerException("Cannot register null class or serializer."); + } + Entry<Class<?>, Class<? extends Serializer<?>>> e = new Entry<Class<?>, Class<? 
extends Serializer<?>>>(type, serializerClass); + if(!registeredTypesWithKryoSerializerClasses.contains(e)) { + registeredTypesWithKryoSerializerClasses.add(e); + } } /** @@ -241,7 +288,9 @@ public class ExecutionConfig implements Serializable { if (type == null) { throw new NullPointerException("Cannot register null type class."); } - registeredPojoTypes.add(type); + if(!registeredPojoTypes.contains(type)) { + registeredPojoTypes.add(type); + } } /** @@ -260,29 +309,48 @@ public class ExecutionConfig implements Serializable { } /** - * Returns the registered Kryo Serializers. + * Returns the registered types with Kryo Serializers. */ - public Map<Class<?>, Serializer<?>> getRegisteredKryoSerializers() { - return registeredKryoSerializers; + public List<Entry<Class<?>, Serializer<?>>> getRegisteredTypesWithKryoSerializers() { + return registeredTypesWithKryoSerializers; } /** - * Returns the registered Kryo Serializer classes. + * Returns the registered types with their Kryo Serializer classes. */ - public Map<Class<?>, Class<? extends Serializer<?>>> getRegisteredKryoSerializersClasses() { - return registeredKryoSerializersClasses; + public List<Entry<Class<?>, Class<? extends Serializer<?>>>> getRegisteredTypesWithKryoSerializerClasses() { + return registeredTypesWithKryoSerializerClasses; + } + + + /** + * Returns the registered default Kryo Serializers. + */ + public List<Entry<Class<?>, Serializer<?>>> getDefaultKryoSerializers() { + return defaultKryoSerializers; + } + + /** + * Returns the registered default Kryo Serializer classes. + */ + public List<Entry<Class<?>, Class<? extends Serializer<?>>>> getDefaultKryoSerializerClasses() { + return defaultKryoSerializerClasses; } /** * Returns the registered Kryo types. 
*/ - public Set<Class<?>> getRegisteredKryoTypes() { + public List<Class<?>> getRegisteredKryoTypes() { if (isForceKryoEnabled()) { // if we force kryo, we must also return all the types that // were previously only registered as POJO - Set<Class<?>> result = new HashSet<Class<?>>(); + List<Class<?>> result = new ArrayList<Class<?>>(); result.addAll(registeredKryoTypes); - result.addAll(registeredPojoTypes); + for(Class<?> t : registeredPojoTypes) { + if (!result.contains(t)) { + result.add(t); + } + } return result; } else { return registeredKryoTypes; @@ -292,7 +360,76 @@ public class ExecutionConfig implements Serializable { /** * Returns the registered POJO types. */ - public Set<Class<?>> getRegisteredPojoTypes() { + public List<Class<?>> getRegisteredPojoTypes() { return registeredPojoTypes; } + + + public boolean isDisableAutoTypeRegistration() { + return disableAutoTypeRegistration; + } + + /** + * Control whether Flink is automatically registering all types in the user programs with + * Kryo. + * + * @param disableAutoTypeRegistration + */ + public void setDisableAutoTypeRegistration(boolean disableAutoTypeRegistration) { + this.disableAutoTypeRegistration = disableAutoTypeRegistration; + } + + + public static class Entry<K, V> implements Serializable { + private final K k; + private final V v; + public Entry(K k, V v) { + this.k = k; + this.v = v; + } + + public K getKey() { + return k; + } + + public V getValue() { + return v; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Entry entry = (Entry) o; + + if (k != null ? !k.equals(entry.k) : entry.k != null) { + return false; + } + if (v != null ? !v.equals(entry.v) : entry.v != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = k != null ? k.hashCode() : 0; + result = 31 * result + (v != null ? 
v.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "Entry{" + + "k=" + k + + ", v=" + v + + '}'; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java index 17b8dab..3bd1fff 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java @@ -132,9 +132,8 @@ public abstract class CompositeType<T> extends TypeInformation<T> { } return getNewComparator(config); } - - + public static class FlatFieldDescriptor { private int keyPosition; private TypeInformation<?> type; http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java index 329e826..542b059 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java @@ -42,7 +42,7 @@ public abstract class TypeSerializer<T> implements Serializable { // -------------------------------------------------------------------------------------------- // General information about the type and the serializer // -------------------------------------------------------------------------------------------- - + /** * Gets whether the type is an immutable type. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-dist/src/main/flink-bin/LICENSE ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE index 25f91d4..89d8eca 100644 --- a/flink-dist/src/main/flink-bin/LICENSE +++ b/flink-dist/src/main/flink-bin/LICENSE @@ -233,7 +233,9 @@ under the Apache License (v 2.0): - Apache Kafka (http://kafka.apache.org) - Apache Flume (http://flume.apache.org) - Apache Sling (http://sling.apache.org) + - Apache Thrift (http://thrift.apache.org) - Google Guava (https://code.google.com/p/guava-libraries/) + - Google Protocol Buffers (https://github.com/google/protobuf/) - Netty v4.0.21 (http://netty.io) - Powermock (http://www.powermock.org) - Javassist (http://www.javassist.org) http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-examples/flink-java-examples/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/resources/log4j.properties b/flink-examples/flink-java-examples/src/main/resources/log4j.properties new file mode 100644 index 0000000..da32ea0 --- /dev/null +++ b/flink-examples/flink-java-examples/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-java/pom.xml b/flink-java/pom.xml index dc5076a..640c527 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -33,6 +33,9 @@ under the License. <name>flink-java</name> <packaging>jar</packaging> + <properties> + <chill.version>0.5.2</chill.version> + </properties> <dependencies> <dependency> @@ -60,19 +63,19 @@ under the License. <dependency> <groupId>com.twitter</groupId> <artifactId>chill_2.10</artifactId> - <version>0.5.1</version> + <version>${chill.version}</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>chill-avro_2.10</artifactId> - <version>0.5.1</version> + <version>${chill.version}</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>chill-protobuf</artifactId> - <version>0.5.1</version> + <version>${chill.version}</version> </dependency> <!-- We need protobuf for chill-protobuf --> <dependency> @@ -84,7 +87,7 @@ under the License. 
<dependency> <groupId>com.twitter</groupId> <artifactId>chill-thrift</artifactId> - <version>0.5.1</version> + <version>${chill.version}</version> </dependency> <!-- libthrift is required by chill-thrift --> <dependency> @@ -112,7 +115,6 @@ under the License. <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> - <version>2.7</version> </dependency> <!-- guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading --> @@ -130,13 +132,6 @@ under the License. <type>test-jar</type> <scope>test</scope> </dependency> - - <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - <scope>test</scope> - </dependency> - </dependencies> <!-- Because flink-scala and flink-avro uses it in tests --> http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 1105ab9..21e1f24 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -30,6 +30,7 @@ import java.util.UUID; import com.esotericsoftware.kryo.Serializer; +import com.google.common.base.Joiner; import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; @@ -38,8 +39,10 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.operators.OperatorInformation; import 
org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; import org.apache.flink.api.java.io.CollectionInputFormat; import org.apache.flink.api.java.io.CsvReader; @@ -54,16 +57,21 @@ import org.apache.flink.api.java.operators.Operator; import org.apache.flink.api.java.operators.OperatorTranslation; import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.core.fs.Path; import org.apache.flink.types.StringValue; import org.apache.flink.util.NumberSequenceIterator; import org.apache.flink.util.SplittableIterator; +import org.apache.flink.util.Visitor; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The ExecutionEnviroment is the context in which a program is executed. 
A @@ -84,6 +92,8 @@ import org.apache.hadoop.mapreduce.Job; * @see RemoteEnvironment */ public abstract class ExecutionEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class); /** The environment of the context (local by default, cluster if invoked through command line) */ private static ExecutionEnvironmentFactory contextEnvironmentFactory; @@ -200,29 +210,51 @@ public abstract class ExecutionEnvironment { // Registry for types and serializers // -------------------------------------------------------------------------------------------- + /** - * Registers the given Serializer as a default serializer for the given type at the - * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}. - * + * Adds a new Kryo default serializer to the Runtime. + * * Note that the serializer instance must be serializable (as defined by java.io.Serializable), * because it may be distributed to the worker nodes by java serialization. - * + * * @param type The class of the types serialized with the given serializer. * @param serializer The serializer to use. */ - public void registerKryoSerializer(Class<?> type, Serializer<?> serializer) { - config.registerKryoSerializer(type, serializer); + public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) { + config.addDefaultKryoSerializer(type, serializer); } /** - * Registers the given Serializer via its class as a serializer for the given type at the - * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}. - * + * Adds a new Kryo default serializer to the Runtime. + * + * @param type The class of the types serialized with the given serializer. + * @param serializerClass The class of the serializer to use. + */ + public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) { + config.addDefaultKryoSerializer(type, serializerClass); + } + + /** + * Registers the given type with a Kryo Serializer. 
+ * + * Note that the serializer instance must be serializable (as defined by java.io.Serializable), + * because it may be distributed to the worker nodes by java serialization. + * + * @param type The class of the types serialized with the given serializer. + * @param serializer The serializer to use. + */ + public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) { + config.registerTypeWithKryoSerializer(type, serializer); + } + + /** + * Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer + * * @param type The class of the types serialized with the given serializer. * @param serializerClass The class of the serializer to use. */ - public void registerKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) { - config.registerKryoSerializer(type, serializerClass); + public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) { + config.registerTypeWithKryoSerializer(type, serializerClass); } /** @@ -743,7 +775,7 @@ public abstract class ExecutionEnvironment { * @throws Exception Thrown, if the program executions fails. */ public abstract JobExecutionResult execute(String jobName) throws Exception; - + /** * Creates the plan with which the system will execute the program, and returns it as * a String using a JSON representation of the execution data flow graph. @@ -856,11 +888,35 @@ public abstract class ExecutionEnvironment { OperatorTranslation translator = new OperatorTranslation(); JavaPlan plan = translator.translateToPlan(this.sinks, jobName); - + if (getDegreeOfParallelism() > 0) { plan.setDefaultParallelism(getDegreeOfParallelism()); } plan.setExecutionConfig(getConfig()); + // Check plan for GenericTypeInfo's and register the types at the serializers. 
+ plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() { + @Override + public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) { + OperatorInformation<?> opInfo = visitable.getOperatorInfo(); + TypeInformation<?> typeInfo = opInfo.getOutputType(); + if(typeInfo instanceof GenericTypeInfo) { + GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo; + if(!config.isDisableAutoTypeRegistration()) { + Serializers.recursivelyRegisterType(genericTypeInfo.getTypeClass(), config); + } + } + if(typeInfo instanceof CompositeType) { + List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<GenericTypeInfo<?>>(); + Utils.getContainedGenericTypes((CompositeType)typeInfo, genericTypesInComposite); + for(GenericTypeInfo<?> gt : genericTypesInComposite) { + Serializers.recursivelyRegisterType(gt.getTypeClass(), config); + } + } + return true; + } + @Override + public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {} + }); try { registerCachedFilesWithPlan(plan); @@ -872,7 +928,25 @@ public abstract class ExecutionEnvironment { if (clearSinks) { this.sinks.clear(); } - + + // All types are registered now. Print information. 
+ int registeredTypes = config.getRegisteredKryoTypes().size() + + config.getRegisteredPojoTypes().size() + + config.getRegisteredTypesWithKryoSerializerClasses().size() + + config.getRegisteredTypesWithKryoSerializers().size(); + int defaultKryoSerializers = config.getDefaultKryoSerializers().size() + + config.getDefaultKryoSerializerClasses().size(); + LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers); + + if(LOG.isDebugEnabled()) { + LOG.debug("Registered Kryo types: {}", Joiner.on(',').join(config.getRegisteredKryoTypes())); + LOG.debug("Registered Kryo with Serializers types: {}", Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializers())); + LOG.debug("Registered Kryo with Serializer Classes types: {}", Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializerClasses())); + LOG.debug("Registered Kryo default Serializers: {}", Joiner.on(',').join(config.getDefaultKryoSerializers())); + LOG.debug("Registered Kryo default Serializers Classes {}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses())); + LOG.debug("Registered POJO types: {}", Joiner.on(',').join(config.getRegisteredPojoTypes())); + } + return plan; } http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/Utils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index ba39aec..21df168 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -18,6 +18,12 @@ package org.apache.flink.api.java; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; + +import java.util.List; + public 
class Utils { public static String getCallLocationName() { @@ -35,4 +41,23 @@ public class Utils { return String.format("%s(%s:%d)", elem.getMethodName(), elem.getFileName(), elem.getLineNumber()); } + + /** + * Collects all GenericTypeInfos contained in a composite type into the given target list. + * + * @param typeInfo the composite type to traverse + * @param target the list into which found GenericTypeInfos are added + */ + public static void getContainedGenericTypes(CompositeType typeInfo, List<GenericTypeInfo<?>> target) { + for(int i = 0; i < typeInfo.getArity(); i++) { + TypeInformation<?> type = typeInfo.getTypeAt(i); + if(type instanceof CompositeType) { + getContainedGenericTypes((CompositeType) type, target); + } else if(type instanceof GenericTypeInfo) { + if(!target.contains(type)) { + target.add((GenericTypeInfo<?>) type); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java index ccdf7f7..bf580d1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java @@ -27,10 +27,16 @@ import java.util.ArrayList; import java.util.List; /** - * Special type information to generate a POJO type info from an avro schema. + * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs) * * Proceeding: It uses a regular pojo type analysis and replaces all GenericType<CharSequence> - * with a GenericType<avro.Utf8> + * with a GenericType<avro.Utf8>. + * All other types used by Avro are standard Java types. + * Only strings are represented as CharSequence fields and represented as Utf8 classes at runtime. + * CharSequence is not comparable.
To make them nicely usable with field expressions, we replace them here + * by generic type infos containing Utf8 classes (which are comparable), + * + * This class is checked by the AvroPojoTest. * @param <T> */ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> { @@ -40,7 +46,7 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) { PojoTypeExtractor pte = new PojoTypeExtractor(); - TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null); + TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null, null, null); if(!(ti instanceof PojoTypeInfo)) { throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java index c629ba3..34226ce 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator; -import org.apache.flink.api.java.typeutils.runtime.KryoSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> { 
http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 73cedd1..5f52f98 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -1259,7 +1259,7 @@ public class TypeExtractor { } @SuppressWarnings("unchecked") - private <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy, + protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy, ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { // add the hierarchy of the POJO itself if it is generic @@ -1385,8 +1385,9 @@ public class TypeExtractor { } return result; } - - private static Class<?> typeToClass(Type t) { + + // not public to users + public static Class<?> typeToClass(Type t) { if (t instanceof Class) { return (Class<?>)t; } @@ -1395,8 +1396,9 @@ public class TypeExtractor { } throw new IllegalArgumentException("Cannot convert type to class"); } - - private static boolean isClassType(Type t) { + + // not public to users + public static boolean isClassType(Type t) { return t instanceof Class<?> || t instanceof ParameterizedType; } http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java deleted file mode 100644 index d8411a0..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils.runtime; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.KryoException; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.twitter.chill.ScalaKryoInstantiator; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.lang.reflect.Modifier; -import java.util.Map; -import java.util.Set; - -/** - * A type serializer that serializes its type using the Kryo serialization - * framework (https://github.com/EsotericSoftware/kryo). - * - * This serializer is intended as a fallback serializer for the cases that are - * not covered by the basic types, tuples, and POJOs. - * - * @param <T> The type to be serialized. - */ -public class KryoSerializer<T> extends TypeSerializer<T> { - - private static final long serialVersionUID = 3L; - - // ------------------------------------------------------------------------ - - private final Map<Class<?>, Serializer<?>> registeredSerializers; - private final Map<Class<?>, Class<? extends Serializer<?>>> registeredSerializersClasses; - private final Set<Class<?>> registeredTypes; - - private final Class<T> type; - - // ------------------------------------------------------------------------ - // The fields below are lazily initialized after duplication or deserialization. 
- - private transient Kryo kryo; - private transient T copyInstance; - - private transient DataOutputView previousOut; - private transient DataInputView previousIn; - - private transient Input input; - private transient Output output; - - // ------------------------------------------------------------------------ - - public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){ - if(type == null){ - throw new NullPointerException("Type class cannot be null."); - } - this.type = type; - - this.registeredSerializers = executionConfig.getRegisteredKryoSerializers(); - this.registeredSerializersClasses = executionConfig.getRegisteredKryoSerializersClasses(); - this.registeredTypes = executionConfig.getRegisteredKryoTypes(); - } - - /** - * Copy-constructor that does not copy transient fields. They will be initialized once required. - */ - protected KryoSerializer(KryoSerializer<T> toCopy) { - registeredSerializers = toCopy.registeredSerializers; - registeredSerializersClasses = toCopy.registeredSerializersClasses; - registeredTypes = toCopy.registeredTypes; - - type = toCopy.type; - if(type == null){ - throw new NullPointerException("Type class cannot be null."); - } - } - - // ------------------------------------------------------------------------ - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public KryoSerializer<T> duplicate() { - return new KryoSerializer<T>(this); - } - - @Override - public T createInstance() { - if(Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers()) ) { - return null; - } else { - checkKryoInitialized(); - try { - return kryo.newInstance(type); - } catch(Throwable e) { - return null; - } - } - } - - @SuppressWarnings("unchecked") - @Override - public T copy(T from) { - if (from == null) { - return null; - } - checkKryoInitialized(); - try { - return kryo.copy(from); - } - catch(KryoException ke) { - // kryo was unable to copy it, so we do it through 
serialization: - ByteArrayOutputStream baout = new ByteArrayOutputStream(); - Output output = new Output(baout); - - kryo.writeObject(output, from); - - output.close(); - - ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray()); - Input input = new Input(bain); - - return (T)kryo.readObject(input, from.getClass()); - } - } - - @Override - public T copy(T from, T reuse) { - return copy(from); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(T record, DataOutputView target) throws IOException { - checkKryoInitialized(); - if (target != previousOut) { - DataOutputViewStream outputStream = new DataOutputViewStream(target); - output = new Output(outputStream); - previousOut = target; - } - - try { - kryo.writeClassAndObject(output, record); - output.flush(); - } - catch (KryoException ke) { - Throwable cause = ke.getCause(); - if (cause instanceof EOFException) { - throw (EOFException) cause; - } - else { - throw ke; - } - } - } - - @SuppressWarnings("unchecked") - @Override - public T deserialize(DataInputView source) throws IOException { - checkKryoInitialized(); - if (source != previousIn) { - DataInputViewStream inputStream = new DataInputViewStream(source); - input = new NoFetchingInput(inputStream); - previousIn = source; - } - - try { - return (T) kryo.readClassAndObject(input); - } catch (KryoException ke) { - Throwable cause = ke.getCause(); - - if(cause instanceof EOFException) { - throw (EOFException) cause; - } else { - throw ke; - } - } - } - - @Override - public T deserialize(T reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - checkKryoInitialized(); - if(this.copyInstance == null){ - this.copyInstance = createInstance(); - } - - T tmp = deserialize(copyInstance, source); - serialize(tmp, target); - } - - // 
-------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return type.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof KryoSerializer) { - KryoSerializer<?> other = (KryoSerializer<?>) obj; - return other.type == this.type; - } else { - return false; - } - } - - // -------------------------------------------------------------------------------------------- - - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = new ScalaKryoInstantiator().newKryo(); - - // Throwable and all subclasses should be serialized via java serialization - kryo.addDefaultSerializer(Throwable.class, new JavaSerializer()); - - // register the type of our class - kryo.register(type); - - // register given types. we do this first so that any registration of a - // more specific serializer overrides this - for (Class<?> type : registeredTypes) { - kryo.register(type); - } - - // register given serializer classes - for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredSerializersClasses.entrySet()) { - Class<?> typeClass = e.getKey(); - Class<? 
extends Serializer<?>> serializerClass = e.getValue(); - - Serializer<?> serializer = - ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass); - kryo.register(typeClass, serializer); - } - - // register given serializers - for (Map.Entry<Class<?>, Serializer<?>> e : registeredSerializers.entrySet()) { - kryo.register(e.getKey(), e.getValue()); - } - - kryo.setRegistrationRequired(false); - kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); - } - } - - // -------------------------------------------------------------------------------------------- - // For testing - // -------------------------------------------------------------------------------------------- - - Kryo getKryo() { - checkKryoInitialized(); - return this.kryo; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index f7742ca..8b95296 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -79,7 +78,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { this.numFields = fieldSerializers.length; this.executionConfig = executionConfig; - Set<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); + List<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); 
for (int i = 0; i < numFields; i++) { this.fields[i].setAccessible(true); http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java new file mode 100644 index 0000000..e14546e --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.twitter.chill.ScalaKryoInstantiator; + +import org.apache.avro.generic.GenericData; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; +import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput; +import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.lang.reflect.Modifier; +import java.util.List; + +/** + * A type serializer that serializes its type using the Kryo serialization + * framework (https://github.com/EsotericSoftware/kryo). + * + * This serializer is intended as a fallback serializer for the cases that are + * not covered by the basic types, tuples, and POJOs. + * + * @param <T> The type to be serialized. + */ +public class KryoSerializer<T> extends TypeSerializer<T> { + + private static final long serialVersionUID = 3L; + + // ------------------------------------------------------------------------ + + private final List<ExecutionConfig.Entry<Class<?>, Serializer<?>>> registeredTypesWithSerializers; + private final List<ExecutionConfig.Entry<Class<?>, Class<? 
extends Serializer<?>>>> registeredTypesWithSerializerClasses; + private final List<ExecutionConfig.Entry<Class<?>, Serializer<?>>> defaultSerializers; + private final List<ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>>> defaultSerializerClasses; + private final List<Class<?>> registeredTypes; + + private final Class<T> type; + + // ------------------------------------------------------------------------ + // The fields below are lazily initialized after duplication or deserialization. + + private transient Kryo kryo; + private transient T copyInstance; + + private transient DataOutputView previousOut; + private transient DataInputView previousIn; + + private transient Input input; + private transient Output output; + + // ------------------------------------------------------------------------ + + public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){ + if(type == null){ + throw new NullPointerException("Type class cannot be null."); + } + this.type = type; + + this.defaultSerializers = executionConfig.getDefaultKryoSerializers(); + this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses(); + this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers(); + this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses(); + this.registeredTypes = executionConfig.getRegisteredKryoTypes(); + } + + /** + * Copy-constructor that does not copy transient fields. They will be initialized once required. 
+ */ + protected KryoSerializer(KryoSerializer<T> toCopy) { + registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers; + registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses; + defaultSerializers = toCopy.defaultSerializers; + defaultSerializerClasses = toCopy.defaultSerializerClasses; + registeredTypes = toCopy.registeredTypes; + + type = toCopy.type; + if(type == null){ + throw new NullPointerException("Type class cannot be null."); + } + } + + // ------------------------------------------------------------------------ + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public KryoSerializer<T> duplicate() { + return new KryoSerializer<T>(this); + } + + @Override + public T createInstance() { + if(Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers()) ) { + return null; + } else { + checkKryoInitialized(); + try { + return kryo.newInstance(type); + } catch(Throwable e) { + return null; + } + } + } + + @SuppressWarnings("unchecked") + @Override + public T copy(T from) { + if (from == null) { + return null; + } + checkKryoInitialized(); + try { + return kryo.copy(from); + } + catch(KryoException ke) { + // kryo was unable to copy it, so we do it through serialization: + ByteArrayOutputStream baout = new ByteArrayOutputStream(); + Output output = new Output(baout); + + kryo.writeObject(output, from); + + output.close(); + + ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray()); + Input input = new Input(bain); + + return (T)kryo.readObject(input, from.getClass()); + } + } + + @Override + public T copy(T from, T reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + checkKryoInitialized(); + if (target != previousOut) { + DataOutputViewStream outputStream = new DataOutputViewStream(target); + output = new 
Output(outputStream); + previousOut = target; + } + + try { + kryo.writeClassAndObject(output, record); + output.flush(); + } + catch (KryoException ke) { + Throwable cause = ke.getCause(); + if (cause instanceof EOFException) { + throw (EOFException) cause; + } + else { + throw ke; + } + } + } + + @SuppressWarnings("unchecked") + @Override + public T deserialize(DataInputView source) throws IOException { + checkKryoInitialized(); + if (source != previousIn) { + DataInputViewStream inputStream = new DataInputViewStream(source); + input = new NoFetchingInput(inputStream); + previousIn = source; + } + + try { + return (T) kryo.readClassAndObject(input); + } catch (KryoException ke) { + Throwable cause = ke.getCause(); + + if(cause instanceof EOFException) { + throw (EOFException) cause; + } else { + throw ke; + } + } + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + checkKryoInitialized(); + if(this.copyInstance == null){ + this.copyInstance = createInstance(); + } + + T tmp = deserialize(copyInstance, source); + serialize(tmp, target); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return type.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof KryoSerializer) { + KryoSerializer<?> other = (KryoSerializer<?>) obj; + return other.type == this.type; + } else { + return false; + } + } + + // -------------------------------------------------------------------------------------------- + + private void checkKryoInitialized() { + if (this.kryo == null) { + this.kryo = new ScalaKryoInstantiator().newKryo(); + + // Throwable and all subclasses should be serialized via java serialization + kryo.addDefaultSerializer(Throwable.class, new 
JavaSerializer()); + + // Add default serializers first, so that the type registrations without a serializer + // are registered with a default serializer + for(ExecutionConfig.Entry<Class<?>, Serializer<?>> serializer : defaultSerializers) { + kryo.addDefaultSerializer(serializer.getKey(), serializer.getValue()); + } + for(ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>> serializer : defaultSerializerClasses) { + kryo.addDefaultSerializer(serializer.getKey(), serializer.getValue()); + } + + // register the type of our class + kryo.register(type); + + // register given types. we do this first so that any registration of a + // more specific serializer overrides this + for (Class<?> type : registeredTypes) { + kryo.register(type); + } + + // register given serializer classes + for (ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses) { + Class<?> typeClass = e.getKey(); + Class<? extends Serializer<?>> serializerClass = e.getValue(); + + Serializer<?> serializer = + ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass); + kryo.register(typeClass, serializer); + } + + // register given serializers + for (ExecutionConfig.Entry<Class<?>, Serializer<?>> e : registeredTypesWithSerializers) { + kryo.register(e.getKey(), e.getValue()); + } + // this is needed for Avro but can not be added on demand.
+ kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList()); + + kryo.setRegistrationRequired(false); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + } + } + + // -------------------------------------------------------------------------------------------- + // For testing + // -------------------------------------------------------------------------------------------- + + Kryo getKryo() { + checkKryoInitialized(); + return this.kryo; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java new file mode 100644 index 0000000..3e355aa --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.CollectionSerializer; +import com.google.protobuf.Message; +import com.twitter.chill.protobuf.ProtobufSerializer; +import com.twitter.chill.thrift.TBaseSerializer; +import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer; +import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.thrift.protocol.TMessage; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import scala.reflect.ClassTag; + +import java.io.Serializable; +import java.lang.reflect.Field; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + + +/** + * Class containing utilities for the serializers of the Flink Runtime. + * + * Most of the serializers are automatically added to the system. + * + * Note that users can also implement the {@link com.esotericsoftware.kryo.KryoSerializable} interface + * to provide custom serialization for their classes. + * Also, there is a Java Annotation for adding a default serializer (@DefaultSerializer) to classes. + */ +public class Serializers { + /** + * NOTE: This method is not a public Flink API. + * + * This method walks the entire hierarchy of the given type and registers all types it encounters + * to Kryo. + * It also watches for types which need special serializers. 
+ */ + private static Set<Class<?>> alreadySeen = new HashSet<Class<?>>(); + + public static void recursivelyRegisterType(Class<?> type, ExecutionConfig config) { + alreadySeen.add(type); + if(type.isPrimitive()) { + return; + } + config.registerKryoType(type); + addSerializerForType(config, type); + + Field[] fields = type.getDeclaredFields(); + for(Field field : fields) { + Type fieldType = field.getGenericType(); + if(fieldType instanceof ParameterizedType) { // field has generics + ParameterizedType parameterizedFieldType = (ParameterizedType) fieldType; + for(Type t: parameterizedFieldType.getActualTypeArguments()) { + if(TypeExtractor.isClassType(t) ) { + Class clazz = TypeExtractor.typeToClass(t); + if(!alreadySeen.contains(clazz)) { + recursivelyRegisterType(TypeExtractor.typeToClass(t), config); + } + } + } + } + Class<?> clazz = field.getType(); + if(!alreadySeen.contains(clazz)) { + recursivelyRegisterType(clazz, config); + } + } + } + + public static void addSerializerForType(ExecutionConfig reg, Class<?> type) { + if(Message.class.isAssignableFrom(type)) { + registerProtoBuf(reg); + } + if(TMessage.class.isAssignableFrom(type)) { + registerThrift(reg); + } + if(GenericData.Record.class.isAssignableFrom(type)) { + registerGenericAvro(reg); + } + if(SpecificRecordBase.class.isAssignableFrom(type)) { + registerSpecificAvro(reg, (Class<? extends SpecificRecordBase>) type); + } + if(DateTime.class.isAssignableFrom(type) || Interval.class.isAssignableFrom(type)) { + registerJodaTime(reg); + } + } + + /** + * Register serializers required for Google Protocol Buffers + * with Flink runtime. 
+ */ + public static void registerProtoBuf(ExecutionConfig reg) { + // Google Protobuf (FLINK-1392) + reg.registerTypeWithKryoSerializer(Message.class, ProtobufSerializer.class); + } + + /** + * Register Apache Thrift messages + */ + public static void registerThrift(ExecutionConfig reg) { + // TBaseSerializer states it should be initalized as a default Kryo serializer + reg.addDefaultKryoSerializer(TMessage.class, TBaseSerializer.class); + reg.registerKryoType(TMessage.class); + } + + /** + * Register these serializers for using Avro's {@see GenericData.Record} and classes + * implementing {@see org.apache.avro.specific.SpecificRecordBase} + */ + public static void registerGenericAvro(ExecutionConfig reg) { + // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type + // because Kryo is not able to serialize them properly, we use this serializer for them + reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class); + // We register this serializer for users who want to use untyped Avro records (GenericData.Record). + // Kryo is able to serialize everything in there, except for the Schema. + // This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea. + // we add the serializer as a default serializer because Avro is using a private sub-type at runtime. + reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class); + } + + + public static void registerSpecificAvro(ExecutionConfig reg, Class<? extends SpecificRecordBase> avroType) { + registerGenericAvro(reg); + // This rule only applies if users explicitly use the GenericTypeInformation for the avro types + // usually, we are able to handle Avro POJOs with the POJO serializer. + // (However only if the GenericData.Array type is registered!) 
+ + ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(avroType); + reg.registerTypeWithKryoSerializer(avroType, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag)); + } + + + /** + * Currently, the following classes of JodaTime are supported: + * - DateTime + * - Interval + * + * The following chronologies are supported: (see {@link de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer}) + * <ul> + * <li>{@link org.joda.time.chrono.ISOChronology}</li> + * <li>{@link org.joda.time.chrono.CopticChronology}</li> + * <li>{@link org.joda.time.chrono.EthiopicChronology}</li> + * <li>{@link org.joda.time.chrono.GregorianChronology}</li> + * <li>{@link org.joda.time.chrono.JulianChronology}</li> + * <li>{@link org.joda.time.chrono.IslamicChronology}</li> + * <li>{@link org.joda.time.chrono.BuddhistChronology}</li> + * <li>{@link org.joda.time.chrono.GJChronology}</li> + * </ul> + */ + public static void registerJodaTime(ExecutionConfig reg) { + reg.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class); + reg.registerTypeWithKryoSerializer(Interval.class, JodaIntervalSerializer.class); + } + + /** + * Register less frequently used serializers + */ + public static void registerJavaUtils(ExecutionConfig reg) { + // BitSet, Regex is already present through twitter-chill. + } + + + // -------------------------------------------------------------------------------------------- + // Custom Serializers + // -------------------------------------------------------------------------------------------- + + public static class SpecificInstanceCollectionSerializerForArrayList extends SpecificInstanceCollectionSerializer<ArrayList> { + public SpecificInstanceCollectionSerializerForArrayList() { + super(ArrayList.class); + } + } + /** + * Special serializer for Java collections enforcing certain instance types. + * Avro is serializing collections with an "GenericData.Array" type. 
Kryo is not able to handle + * this type, so we use ArrayLists. + */ + public static class SpecificInstanceCollectionSerializer<T extends Collection> extends CollectionSerializer implements Serializable { + private Class<T> type; + public SpecificInstanceCollectionSerializer(Class<T> type) { + this.type = type; + } + + @Override + protected Collection create(Kryo kryo, Input input, Class<Collection> type) { + return kryo.newInstance(this.type); + } + + @Override + protected Collection createCopy(Kryo kryo, Collection original) { + return kryo.newInstance(this.type); + } + } + + /** + * Slow serialization approach for Avro schemas. + * This is only used with {{@link org.apache.avro.generic.GenericData.Record}} types. + * Having this serializer, we are able to handle avro Records. + */ + public static class AvroSchemaSerializer extends Serializer<Schema> implements Serializable { + @Override + public void write(Kryo kryo, Output output, Schema object) { + String schemaAsString = object.toString(false); + output.writeString(schemaAsString); + } + + @Override + public Schema read(Kryo kryo, Input input, Class<Schema> type) { + String schemaAsString = input.readString(); + // the parser seems to be stateful, to we need a new one for every type. 
+ Schema.Parser sParser = new Schema.Parser(); + return sParser.parse(schemaAsString); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java index 2a11bd7..059c78d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java @@ -21,13 +21,10 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.api.common.typeutils.SerializerTestInstance; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.util.StringUtils; -import org.joda.time.DateTime; import org.junit.Test; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java deleted file mode 100644 index d1163d5..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class KryoGenericArraySerializerTest extends AbstractGenericArraySerializerTest { - @Override - protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) { - return new KryoSerializer<T>(type, new ExecutionConfig()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java deleted file mode 100644 index 01c76d9..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest { - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new KryoSerializer<T>(type, new ExecutionConfig()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/354efec0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java deleted file mode 100644 index 2f47e51..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.ExecutionConfig; -import org.junit.Test; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Random; - -@SuppressWarnings("unchecked") -public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { - - @Test - public void testJavaList(){ - Collection<Integer> a = new ArrayList<Integer>(); - - fillCollection(a); - - runTests(a); - } - - @Test - public void testJavaSet(){ - Collection<Integer> b = new HashSet<Integer>(); - - fillCollection(b); - - runTests(b); - } - - - - @Test - public void testJavaDequeue(){ - Collection<Integer> c = new LinkedList<Integer>(); - - fillCollection(c); - - runTests(c); - } - - @Test - public void testJodaTime(){ - Collection<DateTime> b = new HashSet<DateTime>(); - - b.add(new DateTime(1)); - b.add(new DateTime(2)); - - runTests(b); - } - - private void fillCollection(Collection<Integer> coll){ - coll.add(42); - coll.add(1337); - coll.add(49); - coll.add(1); - } - - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - 
return new KryoSerializer<T>(type, new ExecutionConfig()); - } - - /** - * Make sure that the kryo serializer forwards EOF exceptions properly when serializing - */ - @Test - public void testForwardEOFExceptionWhileSerializing() { - try { - // construct a long string - String str; - { - char[] charData = new char[40000]; - Random rnd = new Random(); - - for (int i = 0; i < charData.length; i++) { - charData[i] = (char) rnd.nextInt(10000); - } - - str = new String(charData); - } - - // construct a memory target that is too small for the string - TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000); - KryoSerializer<String> serializer = new KryoSerializer<String>(String.class, new ExecutionConfig()); - - try { - serializer.serialize(str, target); - fail("should throw a java.io.EOFException"); - } - catch (java.io.EOFException e) { - // that is how we like it - } - catch (Exception e) { - fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - /** - * Make sure that the kryo serializer forwards EOF exceptions properly when serializing - */ - @Test - public void testForwardEOFExceptionWhileDeserializing() { - try { - int numElements = 100; - // construct a memory target that is too small for the string - TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements); - KryoSerializer<Integer> serializer = new KryoSerializer<Integer>(Integer.class, new ExecutionConfig()); - - for(int i = 0; i < numElements; i++){ - serializer.serialize(i, target); - } - - TestInputView source = new TestInputView(target.copyByteBuffer()); - - for(int i = 0; i < numElements; i++){ - int value = serializer.deserialize(source); - assertEquals(i, value); - } - - try { - serializer.deserialize(source); - fail("should throw a java.io.EOFException"); - } - catch (java.io.EOFException e) { - // that 
is how we like it :-) - } - catch (Exception e) { - fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} \ No newline at end of file