http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
deleted file mode 100644
index 8cdee9b..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorISD1Test extends 
TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
-
-       @SuppressWarnings("unchecked")
-       Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
-               new Tuple3<Integer, String, Double>(4, "hello", 20.0),
-               new Tuple3<Integer, String, Double>(5, "hello", 23.2),
-               new Tuple3<Integer, String, Double>(6, "world", 20.0),
-               new Tuple3<Integer, String, Double>(7, "hello", 20.0),
-               new Tuple3<Integer, String, Double>(8, "hello", 23.2),
-               new Tuple3<Integer, String, Double>(9, "world", 20.0),
-               new Tuple3<Integer, String, Double>(10, "hello", 20.0),
-               new Tuple3<Integer, String, Double>(11, "hello", 23.2)
-       };
-
-       @Override
-       protected TupleComparator<Tuple3<Integer, String, Double>> 
createComparator(boolean ascending) {
-               return new TupleComparator<Tuple3<Integer, String, Double>>(
-                               new int[]{0},
-                               new TypeComparator[]{ new 
IntComparator(ascending) },
-                               new TypeSerializer[]{ IntSerializer.INSTANCE });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Integer, String, Double>> 
createSerializer() {
-               return new TupleSerializer<Tuple3<Integer, String, Double>>(
-                               (Class<Tuple3<Integer, String, Double>>) 
(Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new IntSerializer(),
-                                       new StringSerializer(),
-                                       new DoubleSerializer()});
-       }
-
-       @Override
-       protected Tuple3<Integer, String, Double>[] getSortedTestData() {
-               return dataISD;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
deleted file mode 100644
index 06c292f..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorISD2Test extends 
TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
-
-       @SuppressWarnings("unchecked")
-       Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
-               new Tuple3<Integer, String, Double>(4, "hello", 20.0),
-               new Tuple3<Integer, String, Double>(4, "world", 23.2),
-               new Tuple3<Integer, String, Double>(5, "hello", 20.0),
-               new Tuple3<Integer, String, Double>(5, "world", 20.0),
-               new Tuple3<Integer, String, Double>(6, "hello", 23.2),
-               new Tuple3<Integer, String, Double>(6, "world", 20.0),
-               new Tuple3<Integer, String, Double>(7, "hello", 20.0),
-               new Tuple3<Integer, String, Double>(7, "world", 23.2)
-       };
-
-       @Override
-       protected TupleComparator<Tuple3<Integer, String, Double>> 
createComparator(boolean ascending) {
-               return new TupleComparator<Tuple3<Integer, String, Double>>(
-                               new int[]{0, 1},
-                               new TypeComparator[]{
-                                       new IntComparator(ascending),
-                                       new StringComparator(ascending)
-                               },
-               new TypeSerializer[]{ IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, DoubleSerializer.INSTANCE });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Integer, String, Double>> 
createSerializer() {
-               return new TupleSerializer<Tuple3<Integer, String, Double>>(
-                               (Class<Tuple3<Integer, String, Double>>) 
(Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new IntSerializer(),
-                                       new StringSerializer(),
-                                       new DoubleSerializer()});
-       }
-
-       @Override
-       protected Tuple3<Integer, String, Double>[] getSortedTestData() {
-               return dataISD;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
deleted file mode 100644
index d823a29..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorISD3Test extends 
TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
-
-       @SuppressWarnings("unchecked")
-       Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
-               new Tuple3<Integer, String, Double>(4, "hello", 20.0),
-               new Tuple3<Integer, String, Double>(4, "hello", 23.2),
-               new Tuple3<Integer, String, Double>(4, "world", 20.0),
-               new Tuple3<Integer, String, Double>(5, "hello", 20.0),
-               new Tuple3<Integer, String, Double>(5, "hello", 23.2),
-               new Tuple3<Integer, String, Double>(5, "world", 20.0),
-               new Tuple3<Integer, String, Double>(6, "hello", 20.0),
-               new Tuple3<Integer, String, Double>(6, "hello", 23.2)
-       };
-
-       @Override
-       protected TupleComparator<Tuple3<Integer, String, Double>> 
createComparator(boolean ascending) {
-               return new TupleComparator<Tuple3<Integer, String, Double>>(
-                               new int[]{0, 1, 2},
-                               new TypeComparator[]{
-                                       new IntComparator(ascending),
-                                       new StringComparator(ascending),
-                                       new DoubleComparator(ascending)
-                               },
-               new TypeSerializer[]{ IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, DoubleSerializer.INSTANCE });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Integer, String, Double>> 
createSerializer() {
-               return new TupleSerializer<Tuple3<Integer, String, Double>>(
-                               (Class<Tuple3<Integer, String, Double>>) 
(Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new IntSerializer(),
-                                       new StringSerializer(),
-                                       new DoubleSerializer()});
-       }
-
-       @Override
-       protected Tuple3<Integer, String, Double>[] getSortedTestData() {
-               return dataISD;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
deleted file mode 100644
index cf73be2..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertEquals;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorTTT1Test extends 
TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>> {
-
-       @SuppressWarnings("unchecked")
-       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, 
Long>>[] dataISD = new Tuple3[]{
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new 
Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new 
Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new 
Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new 
Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new 
Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new 
Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new 
Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new 
Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new 
Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new 
Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
-               
-       };
-       
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createComparator(
-                       boolean ascending) {
-               return new TupleComparator<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-                               new int[] { 0 },
-                               new TypeComparator[] {
-                                               new 
TupleComparator<Tuple2<String, Double>>(
-                                                               new int[] { 0, 
1 },
-                                                               new 
TypeComparator[] {
-                                                               new 
StringComparator(ascending),
-                                                               new 
DoubleComparator(ascending) },
-                                                               new 
TypeSerializer[] {
-                                                                               
StringSerializer.INSTANCE,
-                                                                               
DoubleSerializer.INSTANCE }) },
-                               new TypeSerializer[] {
-                                               new 
TupleSerializer<Tuple2<String, Double>>(
-                                                               
(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
-                                                               new 
TypeSerializer[] {
-                                                                               
StringSerializer.INSTANCE,
-                                                                               
DoubleSerializer.INSTANCE }),
-                                               new 
TupleSerializer<Tuple2<Long, Long>>(
-                                                               
(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
-                                                               new 
TypeSerializer[] {
-                                                                               
LongSerializer.INSTANCE,
-                                                                               
LongSerializer.INSTANCE }),
-                                               new 
TupleSerializer<Tuple2<Integer, Long>>(
-                                                               
(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
-                                                               new 
TypeSerializer[] {
-                                                                               
IntSerializer.INSTANCE,
-                                                                               
LongSerializer.INSTANCE }) });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createSerializer() {
-               return new  TupleSerializer<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-                               (Class<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new TupleSerializer<Tuple2<String, 
Double>> (
-                                                       (Class<Tuple2<String, 
Double>>) (Class<?>) Tuple2.class,
-                                                       new TypeSerializer[]{
-                                                                       
StringSerializer.INSTANCE,
-                                                                       
DoubleSerializer.INSTANCE
-                                       }),
-                                       new TupleSerializer<Tuple2<Long, Long>> 
(
-                                                       (Class<Tuple2<Long, 
Long>>) (Class<?>) Tuple2.class,
-                                                       new TypeSerializer[]{
-                                                                       
LongSerializer.INSTANCE,
-                                                                       
LongSerializer.INSTANCE
-                                       }),
-                                       new TupleSerializer<Tuple2<Integer, 
Long>> (
-                                                       (Class<Tuple2<Integer, 
Long>>) (Class<?>) Tuple2.class,
-                                                       new TypeSerializer[]{
-                                                                       
IntSerializer.INSTANCE,
-                                                                       
LongSerializer.INSTANCE
-                                       })
-                               });
-       }
-
-       @Override
-       protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>[] getSortedTestData() {
-               return this.dataISD;
-       }
-       
-       @Override
-       protected void deepEquals(
-                       String message,
-                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> should,
-                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> is) {
-               
-               for (int x = 0; x < should.getArity(); x++) {
-                       // Check whether field is of type Tuple2 because 
assertEquals must be called on the non Tuple2 fields.
-                       if(should.getField(x) instanceof Tuple2) {
-                               this.deepEquals(message, (Tuple2<?,?>) 
should.getField(x), (Tuple2<?,?>)is.getField(x));
-                       }
-                       else {
-                               assertEquals(message, should.getField(x), 
is.getField(x));
-                       }
-               }// For
-       }
-       
-       protected void deepEquals(String message, Tuple2<?,?> should, 
Tuple2<?,?> is) {
-               for (int x = 0; x < should.getArity(); x++) {
-                       assertEquals(message, should.getField(x), 
is.getField(x));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
deleted file mode 100644
index 4b07c61..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorTTT2Test extends 
TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>> {
-
-       @SuppressWarnings("unchecked")
-       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, 
Long>>[] dataISD = new Tuple3[]{
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new 
Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new 
Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new 
Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new 
Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new 
Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new 
Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new 
Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new 
Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new 
Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new 
Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
-               
-       };
-       
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createComparator(
-                       boolean ascending) {
-               return new TupleComparator<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-                               new int[] { 0, 2 },
-                               new TypeComparator[] {
-                                               new 
TupleComparator<Tuple2<String, Double>>(
-                                                               new int[] { 0, 
1 },
-                                                               new 
TypeComparator[] {
-                                                               new 
StringComparator(ascending),
-                                                               new 
DoubleComparator(ascending) },
-                                                               new 
TypeSerializer[] {
-                                                                               
StringSerializer.INSTANCE,
-                                                                               
DoubleSerializer.INSTANCE }),
-                                               new 
TupleComparator<Tuple2<Integer, Long>>(
-                                                               new int[] {     
0, 1 },
-                                                               new 
TypeComparator[] {
-                                                               new 
IntComparator(ascending),
-                                                               new 
LongComparator(ascending) },
-                                                               new 
TypeSerializer[] {
-                                                                               
IntSerializer.INSTANCE,
-                                                                               
LongSerializer.INSTANCE }) },
-                               new TypeSerializer[] {
-                                               new 
TupleSerializer<Tuple2<String, Double>>(
-                                                               
(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
-                                                               new 
TypeSerializer[] {
-                                                                               
StringSerializer.INSTANCE,
-                                                                               
DoubleSerializer.INSTANCE }),
-                                               new 
TupleSerializer<Tuple2<Long, Long>>(
-                                                               
(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
-                                                               new 
TypeSerializer[] {
-                                                                               
LongSerializer.INSTANCE,
-                                                                               
LongSerializer.INSTANCE }),
-                                               new 
TupleSerializer<Tuple2<Integer, Long>>(
-                                                               
(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
-                                                               new 
TypeSerializer[] {
-                                                                               
IntSerializer.INSTANCE,
-                                                                               
LongSerializer.INSTANCE }) });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createSerializer() {
-               return new  TupleSerializer<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-                               (Class<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new TupleSerializer<Tuple2<String, 
Double>> (
-                                                       (Class<Tuple2<String, 
Double>>) (Class<?>) Tuple2.class,
-                                                       new TypeSerializer[]{
-                                                                       
StringSerializer.INSTANCE,
-                                                                       
DoubleSerializer.INSTANCE}),
-                                       new TupleSerializer<Tuple2<Long, Long>> 
(
-                                                       (Class<Tuple2<Long, 
Long>>) (Class<?>) Tuple2.class,
-                                                       new TypeSerializer[]{
-                                                                       
LongSerializer.INSTANCE,
-                                                                       
LongSerializer.INSTANCE}),
-                                       new TupleSerializer<Tuple2<Integer, 
Long>> (
-                                                       (Class<Tuple2<Integer, 
Long>>) (Class<?>) Tuple2.class,
-                                                       new TypeSerializer[]{
-                                                                       
IntSerializer.INSTANCE,
-                                                                       
LongSerializer.INSTANCE})
-                               });
-       }
-
-       @Override
-       protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>[] getSortedTestData() {
-               return this.dataISD;
-       }
-       
-       @Override
-       protected void deepEquals(
-                       String message,
-                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> should,
-                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> is) {
-               
-               for (int x = 0; x < should.getArity(); x++) {
-                       // Check whether field is of type Tuple2 because 
assertEquals must be called on the non Tuple2 fields.
-                       if(should.getField(x) instanceof Tuple2) {
-                               this.deepEquals(message, (Tuple2<?,?>) 
should.getField(x), (Tuple2<?,?>)is.getField(x));
-                       }
-                       else {
-                               assertEquals(message, should.getField(x), 
is.getField(x));
-                       }
-               }// For
-       }
-       
-       protected void deepEquals(String message, Tuple2<?,?> should, 
Tuple2<?,?> is) {
-               for (int x = 0; x < should.getArity(); x++) {
-                       assertEquals(message, should.getField(x), 
is.getField(x));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
deleted file mode 100644
index 0dfc094..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertEquals;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorTTT3Test extends 
TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>>{
-       @SuppressWarnings("unchecked")
-       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, 
Long>>[] dataISD = new Tuple3[]{
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new 
Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new 
Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new 
Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new 
Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new 
Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new 
Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new 
Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new 
Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new 
Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
-               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new 
Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
-               
-       };
-       
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createComparator(
-                       boolean ascending) {
-               return new TupleComparator<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-                               new int[] { 0, 1, 2 },
-                               new TypeComparator[] {
-                                               new 
TupleComparator<Tuple2<String, Double>>(
-                                                               new int[] { 0, 
1 },
-                                                               new 
TypeComparator[] {
-                                                               new 
StringComparator(ascending),
-                                                               new 
DoubleComparator(ascending) },
-                                                               new 
TypeSerializer[] {
-                                                                               
StringSerializer.INSTANCE,
-                                                                               
DoubleSerializer.INSTANCE }),
-                                               new 
TupleComparator<Tuple2<Long, Long>>(
-                                                               new int[] { 0, 
1 },
-                                                               new 
TypeComparator[] {
-                                                               new 
LongComparator(ascending),
-                                                               new 
LongComparator(ascending) },
-                                                               new 
TypeSerializer[] {
-                                                                               
LongSerializer.INSTANCE,
-                                                                               
LongSerializer.INSTANCE }),
-                                               new 
TupleComparator<Tuple2<Integer, Long>>(
-                                                               new int[] {     
0, 1 },
-                                                               new 
TypeComparator[] {
-                                                               new 
IntComparator(ascending),
-                                                               new 
LongComparator(ascending) },
-                                                               new 
TypeSerializer[] {
-                                                                               
IntSerializer.INSTANCE,
-                                                                               
LongSerializer.INSTANCE }) },
-                               new TypeSerializer[] {
-                                               new 
TupleSerializer<Tuple2<String, Double>>(
-                                                               
(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
-                                                               new 
TypeSerializer[] {
-                                                                               
StringSerializer.INSTANCE,
-                                                                               
DoubleSerializer.INSTANCE }),
-                                               new 
TupleSerializer<Tuple2<Long, Long>>(
-                                                               
(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
-                                                               new 
TypeSerializer[] {
-                                                                               
LongSerializer.INSTANCE,
-                                                                               
LongSerializer.INSTANCE }),
-                                               new 
TupleSerializer<Tuple2<Integer, Long>>(
-                                                               
(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
-                                                               new 
TypeSerializer[] {
-                                                                               
IntSerializer.INSTANCE,
-                                                                               
LongSerializer.INSTANCE }) });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createSerializer() {
-               return new  TupleSerializer<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-                               (Class<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new TupleSerializer<Tuple2<String, 
Double>> (
-                                                       (Class<Tuple2<String, 
Double>>) (Class<?>) Tuple2.class,
-                                                       new TypeSerializer[]{
-                                                                       
StringSerializer.INSTANCE,
-                                                                       
DoubleSerializer.INSTANCE
-                                       }),
-                                       new TupleSerializer<Tuple2<Long, Long>> 
(
-                                                       (Class<Tuple2<Long, 
Long>>) (Class<?>) Tuple2.class,
-                                                       new TypeSerializer[]{
-                                                                       
LongSerializer.INSTANCE,
-                                                                       
LongSerializer.INSTANCE
-                                       }),
-                                       new TupleSerializer<Tuple2<Integer, 
Long>> (
-                                                       (Class<Tuple2<Integer, 
Long>>) (Class<?>) Tuple2.class,
-                                                       new TypeSerializer[]{
-                                                                       
IntSerializer.INSTANCE,
-                                                                       
LongSerializer.INSTANCE
-                                       })
-                               });
-       }
-
-       @Override
-       protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>[] getSortedTestData() {
-               return this.dataISD;
-       }
-       
-       @Override
-       protected void deepEquals(
-                       String message,
-                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> should,
-                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> is) {
-               
-               for (int x = 0; x < should.getArity(); x++) {
-                       // Check whether field is of type Tuple2 because 
assertEquals must be called on the non Tuple2 fields.
-                       if(should.getField(x) instanceof Tuple2) {
-                               this.deepEquals(message, (Tuple2<?,?>) 
should.getField(x), (Tuple2<?,?>)is.getField(x));
-                       }
-                       else {
-                               assertEquals(message, should.getField(x), 
is.getField(x));
-                       }
-               }// For
-       }
-       
-       protected void deepEquals(String message, Tuple2<?,?> should, 
Tuple2<?,?> is) {
-               for (int x = 0; x < should.getArity(); x++) {
-                       assertEquals(message, should.getField(x), 
is.getField(x));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
deleted file mode 100644
index 017eb44..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.util.ArrayList;
-import java.util.Random;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple5;
-import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.Book;
-import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.BookAuthor;
-import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject1;
-import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject2;
-import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.SimpleTypes;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.util.StringUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TupleSerializerTest {
-
-       @Test
-       public void testTuple0() {
-               Tuple0[] testTuples = new Tuple0[] { Tuple0.INSTANCE, 
Tuple0.INSTANCE, Tuple0.INSTANCE };
-
-               runTests(testTuples);
-       }
-
-       @Test
-       public void testTuple1Int() {
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               Tuple1<Integer>[] testTuples = new Tuple1[] {
-                       new Tuple1<Integer>(42), new Tuple1<Integer>(1), new 
Tuple1<Integer>(0), new Tuple1<Integer>(-1),
-                       new Tuple1<Integer>(Integer.MAX_VALUE), new 
Tuple1<Integer>(Integer.MIN_VALUE)
-               };
-               
-               runTests(testTuples);
-       }
-       
-       @Test
-       public void testTuple1String() {
-               Random rnd = new Random(68761564135413L);
-               
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               Tuple1<String>[] testTuples = new Tuple1[] {
-                       new Tuple1<String>(StringUtils.getRandomString(rnd, 10, 
100)),
-                       new Tuple1<String>("abc"),
-                       new Tuple1<String>(""),
-                       new Tuple1<String>(StringUtils.getRandomString(rnd, 30, 
170)),
-                       new Tuple1<String>(StringUtils.getRandomString(rnd, 15, 
50)),
-                       new Tuple1<String>("")
-               };
-               
-               runTests(testTuples);
-       }
-       
-       @Test
-       public void testTuple1StringArray() {
-               Random rnd = new Random(289347567856686223L);
-               
-               String[] arr1 = new String[] {"abc", "",
-                               StringUtils.getRandomString(rnd, 10, 100),
-                               StringUtils.getRandomString(rnd, 15, 50),
-                               StringUtils.getRandomString(rnd, 30, 170),
-                               StringUtils.getRandomString(rnd, 14, 15),
-                               ""};
-               
-               String[] arr2 = new String[] {"foo", "",
-                               StringUtils.getRandomString(rnd, 10, 100),
-                               StringUtils.getRandomString(rnd, 1000, 5000),
-                               StringUtils.getRandomString(rnd, 30000, 35000),
-                               StringUtils.getRandomString(rnd, 100*1024, 
105*1024),
-                               "bar"};
-               
-               @SuppressWarnings("unchecked")
-               Tuple1<String[]>[] testTuples = new Tuple1[] {
-                       new Tuple1<String[]>(arr1),
-                       new Tuple1<String[]>(arr2)
-               };
-               
-               runTests(testTuples);
-       }
-       
-       @Test
-       public void testTuple2StringDouble() {
-               Random rnd = new Random(807346528946L);
-               
-               @SuppressWarnings("unchecked")
-               Tuple2<String, Double>[] testTuples = new Tuple2[] {
-                               new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
-                               new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
-                               new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
-                               new Tuple2<String, Double>("", 
rnd.nextDouble()),
-                               new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
-                               new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble())
-                       };
-               
-               runTests(testTuples);
-       }
-       
-       @Test
-       public void testTuple2StringStringArray() {
-               Random rnd = new Random(289347567856686223L);
-               
-               String[] arr1 = new String[] {"abc", "",
-                               StringUtils.getRandomString(rnd, 10, 100),
-                               StringUtils.getRandomString(rnd, 15, 50),
-                               StringUtils.getRandomString(rnd, 30, 170),
-                               StringUtils.getRandomString(rnd, 14, 15),
-                               ""};
-               
-               String[] arr2 = new String[] {"foo", "",
-                               StringUtils.getRandomString(rnd, 10, 100),
-                               StringUtils.getRandomString(rnd, 1000, 5000),
-                               StringUtils.getRandomString(rnd, 30000, 35000),
-                               StringUtils.getRandomString(rnd, 100*1024, 
105*1024),
-                               "bar"};
-               
-               @SuppressWarnings("unchecked")
-               Tuple2<String, String[]>[] testTuples = new Tuple2[] {
-                       new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr1),
-                       new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2),
-                       new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr1),
-                       new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2),
-                       new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2)
-               };
-               
-               runTests(testTuples);
-       }
-       
-
-       @Test
-       public void testTuple5CustomObjects() {
-               Random rnd = new Random(807346528946L);
-               
-               SimpleTypes a = new SimpleTypes();
-               SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
-                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
-               SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
-                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
-               SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
-                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
-               SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
-                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
-               SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
-                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
-               SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
-                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
-               
-               ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
-               ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
-               ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
-               ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
-               ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
-               
-               ComplexNestedObject2 co1 = new ComplexNestedObject2(rnd);
-               ComplexNestedObject2 co2 = new ComplexNestedObject2();
-               ComplexNestedObject2 co3 = new ComplexNestedObject2(rnd);
-               ComplexNestedObject2 co4 = new ComplexNestedObject2(rnd);
-               
-               Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
-               Book b2 = new Book(0L, "Debugging byte streams", 1337);
-               Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
-               Book b4 = new Book(Long.MAX_VALUE, "The joy of bits and bytes", 
0xDEADBEEF);
-               Book b5 = new Book(Long.MIN_VALUE, "Winnign a prize for 
creative test strings", 0xBADF00);
-               Book b6 = new Book(-2L, "Distributed Systems", 
0xABCDEF0123456789L);
-               
-               ArrayList<String> list = new ArrayList<String>();
-               list.add("A");
-               list.add("B");
-               list.add("C");
-               list.add("D");
-               list.add("E");
-               
-               BookAuthor ba1 = new BookAuthor(976243875L, list, "Arno Nym");
-               
-               ArrayList<String> list2 = new ArrayList<String>();
-               BookAuthor ba2 = new BookAuthor(987654321L, list2, "The 
Saurus");
-               
-               
-               @SuppressWarnings("unchecked")
-               Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, 
ComplexNestedObject2>[] testTuples = new Tuple5[] {
-                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(a, b1, o1, ba1, co1),
-                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(b, b2, o2, ba2, co2),
-                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(c, b3, o3, ba1, co3),
-                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(d, b2, o4, ba1, co4),
-                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(e, b4, o5, ba2, co4),
-                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(f, b5, o1, ba2, co4),
-                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(g, b6, o4, ba1, co2)
-               };
-               
-               runTests(testTuples);
-       }
-
-       private <T extends Tuple> void runTests(T... instances) {
-               try {
-                       TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) 
TypeExtractor.getForObject(instances[0]);
-                       TypeSerializer<T> serializer = 
tupleTypeInfo.createSerializer(new ExecutionConfig());
-                       
-                       Class<T> tupleClass = tupleTypeInfo.getTypeClass();
-                       
-                       int length = -1;
-                       if(tupleClass == Tuple0.class) {
-                               length = 1;
-                       }
-                       TupleSerializerTestInstance<T> test = new 
TupleSerializerTestInstance<T>(serializer, tupleClass, length, instances);
-                       test.testAll();
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
deleted file mode 100644
index a196984..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.junit.Assert;
-
-public class TupleSerializerTestInstance<T extends Tuple> extends 
SerializerTestInstance<T> {
-
-       public TupleSerializerTestInstance(TypeSerializer<T> serializer, 
Class<T> typeClass, int length, T[] testData) {
-               super(serializer, typeClass, length, testData);
-       }
-       
-       protected void deepEquals(String message, T shouldTuple, T isTuple) {
-               Assert.assertEquals(shouldTuple.getArity(), isTuple.getArity());
-               
-               for (int i = 0; i < shouldTuple.getArity(); i++) {
-                       Object should = shouldTuple.getField(i);
-                       Object is = isTuple.getField(i);
-                       
-                       if (should.getClass().isArray()) {
-                               if (should instanceof boolean[]) {
-                                       Assert.assertTrue(message, 
Arrays.equals((boolean[]) should, (boolean[]) is));
-                               }
-                               else if (should instanceof byte[]) {
-                                       assertArrayEquals(message, (byte[]) 
should, (byte[]) is);
-                               }
-                               else if (should instanceof short[]) {
-                                       assertArrayEquals(message, (short[]) 
should, (short[]) is);
-                               }
-                               else if (should instanceof int[]) {
-                                       assertArrayEquals(message, (int[]) 
should, (int[]) is);
-                               }
-                               else if (should instanceof long[]) {
-                                       assertArrayEquals(message, (long[]) 
should, (long[]) is);
-                               }
-                               else if (should instanceof float[]) {
-                                       assertArrayEquals(message, (float[]) 
should, (float[]) is, 0.0f);
-                               }
-                               else if (should instanceof double[]) {
-                                       assertArrayEquals(message, (double[]) 
should, (double[]) is, 0.0);
-                               }
-                               else if (should instanceof char[]) {
-                                       assertArrayEquals(message, (char[]) 
should, (char[]) is);
-                               }
-                               else {
-                                       assertArrayEquals(message, (Object[]) 
should, (Object[]) is);
-                               }
-                       }
-                       else {
-                               assertEquals(message,  should, is);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
deleted file mode 100644
index cf9874d..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.types.StringValue;
-
-public class ValueComparatorTest extends ComparatorTestBase<StringValue> {
-
-       StringValue[] data = new StringValue[]{
-               new StringValue(""),
-               new StringValue("Lorem Ipsum Dolor Omit Longer"),
-               new StringValue("aaaa"),
-               new StringValue("abcd"),
-               new StringValue("abce"),
-               new StringValue("abdd"),
-               new StringValue("accd"),
-               new StringValue("bbcd")
-       };
-
-       @Override
-       protected TypeComparator<StringValue> createComparator(boolean 
ascending) {
-               return new ValueComparator<StringValue>(ascending, 
StringValue.class);
-       }
-
-       @Override
-       protected TypeSerializer<StringValue> createSerializer() {
-               return new ValueSerializer<StringValue>(StringValue.class);
-       }
-
-       @Override
-       protected StringValue[] getSortedTestData() {
-               return data;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
deleted file mode 100644
index a2c3e1e..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.UUID;
-
-public class ValueComparatorUUIDTest extends ComparatorTestBase<ValueID> {
-       @Override
-       protected TypeComparator<ValueID> createComparator(boolean ascending) {
-               return new ValueComparator<>(ascending, ValueID.class);
-       }
-
-       @Override
-       protected TypeSerializer<ValueID> createSerializer() {
-               return new ValueSerializer<>(ValueID.class);
-       }
-
-       @Override
-       protected ValueID[] getSortedTestData() {
-               return new ValueID[] {
-                       new ValueID(new UUID(0, 0)),
-                       new ValueID(new UUID(1, 0)),
-                       new ValueID(new UUID(1, 1))
-               };
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
deleted file mode 100644
index d644485..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-import java.io.IOException;
-import java.util.UUID;
-
-public class ValueID implements Value, Comparable<ValueID> {
-       private static final long serialVersionUID = -562791433077971752L;
-
-       private UUID id;
-
-       public ValueID() {
-               id = UUID.randomUUID();
-       }
-
-       public ValueID(UUID id) {
-               this.id = id;
-       }
-
-       @Override
-       public int compareTo(ValueID o) {
-               return id.compareTo(o.id);
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               out.writeLong(id.getMostSignificantBits());
-               out.writeLong(id.getLeastSignificantBits());
-       }
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               id = new UUID(in.readLong(), in.readLong());
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof ValueID) {
-                       ValueID other = (ValueID) obj;
-
-                       return id.equals(other.id);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               return id.hashCode();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
deleted file mode 100644
index 9c07a5e..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.UUID;
-
-public class ValueSerializerUUIDTest extends SerializerTestBase<ValueID> {
-       @Override
-       protected TypeSerializer<ValueID> createSerializer() {
-               return new ValueSerializer<>(ValueID.class);
-       }
-
-       @Override
-       protected int getLength() {
-               return -1;
-       }
-
-       @Override
-       protected Class<ValueID> getTypeClass() {
-               return ValueID.class;
-       }
-
-       @Override
-       protected ValueID[] getTestData() {
-               return new ValueID[] {
-                       new ValueID(new UUID(0, 0)),
-                       new ValueID(new UUID(1, 0)),
-                       new ValueID(new UUID(1, 1))
-               };
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
deleted file mode 100644
index f5a90b7..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class WritableComparatorTest extends 
ComparatorTestBase<StringArrayWritable> {  
-       
-       StringArrayWritable[] data = new StringArrayWritable[]{
-                       new StringArrayWritable(new String[]{}),
-                       new StringArrayWritable(new String[]{""}),
-                       new StringArrayWritable(new String[]{"a","a"}),
-                       new StringArrayWritable(new String[]{"a","b"}),
-                       new StringArrayWritable(new String[]{"c","c"}),
-                       new StringArrayWritable(new String[]{"d","f"}),
-                       new StringArrayWritable(new String[]{"d","m"}),
-                       new StringArrayWritable(new String[]{"z","x"}),
-                       new StringArrayWritable(new String[]{"a","a", "a"})
-       };
-       
-       @Override
-       protected TypeComparator<StringArrayWritable> createComparator(boolean 
ascending) {
-               return new WritableComparator<StringArrayWritable>(ascending, 
StringArrayWritable.class);
-       }
-       
-       @Override
-       protected TypeSerializer<StringArrayWritable> createSerializer() {
-               return new 
WritableSerializer<StringArrayWritable>(StringArrayWritable.class);
-       }
-       
-       @Override
-       protected StringArrayWritable[] getSortedTestData() {
-               return data;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
deleted file mode 100644
index 94e759d..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.UUID;
-
-public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> 
{
-       @Override
-       protected TypeComparator<WritableID> createComparator(boolean 
ascending) {
-               return new WritableComparator<>(ascending, WritableID.class);
-       }
-
-       @Override
-       protected TypeSerializer<WritableID> createSerializer() {
-               return new WritableSerializer<>(WritableID.class);
-       }
-
-       @Override
-       protected WritableID[] getSortedTestData() {
-               return new WritableID[] {
-                       new WritableID(new UUID(0, 0)),
-                       new WritableID(new UUID(1, 0)),
-                       new WritableID(new UUID(1, 1))
-               };
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
deleted file mode 100644
index 4274cf6..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.UUID;
-
-public class WritableID implements WritableComparable<WritableID> {
-       private UUID uuid;
-
-       public WritableID() {
-               this.uuid = UUID.randomUUID();
-       }
-
-       public WritableID(UUID uuid) {
-               this.uuid = uuid;
-       }
-
-       @Override
-       public int compareTo(WritableID o) {
-               return this.uuid.compareTo(o.uuid);
-       }
-
-       @Override
-       public void write(DataOutput dataOutput) throws IOException {
-               dataOutput.writeLong(uuid.getMostSignificantBits());
-               dataOutput.writeLong(uuid.getLeastSignificantBits());
-       }
-
-       @Override
-       public void readFields(DataInput dataInput) throws IOException {
-               this.uuid = new UUID(dataInput.readLong(), 
dataInput.readLong());
-       }
-
-       @Override
-       public String toString() {
-               return uuid.toString();
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               WritableID id = (WritableID) o;
-
-               return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != 
null);
-       }
-
-       @Override
-       public int hashCode() {
-               return uuid != null ? uuid.hashCode() : 0;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
deleted file mode 100644
index bb5f4d4..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.junit.Test;
-
-public class WritableSerializerTest {
-       
-       @Test
-       public void testStringArrayWritable() {
-               StringArrayWritable[] data = new StringArrayWritable[]{
-                               new StringArrayWritable(new String[]{}),
-                               new StringArrayWritable(new String[]{""}),
-                               new StringArrayWritable(new String[]{"a","a"}),
-                               new StringArrayWritable(new String[]{"a","b"}),
-                               new StringArrayWritable(new String[]{"c","c"}),
-                               new StringArrayWritable(new String[]{"d","f"}),
-                               new StringArrayWritable(new String[]{"d","m"}),
-                               new StringArrayWritable(new String[]{"z","x"}),
-                               new StringArrayWritable(new String[]{"a","a", 
"a"})
-               };
-               
-               WritableTypeInfo<StringArrayWritable> writableTypeInfo = 
(WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
-               WritableSerializer<StringArrayWritable> writableSerializer = 
(WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new 
ExecutionConfig());
-               
-               SerializerTestInstance<StringArrayWritable> testInstance = new 
SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(),
 -1, data);
-               
-               testInstance.testAll();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
deleted file mode 100644
index 2af7730..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.UUID;
-
-public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> 
{
-       @Override
-       protected TypeSerializer<WritableID> createSerializer() {
-               return new WritableSerializer<>(WritableID.class);
-       }
-
-       @Override
-       protected int getLength() {
-               return -1;
-       }
-
-       @Override
-       protected Class<WritableID> getTypeClass() {
-               return WritableID.class;
-       }
-
-       @Override
-       protected WritableID[] getTestData() {
-               return new WritableID[] {
-                       new WritableID(new UUID(0, 0)),
-                       new WritableID(new UUID(1, 0)),
-                       new WritableID(new UUID(1, 1))
-               };
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
deleted file mode 100644
index 7572408..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-
-public class KryoClearedBufferTest {
-
-       /**
-        * Tests that the kryo output buffer is cleared in case of an 
exception. Flink uses the
-        * EOFException to signal that a buffer is full. In such a case, the 
record which was tried
-        * to be written will be rewritten. Therefore, eventually buffered data 
of this record has
-        * to be cleared.
-        */
-       @Test
-       public void testOutputBufferedBeingClearedInCaseOfException() throws 
Exception {
-               ExecutionConfig executionConfig = new ExecutionConfig();
-               
executionConfig.registerTypeWithKryoSerializer(TestRecord.class, new 
TestRecordSerializer());
-               executionConfig.registerKryoType(TestRecord.class);
-
-               KryoSerializer<TestRecord> kryoSerializer = new 
KryoSerializer<TestRecord>(
-                       TestRecord.class,
-                       executionConfig);
-
-               int size = 94;
-               int bufferSize = 150;
-
-               TestRecord testRecord = new TestRecord(size);
-
-               TestDataOutputView target = new TestDataOutputView(bufferSize);
-
-               kryoSerializer.serialize(testRecord, target);
-
-               try {
-                       kryoSerializer.serialize(testRecord, target);
-                       Assert.fail("Expected an EOFException.");
-               } catch(EOFException eofException) {
-                       // expected exception
-                       // now the Kryo Output should have been cleared
-               }
-
-               TestRecord actualRecord = kryoSerializer.deserialize(
-                               new DataInputViewStreamWrapper(new 
ByteArrayInputStream(target.getBuffer())));
-
-               Assert.assertEquals(testRecord, actualRecord);
-
-               target.clear();
-
-               // if the kryo output has been cleared then we can serialize 
our test record into the target
-               // because the target buffer 150 bytes can host one TestRecord 
(total serialization size 100)
-               kryoSerializer.serialize(testRecord, target);
-
-               byte[] buffer = target.getBuffer();
-               int counter = 0;
-
-               for (int i = 0; i < buffer.length; i++) {
-                       if(buffer[i] == 42) {
-                               counter++;
-                       }
-               }
-
-               Assert.assertEquals(size, counter);
-       }
-
-       public static class TestRecord {
-               private byte[] buffer;
-
-               public TestRecord(int size) {
-                       buffer = new byte[size];
-
-                       Arrays.fill(buffer, (byte)42);
-               }
-
-               public TestRecord(byte[] buffer){
-                       this.buffer = buffer;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj instanceof TestRecord) {
-                               TestRecord record = (TestRecord) obj;
-
-                               return Arrays.equals(buffer, record.buffer);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-
-       public static class TestRecordSerializer extends Serializer<TestRecord> 
implements Serializable {
-
-               private static final long serialVersionUID = 
6971996565421454985L;
-
-               @Override
-               public void write(Kryo kryo, Output output, TestRecord object) {
-                       output.writeInt(object.buffer.length);
-                       output.write(object.buffer);
-               }
-
-               @Override
-               public TestRecord read(Kryo kryo, Input input, 
Class<TestRecord> type) {
-                       int length = input.readInt();
-                       byte[] buffer = input.readBytes(length);
-
-                       return new TestRecord(buffer);
-               }
-       }
-
-       public static class TestDataOutputView implements DataOutputView {
-
-               private byte[] buffer;
-               private int position;
-
-               public TestDataOutputView(int size) {
-                       buffer = new byte[size];
-                       position = 0;
-               }
-
-               public void clear() {
-                       position = 0;
-               }
-
-               public byte[] getBuffer() {
-                       return buffer;
-               }
-
-               public void checkSize(int numBytes) throws EOFException {
-                       if (position + numBytes > buffer.length) {
-                               throw new EOFException();
-                       }
-               }
-
-               @Override
-               public void skipBytesToWrite(int numBytes) throws IOException {
-                       checkSize(numBytes);
-
-                       position += numBytes;
-               }
-
-               @Override
-               public void write(DataInputView source, int numBytes) throws 
IOException {
-                       checkSize(numBytes);
-
-                       byte[] tempBuffer = new byte[numBytes];
-
-                       source.read(tempBuffer);
-
-                       System.arraycopy(tempBuffer, 0, buffer, position, 
numBytes);
-
-                       position += numBytes;
-               }
-
-               @Override
-               public void write(int b) throws IOException {
-                       checkSize(4);
-
-                       position += 4;
-               }
-
-               @Override
-               public void write(byte[] b) throws IOException {
-                       checkSize(b.length);
-
-                       System.arraycopy(b, 0, buffer, position, b.length);
-                       position += b.length;
-               }
-
-               @Override
-               public void write(byte[] b, int off, int len) throws 
IOException {
-                       checkSize(len);
-
-                       System.arraycopy(b, off, buffer, position, len);
-
-                       position += len;
-               }
-
-               @Override
-               public void writeBoolean(boolean v) throws IOException {
-                       checkSize(1);
-                       position += 1;
-               }
-
-               @Override
-               public void writeByte(int v) throws IOException {
-                       checkSize(1);
-
-                       buffer[position] = (byte)v;
-
-                       position++;
-               }
-
-               @Override
-               public void writeShort(int v) throws IOException {
-                       checkSize(2);
-
-                       position += 2;
-               }
-
-               @Override
-               public void writeChar(int v) throws IOException {
-                       checkSize(1);
-                       position++;
-               }
-
-               @Override
-               public void writeInt(int v) throws IOException {
-                       checkSize(4);
-
-                       position += 4;
-               }
-
-               @Override
-               public void writeLong(long v) throws IOException {
-                       checkSize(8);
-                       position += 8;
-               }
-
-               @Override
-               public void writeFloat(float v) throws IOException {
-                       checkSize(4);
-                       position += 4;
-               }
-
-               @Override
-               public void writeDouble(double v) throws IOException {
-                       checkSize(8);
-                       position += 8;
-               }
-
-               @Override
-               public void writeBytes(String s) throws IOException {
-                       byte[] sBuffer = s.getBytes();
-                       checkSize(sBuffer.length);
-                       System.arraycopy(sBuffer, 0, buffer, position, 
sBuffer.length);
-                       position += sBuffer.length;
-               }
-
-               @Override
-               public void writeChars(String s) throws IOException {
-                       byte[] sBuffer = s.getBytes();
-                       checkSize(sBuffer.length);
-                       System.arraycopy(sBuffer, 0, buffer, position, 
sBuffer.length);
-                       position += sBuffer.length;
-               }
-
-               @Override
-               public void writeUTF(String s) throws IOException {
-                       byte[] sBuffer = s.getBytes();
-                       checkSize(sBuffer.length);
-                       System.arraycopy(sBuffer, 0, buffer, position, 
sBuffer.length);
-                       position += sBuffer.length;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
deleted file mode 100644
index 9d3eef1..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericArraySerializerTest;
-
-public class KryoGenericArraySerializerTest extends 
AbstractGenericArraySerializerTest {
-       @Override
-       protected <T> TypeSerializer<T> createComponentSerializer(Class<T> 
type) {
-               return new KryoSerializer<T>(type, new ExecutionConfig());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
deleted file mode 100644
index 19aed78..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeComparatorTest;
-
-public class KryoGenericTypeComparatorTest extends 
AbstractGenericTypeComparatorTest {
-       @Override
-       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new KryoSerializer<T>(type, new ExecutionConfig());
-       }
-}
\ No newline at end of file

Reply via email to