http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
new file mode 100644
index 0000000..a196984
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.junit.Assert;
+
+public class TupleSerializerTestInstance<T extends Tuple> extends SerializerTestInstance<T> {
+
+       public TupleSerializerTestInstance(TypeSerializer<T> serializer, Class<T> typeClass, int length, T[] testData) {
+               super(serializer, typeClass, length, testData);
+       }
+
+       protected void deepEquals(String message, T shouldTuple, T isTuple) {
+               Assert.assertEquals(shouldTuple.getArity(), isTuple.getArity());
+
+               for (int i = 0; i < shouldTuple.getArity(); i++) {
+                       Object should = shouldTuple.getField(i);
+                       Object is = isTuple.getField(i);
+
+                       if (should.getClass().isArray()) {
+                               if (should instanceof boolean[]) {
+                                       Assert.assertTrue(message, Arrays.equals((boolean[]) should, (boolean[]) is));
+                               }
+                               else if (should instanceof byte[]) {
+                                       assertArrayEquals(message, (byte[]) should, (byte[]) is);
+                               }
+                               else if (should instanceof short[]) {
+                                       assertArrayEquals(message, (short[]) should, (short[]) is);
+                               }
+                               else if (should instanceof int[]) {
+                                       assertArrayEquals(message, (int[]) should, (int[]) is);
+                               }
+                               else if (should instanceof long[]) {
+                                       assertArrayEquals(message, (long[]) should, (long[]) is);
+                               }
+                               else if (should instanceof float[]) {
+                                       assertArrayEquals(message, (float[]) should, (float[]) is, 0.0f);
+                               }
+                               else if (should instanceof double[]) {
+                                       assertArrayEquals(message, (double[]) should, (double[]) is, 0.0);
+                               }
+                               else if (should instanceof char[]) {
+                                       assertArrayEquals(message, (char[]) should, (char[]) is);
+                               }
+                               else {
+                                       assertArrayEquals(message, (Object[]) should, (Object[]) is);
+                               }
+                       }
+                       else {
+                               assertEquals(message, should, is);
+                       }
+               }
+       }
+}

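For context, the actual round-trip checks come from the inherited SerializerTestInstance; the class above only customizes deepEquals for array-valued tuple fields. A minimal usage sketch, not part of this commit (the tuple type and test data are illustrative; the TypeExtractor/ExecutionConfig pattern mirrors WritableSerializerTest further down in this diff):

    // hypothetical usage: derive a serializer for a concrete tuple type and run the inherited checks
    Tuple2<Integer, String>[] data = new Tuple2[] {
            new Tuple2<>(1, "a"),
            new Tuple2<>(2, "b")
    };
    TypeInformation<Tuple2<Integer, String>> typeInfo = TypeExtractor.getForObject(data[0]);
    TypeSerializer<Tuple2<Integer, String>> serializer = typeInfo.createSerializer(new ExecutionConfig());
    // -1 marks a variable-length type, as in the other tests of this commit
    new TupleSerializerTestInstance<>(serializer, typeInfo.getTypeClass(), -1, data).testAll();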
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
new file mode 100644
index 0000000..cf9874d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.StringValue;
+
+public class ValueComparatorTest extends ComparatorTestBase<StringValue> {
+
+       StringValue[] data = new StringValue[]{
+               new StringValue(""),
+               new StringValue("Lorem Ipsum Dolor Omit Longer"),
+               new StringValue("aaaa"),
+               new StringValue("abcd"),
+               new StringValue("abce"),
+               new StringValue("abdd"),
+               new StringValue("accd"),
+               new StringValue("bbcd")
+       };
+
+       @Override
+       protected TypeComparator<StringValue> createComparator(boolean ascending) {
+               return new ValueComparator<StringValue>(ascending, StringValue.class);
+       }
+
+       @Override
+       protected TypeSerializer<StringValue> createSerializer() {
+               return new ValueSerializer<StringValue>(StringValue.class);
+       }
+
+       @Override
+       protected StringValue[] getSortedTestData() {
+               return data;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
new file mode 100644
index 0000000..a2c3e1e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class ValueComparatorUUIDTest extends ComparatorTestBase<ValueID> {
+       @Override
+       protected TypeComparator<ValueID> createComparator(boolean ascending) {
+               return new ValueComparator<>(ascending, ValueID.class);
+       }
+
+       @Override
+       protected TypeSerializer<ValueID> createSerializer() {
+               return new ValueSerializer<>(ValueID.class);
+       }
+
+       @Override
+       protected ValueID[] getSortedTestData() {
+               return new ValueID[] {
+                       new ValueID(new UUID(0, 0)),
+                       new ValueID(new UUID(1, 0)),
+                       new ValueID(new UUID(1, 1))
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
new file mode 100644
index 0000000..d644485
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class ValueID implements Value, Comparable<ValueID> {
+       private static final long serialVersionUID = -562791433077971752L;
+
+       private UUID id;
+
+       public ValueID() {
+               id = UUID.randomUUID();
+       }
+
+       public ValueID(UUID id) {
+               this.id = id;
+       }
+
+       @Override
+       public int compareTo(ValueID o) {
+               return id.compareTo(o.id);
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeLong(id.getMostSignificantBits());
+               out.writeLong(id.getLeastSignificantBits());
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               id = new UUID(in.readLong(), in.readLong());
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof ValueID) {
+                       ValueID other = (ValueID) obj;
+
+                       return id.equals(other.id);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return id.hashCode();
+       }
+}

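As a quick cross-check of the write/read symmetry above, the SerializerTestInstance machinery used elsewhere in this commit could exercise ValueID directly. A minimal sketch, not part of the commit (the UUID values are arbitrary):

    // hypothetical round trip through ValueSerializer, which serializes via the Value write()/read() methods
    ValueID[] data = new ValueID[] {
            new ValueID(new UUID(0, 0)),
            new ValueID(new UUID(1, 1))
    };
    new SerializerTestInstance<>(new ValueSerializer<>(ValueID.class), ValueID.class, -1, data).testAll();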
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
new file mode 100644
index 0000000..9c07a5e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class ValueSerializerUUIDTest extends SerializerTestBase<ValueID> {
+       @Override
+       protected TypeSerializer<ValueID> createSerializer() {
+               return new ValueSerializer<>(ValueID.class);
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<ValueID> getTypeClass() {
+               return ValueID.class;
+       }
+
+       @Override
+       protected ValueID[] getTestData() {
+               return new ValueID[] {
+                       new ValueID(new UUID(0, 0)),
+                       new ValueID(new UUID(1, 0)),
+                       new ValueID(new UUID(1, 1))
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
new file mode 100644
index 0000000..f5a90b7
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> {
+       
+       StringArrayWritable[] data = new StringArrayWritable[]{
+                       new StringArrayWritable(new String[]{}),
+                       new StringArrayWritable(new String[]{""}),
+                       new StringArrayWritable(new String[]{"a","a"}),
+                       new StringArrayWritable(new String[]{"a","b"}),
+                       new StringArrayWritable(new String[]{"c","c"}),
+                       new StringArrayWritable(new String[]{"d","f"}),
+                       new StringArrayWritable(new String[]{"d","m"}),
+                       new StringArrayWritable(new String[]{"z","x"}),
+                       new StringArrayWritable(new String[]{"a","a", "a"})
+       };
+       
+       @Override
+       protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) {
+               return new WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class);
+       }
+       
+       @Override
+       protected TypeSerializer<StringArrayWritable> createSerializer() {
+               return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class);
+       }
+       
+       @Override
+       protected StringArrayWritable[] getSortedTestData() {
+               return data;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
new file mode 100644
index 0000000..94e759d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> {
+       @Override
+       protected TypeComparator<WritableID> createComparator(boolean ascending) {
+               return new WritableComparator<>(ascending, WritableID.class);
+       }
+
+       @Override
+       protected TypeSerializer<WritableID> createSerializer() {
+               return new WritableSerializer<>(WritableID.class);
+       }
+
+       @Override
+       protected WritableID[] getSortedTestData() {
+               return new WritableID[] {
+                       new WritableID(new UUID(0, 0)),
+                       new WritableID(new UUID(1, 0)),
+                       new WritableID(new UUID(1, 1))
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
new file mode 100644
index 0000000..4274cf6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+
+public class WritableID implements WritableComparable<WritableID> {
+       private UUID uuid;
+
+       public WritableID() {
+               this.uuid = UUID.randomUUID();
+       }
+
+       public WritableID(UUID uuid) {
+               this.uuid = uuid;
+       }
+
+       @Override
+       public int compareTo(WritableID o) {
+               return this.uuid.compareTo(o.uuid);
+       }
+
+       @Override
+       public void write(DataOutput dataOutput) throws IOException {
+               dataOutput.writeLong(uuid.getMostSignificantBits());
+               dataOutput.writeLong(uuid.getLeastSignificantBits());
+       }
+
+       @Override
+       public void readFields(DataInput dataInput) throws IOException {
+               this.uuid = new UUID(dataInput.readLong(), dataInput.readLong());
+       }
+
+       @Override
+       public String toString() {
+               return uuid.toString();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               WritableID id = (WritableID) o;
+
+               return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != null);
+       }
+
+       @Override
+       public int hashCode() {
+               return uuid != null ? uuid.hashCode() : 0;
+       }
+}

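For reference, WritableID follows the standard Hadoop Writable contract, so a plain JDK stream round trip is enough to sanity-check it. A minimal sketch, not part of the commit (java.io streams; the UUID is arbitrary):

    // hypothetical round trip: DataOutputStream/DataInputStream implement DataOutput/DataInput
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    WritableID original = new WritableID(new UUID(1, 2));
    original.write(new DataOutputStream(bytes));

    WritableID copy = new WritableID();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    // equals() and hashCode() are both based on the wrapped UUID
    Assert.assertEquals(original, copy);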
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
new file mode 100644
index 0000000..bb5f4d4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.junit.Test;
+
+public class WritableSerializerTest {
+       
+       @Test
+       public void testStringArrayWritable() {
+               StringArrayWritable[] data = new StringArrayWritable[]{
+                               new StringArrayWritable(new String[]{}),
+                               new StringArrayWritable(new String[]{""}),
+                               new StringArrayWritable(new String[]{"a","a"}),
+                               new StringArrayWritable(new String[]{"a","b"}),
+                               new StringArrayWritable(new String[]{"c","c"}),
+                               new StringArrayWritable(new String[]{"d","f"}),
+                               new StringArrayWritable(new String[]{"d","m"}),
+                               new StringArrayWritable(new String[]{"z","x"}),
+                               new StringArrayWritable(new String[]{"a","a", "a"})
+               };
+
+               WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
+               WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig());
+
+               SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer, writableTypeInfo.getTypeClass(), -1, data);
+               
+               testInstance.testAll();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
new file mode 100644
index 0000000..2af7730
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> {
+       @Override
+       protected TypeSerializer<WritableID> createSerializer() {
+               return new WritableSerializer<>(WritableID.class);
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<WritableID> getTypeClass() {
+               return WritableID.class;
+       }
+
+       @Override
+       protected WritableID[] getTestData() {
+               return new WritableID[] {
+                       new WritableID(new UUID(0, 0)),
+                       new WritableID(new UUID(1, 0)),
+                       new WritableID(new UUID(1, 1))
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
new file mode 100644
index 0000000..7572408
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+public class KryoClearedBufferTest {
+
+       /**
+        * Tests that the Kryo output buffer is cleared in case of an exception. Flink uses an
+        * EOFException to signal that a buffer is full. In such a case, the record whose write
+        * failed is written again later, so any data of this record that was already buffered
+        * has to be cleared first.
+        */
+       @Test
+       public void testOutputBufferedBeingClearedInCaseOfException() throws Exception {
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.registerTypeWithKryoSerializer(TestRecord.class, new TestRecordSerializer());
+               executionConfig.registerKryoType(TestRecord.class);
+
+               KryoSerializer<TestRecord> kryoSerializer = new KryoSerializer<TestRecord>(
+                       TestRecord.class,
+                       executionConfig);
+
+               int size = 94;
+               int bufferSize = 150;
+
+               TestRecord testRecord = new TestRecord(size);
+
+               TestDataOutputView target = new TestDataOutputView(bufferSize);
+
+               kryoSerializer.serialize(testRecord, target);
+
+               try {
+                       kryoSerializer.serialize(testRecord, target);
+                       Assert.fail("Expected an EOFException.");
+               } catch(EOFException eofException) {
+                       // expected exception
+                       // now the Kryo Output should have been cleared
+               }
+
+               TestRecord actualRecord = kryoSerializer.deserialize(
+                               new DataInputViewStreamWrapper(new ByteArrayInputStream(target.getBuffer())));
+
+               Assert.assertEquals(testRecord, actualRecord);
+
+               target.clear();
+
+               // if the Kryo output has been cleared, then we can serialize our test record into the target,
+               // because the 150 byte target buffer can host one TestRecord (total serialization size 100)
+               kryoSerializer.serialize(testRecord, target);
+
+               byte[] buffer = target.getBuffer();
+               int counter = 0;
+
+               for (int i = 0; i < buffer.length; i++) {
+                       if(buffer[i] == 42) {
+                               counter++;
+                       }
+               }
+
+               Assert.assertEquals(size, counter);
+       }
+
+       public static class TestRecord {
+               private byte[] buffer;
+
+               public TestRecord(int size) {
+                       buffer = new byte[size];
+
+                       Arrays.fill(buffer, (byte)42);
+               }
+
+               public TestRecord(byte[] buffer){
+                       this.buffer = buffer;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof TestRecord) {
+                               TestRecord record = (TestRecord) obj;
+
+                               return Arrays.equals(buffer, record.buffer);
+                       } else {
+                               return false;
+                       }
+               }
+       }
+
+       public static class TestRecordSerializer extends Serializer<TestRecord> implements Serializable {
+
+               private static final long serialVersionUID = 6971996565421454985L;
+
+               @Override
+               public void write(Kryo kryo, Output output, TestRecord object) {
+                       output.writeInt(object.buffer.length);
+                       output.write(object.buffer);
+               }
+
+               @Override
+               public TestRecord read(Kryo kryo, Input input, Class<TestRecord> type) {
+                       int length = input.readInt();
+                       byte[] buffer = input.readBytes(length);
+
+                       return new TestRecord(buffer);
+               }
+       }
+
+       public static class TestDataOutputView implements DataOutputView {
+
+               private byte[] buffer;
+               private int position;
+
+               public TestDataOutputView(int size) {
+                       buffer = new byte[size];
+                       position = 0;
+               }
+
+               public void clear() {
+                       position = 0;
+               }
+
+               public byte[] getBuffer() {
+                       return buffer;
+               }
+
+               public void checkSize(int numBytes) throws EOFException {
+                       if (position + numBytes > buffer.length) {
+                               throw new EOFException();
+                       }
+               }
+
+               @Override
+               public void skipBytesToWrite(int numBytes) throws IOException {
+                       checkSize(numBytes);
+
+                       position += numBytes;
+               }
+
+               @Override
+               public void write(DataInputView source, int numBytes) throws IOException {
+                       checkSize(numBytes);
+
+                       byte[] tempBuffer = new byte[numBytes];
+
+                       source.read(tempBuffer);
+
+                       System.arraycopy(tempBuffer, 0, buffer, position, numBytes);
+
+                       position += numBytes;
+               }
+
+               @Override
+               public void write(int b) throws IOException {
+                       checkSize(4);
+
+                       position += 4;
+               }
+
+               @Override
+               public void write(byte[] b) throws IOException {
+                       checkSize(b.length);
+
+                       System.arraycopy(b, 0, buffer, position, b.length);
+                       position += b.length;
+               }
+
+               @Override
+               public void write(byte[] b, int off, int len) throws IOException {
+                       checkSize(len);
+
+                       System.arraycopy(b, off, buffer, position, len);
+
+                       position += len;
+               }
+
+               @Override
+               public void writeBoolean(boolean v) throws IOException {
+                       checkSize(1);
+                       position += 1;
+               }
+
+               @Override
+               public void writeByte(int v) throws IOException {
+                       checkSize(1);
+
+                       buffer[position] = (byte)v;
+
+                       position++;
+               }
+
+               @Override
+               public void writeShort(int v) throws IOException {
+                       checkSize(2);
+
+                       position += 2;
+               }
+
+               @Override
+               public void writeChar(int v) throws IOException {
+                       checkSize(1);
+                       position++;
+               }
+
+               @Override
+               public void writeInt(int v) throws IOException {
+                       checkSize(4);
+
+                       position += 4;
+               }
+
+               @Override
+               public void writeLong(long v) throws IOException {
+                       checkSize(8);
+                       position += 8;
+               }
+
+               @Override
+               public void writeFloat(float v) throws IOException {
+                       checkSize(4);
+                       position += 4;
+               }
+
+               @Override
+               public void writeDouble(double v) throws IOException {
+                       checkSize(8);
+                       position += 8;
+               }
+
+               @Override
+               public void writeBytes(String s) throws IOException {
+                       byte[] sBuffer = s.getBytes();
+                       checkSize(sBuffer.length);
+                       System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
+                       position += sBuffer.length;
+               }
+
+               @Override
+               public void writeChars(String s) throws IOException {
+                       byte[] sBuffer = s.getBytes();
+                       checkSize(sBuffer.length);
+                       System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
+                       position += sBuffer.length;
+               }
+
+               @Override
+               public void writeUTF(String s) throws IOException {
+                       byte[] sBuffer = s.getBytes();
+                       checkSize(sBuffer.length);
+                       System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
+                       position += sBuffer.length;
+               }
+       }
+}

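The comment at the top of this test describes the write-retry protocol that the buffer clearing supports. A caller-side sketch of that pattern, not part of the commit (target stands for any DataOutputView-backed buffer, as in the test above):

    // hypothetical retry: an EOFException means the target was full; the record is then
    // serialized again into freed space, which only works if the serializer cleared its
    // internal Kryo output buffer when the exception was thrown
    try {
            kryoSerializer.serialize(testRecord, target);
    } catch (EOFException bufferFull) {
            target.clear(); // or hand the record to the next, empty buffer
            kryoSerializer.serialize(testRecord, target);
    }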
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
new file mode 100644
index 0000000..9d3eef1
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericArraySerializerTest;
+
+public class KryoGenericArraySerializerTest extends AbstractGenericArraySerializerTest {
+       @Override
+       protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
+               return new KryoSerializer<T>(type, new ExecutionConfig());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
new file mode 100644
index 0000000..19aed78
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeComparatorTest;
+
+public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
+       @Override
+       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+               return new KryoSerializer<T>(type, new ExecutionConfig());
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
new file mode 100644
index 0000000..8ff0b1b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
+import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("unchecked")
+public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
+
+       ExecutionConfig ec = new ExecutionConfig();
+       
+       @Test
+       public void testJavaList(){
+               Collection<Integer> a = new ArrayList<>();
+
+               fillCollection(a);
+
+               runTests(a);
+       }
+
+       @Test
+       public void testJavaSet(){
+               Collection<Integer> b = new HashSet<>();
+
+               fillCollection(b);
+
+               runTests(b);
+       }
+
+
+
+       @Test
+       public void testJavaDequeue(){
+               Collection<Integer> c = new LinkedList<>();
+               fillCollection(c);
+               runTests(c);
+       }
+
+       private void fillCollection(Collection<Integer> coll) {
+               coll.add(42);
+               coll.add(1337);
+               coll.add(49);
+               coll.add(1);
+       }
+
+       @Override
+       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+               return new KryoSerializer<T>(type, ec);
+       }
+       
+       /**
+        * Make sure that the kryo serializer forwards EOF exceptions properly when serializing.
+        */
+       @Test
+       public void testForwardEOFExceptionWhileSerializing() {
+               try {
+                       // construct a long string
+                       String str;
+                       {
+                               char[] charData = new char[40000];
+                               Random rnd = new Random();
+                               
+                               for (int i = 0; i < charData.length; i++) {
+                                       charData[i] = (char) rnd.nextInt(10000);
+                               }
+                               
+                               str = new String(charData);
+                       }
+                       
+                       // construct a memory target that is too small for the string
+                       TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000);
+                       KryoSerializer<String> serializer = new KryoSerializer<String>(String.class, new ExecutionConfig());
+                       
+                       try {
+                               serializer.serialize(str, target);
+                               fail("should throw a java.io.EOFException");
+                       }
+                       catch (java.io.EOFException e) {
+                               // that is how we like it
+                       }
+                       catch (Exception e) {
+                               fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Make sure that the kryo serializer forwards EOF exceptions properly when deserializing.
+        */
+       @Test
+       public void testForwardEOFExceptionWhileDeserializing() {
+               try {
+                       int numElements = 100;
+                       // construct a memory target for the serialized integers
+                       TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements);
+                       KryoSerializer<Integer> serializer = new KryoSerializer<>(Integer.class, new ExecutionConfig());
+
+                       for(int i = 0; i < numElements; i++){
+                               serializer.serialize(i, target);
+                       }
+
+                       ComparatorTestBase.TestInputView source = new ComparatorTestBase.TestInputView(target.copyByteBuffer());
+
+                       for(int i = 0; i < numElements; i++){
+                               int value = serializer.deserialize(source);
+                               assertEquals(i, value);
+                       }
+
+                       try {
+                               serializer.deserialize(source);
+                               fail("should throw a java.io.EOFException");
+                       }
+                       catch (java.io.EOFException e) {
+                               // that is how we like it :-)
+                       }
+                       catch (Exception e) {
+                               fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void validateReferenceMappingDisabled() {
+               KryoSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+               Kryo kryo = serializer.getKryo();
+               assertFalse(kryo.getReferences());
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
new file mode 100644
index 0000000..d68afd6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
+import org.joda.time.LocalDate;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+@SuppressWarnings("unchecked")
+public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializerTest {
+       
+
+       @Test
+       public void testJodaTime(){
+               Collection<LocalDate> b = new HashSet<LocalDate>();
+
+               b.add(new LocalDate(1L));
+               b.add(new LocalDate(2L));
+
+               runTests(b);
+       }
+
+       @Override
+       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+               ExecutionConfig conf = new ExecutionConfig();
+               conf.registerTypeWithKryoSerializer(LocalDate.class, LocalDateSerializer.class);
+               TypeInformation<T> typeInfo = new GenericTypeInfo<T>(type);
+               return typeInfo.createSerializer(conf);
+       }
+       
+       public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable {
+               
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void write(Kryo kryo, Output output, LocalDate object) {
+                       output.writeInt(object.getYear());
+                       output.writeInt(object.getMonthOfYear());
+                       output.writeInt(object.getDayOfMonth());
+               }
+               
+               @Override
+               public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) {
+                       return new LocalDate(input.readInt(), input.readInt(), 
input.readInt());
+               }
+       }
+}
\ No newline at end of file

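The createSerializer() override above shows the registration path end to end. Pulled out as a stand-alone sketch, not part of the commit (the variable names are illustrative):

    // hypothetical registration: tell the ExecutionConfig to use the custom Kryo serializer,
    // then let the generic type info build a serializer that honours it
    ExecutionConfig conf = new ExecutionConfig();
    conf.registerTypeWithKryoSerializer(LocalDate.class, LocalDateSerializer.class);
    TypeSerializer<LocalDate> serializer = new GenericTypeInfo<>(LocalDate.class).createSerializer(conf);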
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
new file mode 100644
index 0000000..7c6d023
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertTrue;
+
+public class SerializersTest {
+
+       // recursive
+       public static class Node {
+               private Node parent;
+       }
+       
+       public static class FromNested {
+               Node recurseMe;
+       }
+       
+       public static class FromGeneric1 {}
+       public static class FromGeneric2 {}
+       
+       public static class Nested1 {
+               private FromNested fromNested;
+               private Path yodaIntervall;
+       }
+
+       public static class ClassWithNested {
+               
+               Nested1 nested;
+               int ab;
+               
+               ArrayList<FromGeneric1> addGenType;
+               FromGeneric2[] genericArrayType;
+       }
+
+       @Test
+       public void testTypeRegistration() {
+               ExecutionConfig conf = new ExecutionConfig();
+               Serializers.recursivelyRegisterType(ClassWithNested.class, conf, new HashSet<Class<?>>());
+               
+               KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type.
+
+               Assert.assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0);
+               Assert.assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0);
+               Assert.assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0);
+               
+               // check if the generic type from one field is also registered (it is very likely that
+               // generic types are also used as fields somewhere).
+               Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0);
+               Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0);
+               Assert.assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0);
+               
+               
+               // register again and make sure classes are still registered
+               ExecutionConfig conf2 = new ExecutionConfig();
+               Serializers.recursivelyRegisterType(ClassWithNested.class, conf2, new HashSet<Class<?>>());
+               KryoSerializer<String> kryo2 = new KryoSerializer<>(String.class, conf2);
+               assertTrue(kryo2.getKryo().getRegistration(FromNested.class).getId() > 0);
+       }
+
+       @Test
+       public void testTypeRegistrationFromTypeInfo() {
+               ExecutionConfig conf = new ExecutionConfig();
+               Serializers.recursivelyRegisterType(new GenericTypeInfo<>(ClassWithNested.class), conf, new HashSet<Class<?>>());
+
+               KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type.
+
+               assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0);
+               assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0);
+               assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0);
+
+               // check if the generic type from one field is also registered (it is very likely that
+               // generic types are also used as fields somewhere).
+               assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0);
+               assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0);
+               assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0);
+       }
+}

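As a usage note (not part of this commit), the same registration walk applies to any user type: explicitly registered classes receive a Kryo registration id greater than zero, exactly as asserted above. A rough sketch, placed in the same package so it can use the Serializers and KryoSerializer classes exercised by the test; the Outer/Inner POJO is a made-up stand-in for ClassWithNested.

package org.apache.flink.api.java.typeutils.runtime.kryo;

import java.util.HashSet;

import org.apache.flink.api.common.ExecutionConfig;

public class RegistrationCheck {

        // Hypothetical user POJO with a nested field, standing in for ClassWithNested above.
        public static class Outer {
                Inner inner;
        }

        public static class Inner {
                long value;
        }

        public static void main(String[] args) {
                ExecutionConfig conf = new ExecutionConfig();

                // Registers Outer plus all field types reachable from it (here: Inner).
                Serializers.recursivelyRegisterType(Outer.class, conf, new HashSet<Class<?>>());

                // Every KryoSerializer built from this config sees the registrations,
                // independent of the type the serializer itself is created for.
                KryoSerializer<String> serializer = new KryoSerializer<>(String.class, conf);
                System.out.println("Outer id: " + serializer.getKryo().getRegistration(Outer.class).getId());
                System.out.println("Inner id: " + serializer.getKryo().getRegistration(Inner.class).getId());
        }
}
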
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java
new file mode 100644
index 0000000..faab26a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.tuple.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class TupleComparatorTestBase<T extends Tuple> extends ComparatorTestBase<T> {
+
+       @Override
+       protected void deepEquals(String message, T should, T is) {
+               for (int x = 0; x < should.getArity(); x++) {
+                       assertEquals(should.getField(x), is.getField(x));
+               }
+       }
+
+       @Override
+       protected abstract TupleComparator<T> createComparator(boolean ascending);
+
+       @Override
+       protected abstract TupleSerializer<T> createSerializer();
+
+}

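For orientation (again not part of this commit), a concrete subclass of this base would pin the tuple type and supply the comparator, serializer, and sorted test data. The sketch below assumes that ComparatorTestBase exposes an abstract getSortedTestData() and that TupleComparator and TupleSerializer keep their existing (keyPositions, comparators, serializers) and (tupleClass, fieldSerializers) constructors; the field comparators and serializers come from the basic type utilities.

package org.apache.flink.api.java.typeutils.runtime.tuple.base;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;

public class TupleComparatorIntStringTest extends TupleComparatorTestBase<Tuple2<Integer, String>> {

        @SuppressWarnings("unchecked")
        private final Tuple2<Integer, String>[] data = new Tuple2[] {
                new Tuple2<Integer, String>(1, "a"),
                new Tuple2<Integer, String>(2, "b"),
                new Tuple2<Integer, String>(3, "c")
        };

        @Override
        protected Tuple2<Integer, String>[] getSortedTestData() {
                return data;
        }

        @Override
        protected TupleComparator<Tuple2<Integer, String>> createComparator(boolean ascending) {
                // key on the first field only; both field serializers are still needed
                return new TupleComparator<Tuple2<Integer, String>>(
                        new int[] { 0 },
                        new TypeComparator<?>[] { new IntComparator(ascending) },
                        new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
        }

        @Override
        @SuppressWarnings("unchecked")
        protected TupleSerializer<Tuple2<Integer, String>> createSerializer() {
                return new TupleSerializer<Tuple2<Integer, String>>(
                        (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
                        new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
        }
}
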
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java
new file mode 100644
index 0000000..1d414d8
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.tuple.base;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+/**
+ * Abstract test base for TuplePairComparators.
+ *
+ * @param <T> type of the tuples from the first input
+ * @param <R> type of the tuples from the second input
+ */
+public abstract class TuplePairComparatorTestBase<T extends Tuple, R extends Tuple> extends TestLogger {
+
+       protected abstract TypePairComparator<T, R> createComparator(boolean ascending);
+
+       protected abstract Tuple2<T[], R[]> getSortedTestData();
+
+       @Test
+       public void testEqualityWithReference() {
+               try {
+                       TypePairComparator<T, R> comparator = getComparator(true);
+                       Tuple2<T[], R[]> data = getSortedData();
+                       for (int x = 0; x < data.f0.length; x++) {
+                               comparator.setReference(data.f0[x]);
+
+                               assertTrue(comparator.equalToReference(data.f1[x]));
+                       }
+               } catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Exception in test: " + e.getMessage());
+               }
+       }
+
+       @Test
+       public void testInequalityWithReference() {
+               testGreatSmallAscDescWithReference(true);
+               testGreatSmallAscDescWithReference(false);
+       }
+
+       protected void testGreatSmallAscDescWithReference(boolean ascending) {
+               try {
+                       Tuple2<T[], R[]> data = getSortedData();
+
+                       TypePairComparator<T, R> comparator = getComparator(ascending);
+
+                       // compares every element in high with every element in low
+                       for (int x = 0; x < data.f0.length - 1; x++) {
+                               for (int y = x + 1; y < data.f1.length; y++) {
+                                       comparator.setReference(data.f0[x]);
+                                       if (ascending) {
+                                               assertTrue(comparator.compareToReference(data.f1[y]) > 0);
+                                       } else {
+                                               assertTrue(comparator.compareToReference(data.f1[y]) < 0);
+                                       }
+                               }
+                       }
+               } catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Exception in test: " + e.getMessage());
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       protected TypePairComparator<T, R> getComparator(boolean ascending) {
+               TypePairComparator<T, R> comparator = createComparator(ascending);
+               if (comparator == null) {
+                       throw new RuntimeException("Test case corrupt. Returns null as comparator.");
+               }
+               return comparator;
+       }
+
+       protected Tuple2<T[], R[]> getSortedData() {
+               Tuple2<T[], R[]> data = getSortedTestData();
+               if (data == null || data.f0 == null || data.f1 == null) {
+                       throw new RuntimeException("Test case corrupt. Returns null as test data.");
+               }
+               if (data.f0.length < 2 || data.f1.length < 2) {
+                       throw new RuntimeException("Test case does not provide enough sorted test data.");
+               }
+
+               return data;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index a31e89d..3203d75 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -40,22 +40,12 @@ under the License.
                        <artifactId>flink-core</artifactId>
                        <version>${project.version}</version>
                </dependency>
+               
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>${shading-artifact.name}</artifactId>
                        <version>${project.version}</version>
                </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.avro</groupId>
-                       <artifactId>avro</artifactId>
-                       <!-- version is derived from base module -->
-               </dependency>
-               
-               <dependency>
-                       <groupId>com.esotericsoftware.kryo</groupId>
-                       <artifactId>kryo</artifactId>
-               </dependency>
 
                <dependency>
                        <groupId>org.ow2.asm</groupId>
@@ -64,12 +54,6 @@ under the License.
                </dependency>
 
                <dependency>
-                       <groupId>com.twitter</groupId>
-                       <artifactId>chill-java</artifactId>
-                       <version>${chill.version}</version>
-               </dependency>
-
-               <dependency>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                        <version>${guava.version}</version>
@@ -88,20 +72,6 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
-
-               <dependency>
-                       <groupId>joda-time</groupId>
-                       <artifactId>joda-time</artifactId>
-                       <version>2.5</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.joda</groupId>
-                       <artifactId>joda-convert</artifactId>
-                       <version>1.7</version>
-                       <scope>test</scope>
-               </dependency>
                
        </dependencies>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 6db32c5..6bcdb52 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -63,7 +63,7 @@ import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.PartitionOperator;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 2edc533..80f8199 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -33,7 +33,6 @@ import org.apache.flink.configuration.Configuration;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.util.List;
 import java.util.Random;
 
 import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;
@@ -61,24 +60,6 @@ public final class Utils {
                return String.format("%s(%s:%d)", elem.getMethodName(), elem.getFileName(), elem.getLineNumber());
        }
 
-       /**
-        * Returns all GenericTypeInfos contained in a composite type.
-        *
-        * @param typeInfo {@link CompositeType}
-        */
-       public static void getContainedGenericTypes(CompositeType<?> typeInfo, List<GenericTypeInfo<?>> target) {
-               for (int i = 0; i < typeInfo.getArity(); i++) {
-                       TypeInformation<?> type = typeInfo.getTypeAt(i);
-                       if (type instanceof CompositeType) {
-                               getContainedGenericTypes((CompositeType<?>) type, target);
-                       } else if (type instanceof GenericTypeInfo) {
-                               if (!target.contains(type)) {
-                                       target.add((GenericTypeInfo<?>) type);
-                               }
-                       }
-               }
-       }
-
        // --------------------------------------------------------------------------------------------
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
deleted file mode 100644
index 3d06c59..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import org.apache.flink.api.common.functions.Function;
-
-import java.io.Serializable;
-
-/**
- * The {@link KeySelector} allows to use arbitrary objects for operations such as
- * reduce, reduceGroup, join, coGoup, etc.
- * 
- * The extractor takes an object and returns the key for that object.
- *
- * @param <IN> Type of objects to extract the key from.
- * @param <KEY> Type of key.
- */
-public interface KeySelector<IN, KEY> extends Function, Serializable {
-       
-       /**
-        * User-defined function that extracts the key from an arbitrary object.
-        * 
-        * For example for a class:
-        * <pre>
-        *      public class Word {
-        *              String word;
-        *              int count;
-        *      }
-        * </pre>
-        * The key extractor could return the word as
-        * a key to group all Word objects by the String they contain.
-        * 
-        * The code would look like this
-        * <pre>
-        *      public String getKey(Word w) {
-        *              return w.word;
-        *      }
-        * </pre>
-        * 
-        * @param value The object to get the key from.
-        * @return The extracted key.
-        * 
- * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
- *                   and trigger recovery or cancellation of the program. 
-        */
-       KEY getKey(IN value) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 761eeb3..a75b8e0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -37,7 +37,7 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
 import java.lang.annotation.Annotation;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
index c44929f..6763cdf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 
 import java.util.Arrays;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 00e0d3b..6485936 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 845deb4..6c6b051 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -40,9 +41,9 @@ import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanBothUnwrappingCoGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingCoGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanRightUnwrappingCoGroupOperator;
@@ -297,11 +298,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
                @SuppressWarnings("unchecked")
                final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;
 
-               final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1);
-               final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2);
+               final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
+               final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2);
 
-               final Operator<Tuple2<K, I1>> keyedInput1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1);
-               final Operator<Tuple2<K, I2>> keyedInput2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2);
+               final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);
+               final Operator<Tuple2<K, I2>> keyedInput2 = KeyFunctions.appendKeyExtractor(input2, keys2);
 
                final PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
                        new PlanBothUnwrappingCoGroupOperator<>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
@@ -324,8 +325,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
                @SuppressWarnings("unchecked")
                final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;
-               final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2);
-               final Operator<Tuple2<K, I2>> keyedInput2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2);
+               final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2);
+               final Operator<Tuple2<K, I2>> keyedInput2 = KeyFunctions.appendKeyExtractor(input2, keys2);
                
                final PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
                                new PlanRightUnwrappingCoGroupOperator<>(
@@ -355,8 +356,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
                @SuppressWarnings("unchecked")
                final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
-               final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1);
-               final Operator<Tuple2<K, I1>> keyedInput1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1);
+               final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
+               final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);
 
                final PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
                                new PlanLeftUnwrappingCoGroupOperator<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
index 30639c3..74f54b8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
@@ -21,11 +21,12 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
 
 /**
  * A {@link DataSet} that is the result of a CoGroup transformation. 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 915a053..8745271 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.operators;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index a302478..7b3001f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
index 6717c6d..4af9108 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 5102c80..9979f59 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -20,12 +20,13 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
@@ -116,8 +117,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
                @SuppressWarnings("unchecked")
                final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
                
-               TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
-               Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
+               TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
+               Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
                
                PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
                                new PlanUnwrappingReduceGroupOperator<>(function, keys, name, outputType, typeInfoWithKey, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index ef0c12f..a43b869 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -30,7 +31,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 
@@ -195,8 +196,8 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
        {
                final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
 
-               TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
-               Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
+               TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
+               Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
 
                PlanUnwrappingGroupCombineOperator<IN, OUT, K> reducer =
                        new PlanUnwrappingGroupCombineOperator<>(function, keys, name, outputType, typeInfoWithKey);
@@ -217,9 +218,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
        {
                final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
                final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>)rawSortingKeys;
-               TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(groupingKey, sortingKey);
+               TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey, sortingKey);
 
-               Operator<Tuple3<K1, K2, IN>> inputWithKey = SelectorFunctionKeys.appendKeyExtractor(input, groupingKey, sortingKey);
+               Operator<Tuple3<K1, K2, IN>> inputWithKey = KeyFunctions.appendKeyExtractor(input, groupingKey, sortingKey);
 
                PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> reducer =
                        new PlanUnwrappingSortedGroupCombineOperator<>(function, groupingKey, sortingKey, name, outputType, typeInfoWithKey);

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index b1bf844..42553a0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -28,7 +29,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -281,9 +282,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
                        boolean combinable)
        {
                SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
-               TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
+               TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
 
-               Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
+               Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
 
                PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
                        new PlanUnwrappingReduceGroupOperator(function, keys, name, outputType, typeInfoWithKey, combinable);
@@ -305,9 +306,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
        {
                final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
                final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>) rawSortingKey;
-               TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(groupingKey,sortingKey);
+               TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey,sortingKey);
 
-               Operator<Tuple3<K1, K2, IN>> inputWithKey = SelectorFunctionKeys.appendKeyExtractor(input, groupingKey, sortingKey);
+               Operator<Tuple3<K1, K2, IN>> inputWithKey = KeyFunctions.appendKeyExtractor(input, groupingKey, sortingKey);
 
                PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> reducer =
                        new PlanUnwrappingSortedReduceGroupOperator<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
index c117458..823aee4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.DataSet;
 
 /**
