[FLINK-3305] [core] Remove limited and inconsistent auto-magic for Joda Time

The auto-magic for Joda Time was limited to very few classes. It was 
intransparent what
cases would be handled.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6110dc3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6110dc3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6110dc3

Branch: refs/heads/master
Commit: b6110dc35a17340653a39209038041a5e28054b4
Parents: c4bc47a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 29 18:51:03 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 1 14:46:06 2016 +0100

----------------------------------------------------------------------
 flink-java/pom.xml                              | 30 +++++++-------
 .../flink/api/java/ExecutionEnvironment.java    | 26 ++++++++----
 .../typeutils/runtime/kryo/Serializers.java     | 42 +++-----------------
 .../api/operators/TimestampedCollector.java     | 12 +++---
 .../runtime/KryoGenericTypeSerializerTest.scala | 37 +++++++----------
 .../api/scala/runtime/TupleSerializerTest.scala | 29 ++++----------
 6 files changed, 68 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 8383a4a..a31e89d 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -70,22 +70,6 @@ under the License.
                </dependency>
 
                <dependency>
-                       <groupId>de.javakaffee</groupId>
-                       <artifactId>kryo-serializers</artifactId>
-                       <version>0.27</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>joda-time</groupId>
-                       <artifactId>joda-time</artifactId>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.joda</groupId>
-                       <artifactId>joda-convert</artifactId>
-               </dependency>
-
-               <dependency>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                        <version>${guava.version}</version>
@@ -104,6 +88,20 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>joda-time</groupId>
+                       <artifactId>joda-time</artifactId>
+                       <version>2.5</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.joda</groupId>
+                       <artifactId>joda-convert</artifactId>
+                       <version>1.7</version>
+                       <scope>test</scope>
+               </dependency>
                
        </dependencies>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 10cb5e3..253ffa3 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.api.java;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
+import com.esotericsoftware.kryo.Serializer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -46,7 +47,10 @@ import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.api.java.operators.OperatorTranslation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.*;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -54,14 +58,22 @@ import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
 import org.apache.flink.util.SplittableIterator;
 import org.apache.flink.util.Visitor;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esotericsoftware.kryo.Serializer;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 
 /**
  * The ExecutionEnvironment is the context in which a program is executed. A

http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index 6903d35..8bac729 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -23,13 +23,9 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-
-import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
-import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificRecordBase;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
@@ -37,11 +33,13 @@ import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
 
 import java.io.Serializable;
-import java.lang.reflect.*;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -165,36 +163,6 @@ public class Serializers {
        //      ClassTag<SpecificRecordBase> tag = 
scala.reflect.ClassTag$.MODULE$.apply(avroType);
        //      reg.registerTypeWithKryoSerializer(avroType, 
com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag));
        }
-       
-       /**
-        * Currently, the following classes of JodaTime are supported:
-        *      - DateTime
-        *      - Interval
-        *
-        *      The following chronologies are supported: (see {@link 
de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer})
-        * <ul>
-        * <li>{@link org.joda.time.chrono.ISOChronology}</li>
-        * <li>{@link org.joda.time.chrono.CopticChronology}</li>
-        * <li>{@link org.joda.time.chrono.EthiopicChronology}</li>
-        * <li>{@link org.joda.time.chrono.GregorianChronology}</li>
-        * <li>{@link org.joda.time.chrono.JulianChronology}</li>
-        * <li>{@link org.joda.time.chrono.IslamicChronology}</li>
-        * <li>{@link org.joda.time.chrono.BuddhistChronology}</li>
-        * <li>{@link org.joda.time.chrono.GJChronology}</li>
-        * </ul>
-        */
-       public static void registerJodaTime(ExecutionConfig reg) {
-               reg.registerTypeWithKryoSerializer(DateTime.class, 
JodaDateTimeSerializer.class);
-               reg.registerTypeWithKryoSerializer(Interval.class, 
JodaIntervalSerializer.class);
-       }
-       
-       /**
-        * Register less frequently used serializers
-        */
-       public static void registerJavaUtils(ExecutionConfig reg) {
-               // BitSet, Regex is already present through twitter-chill.
-       }
-
 
        // 
--------------------------------------------------------------------------------------------
        // Custom Serializers

http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
index 62514fc..5af5109 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
@@ -15,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
 
 /**
  * Wrapper around an {@link Output} for user functions that expect a {@link 
Collector}.
  * Before giving the {@link TimestampedCollector} to a user function you must 
set
- * the {@link Instant timestamp} that should be attached to emitted elements. 
Most operators
- * would set the {@link Instant timestamp} of the incoming {@link 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
+ * the timestamp that should be attached to emitted elements. Most operators
+ * would set the timestamp of the incoming
+ * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
  *
  * @param <T> The type of the elments that can be emitted.
  */
@@ -52,8 +53,9 @@ public class TimestampedCollector<T> implements Collector<T> {
        }
 
        /**
-        * Sets the {@link Instant timestamp} that is attached to elements that 
get emitted using
-        * {@link #collect}
+        * Sets the timestamp (long milliseconds) that is attached to elements 
that get emitted using
+        * {@link #collect(Object)}
+        * 
         * @param timestamp The timestamp in milliseconds
         */
        public void setTimestamp(long timestamp) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index 859ad2d..08a0a96 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -17,17 +17,18 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import com.esotericsoftware.kryo.{Kryo, Serializer}
+import com.esotericsoftware.kryo.io.{Input, Output}
+
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.SerializerTestInstance
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers
+
+import org.joda.time.LocalDate
+
 import org.junit.Test
+
 import scala.reflect._
-import org.joda.time.{DateTime, LocalDate}
-import com.esotericsoftware.kryo.Serializer
-import com.esotericsoftware.kryo.Kryo
-import com.esotericsoftware.kryo.io.Output
-import com.esotericsoftware.kryo.io.Input
 
 class KryoGenericTypeSerializerTest {
 
@@ -84,56 +85,49 @@ class KryoGenericTypeSerializerTest {
   }
 
   @Test
-  def testThrowableSerialization: Unit = {
+  def testThrowableSerialization(): Unit = {
     val a = List(new RuntimeException("Hello"), new RuntimeException("there"))
 
     runTests(a)
   }
 
   @Test
-  def jodaSerialization: Unit = {
-    val a = List(new DateTime(1), new DateTime(2))
-
-    runTests(a)
-  }
-
-  @Test
-  def jodaSerialization1: Unit = {
+  def jodaSerialization(): Unit = {
     val a = List(new LocalDate(1), new LocalDate(2))
     
     runTests(a)
   }
 
   @Test
-  def testScalaListSerialization: Unit = {
+  def testScalaListSerialization(): Unit = {
     val a = List(42,1,49,1337)
 
     runTests(a)
   }
 
   @Test
-  def testScalaMutablelistSerialization: Unit = {
+  def testScalaMutablelistSerialization(): Unit = {
     val a = scala.collection.mutable.ListBuffer(42,1,49,1337)
 
     runTests(a)
   }
 
   @Test
-  def testScalaMapSerialization: Unit = {
+  def testScalaMapSerialization(): Unit = {
     val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337))
 
     runTests(Seq(a))
   }
 
   @Test
-  def testMutableMapSerialization: Unit ={
+  def testMutableMapSerialization(): Unit ={
     val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3"))
 
     runTests(Seq(a))
   }
 
   @Test
-  def testScalaListComplexTypeSerialization: Unit = {
+  def testScalaListComplexTypeSerialization(): Unit = {
     val a = ComplexType("1234", 42, List(1,2,3,4))
     val b = ComplexType("4321", 24, List(4,3,2,1))
     val c = ComplexType("1337", 1, List(1))
@@ -143,7 +137,7 @@ class KryoGenericTypeSerializerTest {
   }
 
   @Test
-  def testHeterogenousScalaList: Unit = {
+  def testHeterogenousScalaList(): Unit = {
     val a = new DerivedType("foo", "bar")
     val b = new BaseType("foobar")
     val c = new DerivedType2("bar", "foo")
@@ -201,7 +195,6 @@ class KryoGenericTypeSerializerTest {
     // Register the custom Kryo Serializer
     val conf = new ExecutionConfig
     conf.registerTypeWithKryoSerializer(classOf[LocalDate], 
classOf[LocalDateSerializer])
-    Serializers.registerJodaTime(conf)
     val typeInfo = new 
GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]])
     val serializer = typeInfo.createSerializer(conf)
     val typeClass = typeInfo.getTypeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
index c436d62..368204b 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
@@ -18,20 +18,21 @@
 package org.apache.flink.api.scala.runtime
 
 import java.util
+import java.util.Random
+
+import org.apache.flink.api.scala._
 import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.java.ExecutionEnvironment
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.runtime.kryo.{Serializers, 
KryoSerializer}
 import org.apache.flink.util.StringUtils
-import org.joda.time.DateTime
+
 import org.joda.time.LocalDate
+
 import org.junit.Assert
 import org.junit.Test
-import org.apache.flink.api.scala._
+
 import scala.collection.JavaConverters._
-import java.util.Random
 
 class TupleSerializerTest {
 
@@ -98,20 +99,6 @@ class TupleSerializerTest {
     val rnd: Random = new Random(807346528946L)
 
     val testTuples = Array(
-      (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)),
-      (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)),
-      (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)),
-      ("", rnd.nextDouble),
-      (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)),
-      (StringUtils.getRandomString(rnd, 10, 100), new DateTime(rnd.nextInt)))
-    runTests(testTuples)
-  }
-
-  @Test
-  def testTuple2StringJodaTime2(): Unit = {
-    val rnd: Random = new Random(807346528946L)
-
-    val testTuples = Array(
       (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
       (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
       (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
@@ -208,9 +195,9 @@ class TupleSerializerTest {
   private final def runTests[T <: Product : TypeInformation](instances: 
Array[T]) {
     try {
       // Register the custom Kryo Serializer
-      val conf = new ExecutionConfig
+      val conf = new ExecutionConfig()
       conf.registerTypeWithKryoSerializer(classOf[LocalDate], 
classOf[LocalDateSerializer])
-      Serializers.registerJodaTime(conf)
+      
       val tupleTypeInfo = 
implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]]
       val serializer = tupleTypeInfo.createSerializer(conf)
       val tupleClass = tupleTypeInfo.getTypeClass

Reply via email to