[FLINK-3305] [core] Remove limited and inconsistent auto-magic for Joda Time
The auto-magic for Joda Time was limited to very few classes. It was not transparent which cases would be handled. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6110dc3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6110dc3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6110dc3 Branch: refs/heads/master Commit: b6110dc35a17340653a39209038041a5e28054b4 Parents: c4bc47a Author: Stephan Ewen <se...@apache.org> Authored: Fri Jan 29 18:51:03 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 1 14:46:06 2016 +0100 ---------------------------------------------------------------------- flink-java/pom.xml | 30 +++++++------- .../flink/api/java/ExecutionEnvironment.java | 26 ++++++++---- .../typeutils/runtime/kryo/Serializers.java | 42 +++----------------- .../api/operators/TimestampedCollector.java | 12 +++--- .../runtime/KryoGenericTypeSerializerTest.scala | 37 +++++++---------- .../api/scala/runtime/TupleSerializerTest.scala | 29 ++++---------- 6 files changed, 68 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-java/pom.xml b/flink-java/pom.xml index 8383a4a..a31e89d 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -70,22 +70,6 @@ under the License. 
</dependency> <dependency> - <groupId>de.javakaffee</groupId> - <artifactId>kryo-serializers</artifactId> - <version>0.27</version> - </dependency> - - <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - </dependency> - - <dependency> - <groupId>org.joda</groupId> - <artifactId>joda-convert</artifactId> - </dependency> - - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> @@ -104,6 +88,20 @@ under the License. <type>test-jar</type> <scope>test</scope> </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.5</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.joda</groupId> + <artifactId>joda-convert</artifactId> + <version>1.7</version> + <scope>test</scope> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 10cb5e3..253ffa3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -18,9 +18,10 @@ package org.apache.flink.api.java; -import java.io.IOException; -import java.io.Serializable; -import java.util.*; +import com.esotericsoftware.kryo.Serializer; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; @@ -46,7 +47,10 @@ import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.Operator; import 
org.apache.flink.api.java.operators.OperatorTranslation; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.*; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -54,14 +58,22 @@ import org.apache.flink.types.StringValue; import org.apache.flink.util.NumberSequenceIterator; import org.apache.flink.util.SplittableIterator; import org.apache.flink.util.Visitor; + import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.esotericsoftware.kryo.Serializer; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; /** * The ExecutionEnvironment is the context in which a program is executed. 
A http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java index 6903d35..8bac729 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java @@ -23,13 +23,9 @@ import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.CollectionSerializer; - -import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer; -import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.specific.SpecificRecordBase; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; @@ -37,11 +33,13 @@ import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.joda.time.DateTime; -import org.joda.time.Interval; import java.io.Serializable; -import java.lang.reflect.*; +import java.lang.reflect.Field; +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.Modifier; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -165,36 +163,6 @@ public class Serializers { // 
ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(avroType); // reg.registerTypeWithKryoSerializer(avroType, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag)); } - - /** - * Currently, the following classes of JodaTime are supported: - * - DateTime - * - Interval - * - * The following chronologies are supported: (see {@link de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer}) - * <ul> - * <li>{@link org.joda.time.chrono.ISOChronology}</li> - * <li>{@link org.joda.time.chrono.CopticChronology}</li> - * <li>{@link org.joda.time.chrono.EthiopicChronology}</li> - * <li>{@link org.joda.time.chrono.GregorianChronology}</li> - * <li>{@link org.joda.time.chrono.JulianChronology}</li> - * <li>{@link org.joda.time.chrono.IslamicChronology}</li> - * <li>{@link org.joda.time.chrono.BuddhistChronology}</li> - * <li>{@link org.joda.time.chrono.GJChronology}</li> - * </ul> - */ - public static void registerJodaTime(ExecutionConfig reg) { - reg.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class); - reg.registerTypeWithKryoSerializer(Interval.class, JodaIntervalSerializer.class); - } - - /** - * Register less frequently used serializers - */ - public static void registerJavaUtils(ExecutionConfig reg) { - // BitSet, Regex is already present through twitter-chill. 
- } - // -------------------------------------------------------------------------------------------- // Custom Serializers http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java index 62514fc..5af5109 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java @@ -15,17 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.operators; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Collector; -import org.joda.time.Instant; /** * Wrapper around an {@link Output} for user functions that expect a {@link Collector}. * Before giving the {@link TimestampedCollector} to a user function you must set - * the {@link Instant timestamp} that should be attached to emitted elements. Most operators - * would set the {@link Instant timestamp} of the incoming {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here. + * the timestamp that should be attached to emitted elements. Most operators + * would set the timestamp of the incoming + * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here. * * @param <T> The type of the elments that can be emitted. 
*/ @@ -52,8 +53,9 @@ public class TimestampedCollector<T> implements Collector<T> { } /** - * Sets the {@link Instant timestamp} that is attached to elements that get emitted using - * {@link #collect} + * Sets the timestamp (long milliseconds) that is attached to elements that get emitted using + * {@link #collect(Object)} + * * @param timestamp The timestamp in milliseconds */ public void setTimestamp(long timestamp) { http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala index 859ad2d..08a0a96 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala @@ -17,17 +17,18 @@ */ package org.apache.flink.api.scala.runtime +import com.esotericsoftware.kryo.{Kryo, Serializer} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.SerializerTestInstance import org.apache.flink.api.java.typeutils.GenericTypeInfo -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers + +import org.joda.time.LocalDate + import org.junit.Test + import scala.reflect._ -import org.joda.time.{DateTime, LocalDate} -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.io.Output -import com.esotericsoftware.kryo.io.Input class KryoGenericTypeSerializerTest { @@ -84,56 +85,49 @@ class KryoGenericTypeSerializerTest { } @Test - def testThrowableSerialization: Unit = { + def 
testThrowableSerialization(): Unit = { val a = List(new RuntimeException("Hello"), new RuntimeException("there")) runTests(a) } @Test - def jodaSerialization: Unit = { - val a = List(new DateTime(1), new DateTime(2)) - - runTests(a) - } - - @Test - def jodaSerialization1: Unit = { + def jodaSerialization(): Unit = { val a = List(new LocalDate(1), new LocalDate(2)) runTests(a) } @Test - def testScalaListSerialization: Unit = { + def testScalaListSerialization(): Unit = { val a = List(42,1,49,1337) runTests(a) } @Test - def testScalaMutablelistSerialization: Unit = { + def testScalaMutablelistSerialization(): Unit = { val a = scala.collection.mutable.ListBuffer(42,1,49,1337) runTests(a) } @Test - def testScalaMapSerialization: Unit = { + def testScalaMapSerialization(): Unit = { val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337)) runTests(Seq(a)) } @Test - def testMutableMapSerialization: Unit ={ + def testMutableMapSerialization(): Unit ={ val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3")) runTests(Seq(a)) } @Test - def testScalaListComplexTypeSerialization: Unit = { + def testScalaListComplexTypeSerialization(): Unit = { val a = ComplexType("1234", 42, List(1,2,3,4)) val b = ComplexType("4321", 24, List(4,3,2,1)) val c = ComplexType("1337", 1, List(1)) @@ -143,7 +137,7 @@ class KryoGenericTypeSerializerTest { } @Test - def testHeterogenousScalaList: Unit = { + def testHeterogenousScalaList(): Unit = { val a = new DerivedType("foo", "bar") val b = new BaseType("foobar") val c = new DerivedType2("bar", "foo") @@ -201,7 +195,6 @@ class KryoGenericTypeSerializerTest { // Register the custom Kryo Serializer val conf = new ExecutionConfig conf.registerTypeWithKryoSerializer(classOf[LocalDate], classOf[LocalDateSerializer]) - Serializers.registerJodaTime(conf) val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]]) val serializer = typeInfo.createSerializer(conf) val typeClass = typeInfo.getTypeClass 
http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala index c436d62..368204b 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala @@ -18,20 +18,21 @@ package org.apache.flink.api.scala.runtime import java.util +import java.util.Random + +import org.apache.flink.api.scala._ import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.runtime.kryo.{Serializers, KryoSerializer} import org.apache.flink.util.StringUtils -import org.joda.time.DateTime + import org.joda.time.LocalDate + import org.junit.Assert import org.junit.Test -import org.apache.flink.api.scala._ + import scala.collection.JavaConverters._ -import java.util.Random class TupleSerializerTest { @@ -98,20 +99,6 @@ class TupleSerializerTest { val rnd: Random = new Random(807346528946L) val testTuples = Array( - (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)), - (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)), - (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)), - ("", rnd.nextDouble), - (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)), - (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt))) - runTests(testTuples) - } - - 
@Test - def testTuple2StringJodaTime2(): Unit = { - val rnd: Random = new Random(807346528946L) - - val testTuples = Array( (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)), (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)), (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)), @@ -208,9 +195,9 @@ class TupleSerializerTest { private final def runTests[T <: Product : TypeInformation](instances: Array[T]) { try { // Register the custom Kryo Serializer - val conf = new ExecutionConfig + val conf = new ExecutionConfig() conf.registerTypeWithKryoSerializer(classOf[LocalDate], classOf[LocalDateSerializer]) - Serializers.registerJodaTime(conf) + val tupleTypeInfo = implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]] val serializer = tupleTypeInfo.createSerializer(conf) val tupleClass = tupleTypeInfo.getTypeClass